मैं एक इनपुट फ़ाइल में पंक्तियों की संख्या गिनने की कोशिश कर रहा हूं और मैं टेम्पलेट बनाने के लिए क्लाउड डेटाफ्लो रनर का उपयोग कर रहा हूं। नीचे दिए गए कोड में, मैं फ़ाइल को GCS बकेट से पढ़ रहा हूं, इसे संसाधित कर रहा हूं और फिर आउटपुट को Redis उदाहरण में संग्रहीत कर रहा हूं।

लेकिन मैं इनपुट फ़ाइल की पंक्तियों की संख्या गिनने में असमर्थ हूँ।

मुख्य कक्षा

 public static void main(String[] args) {
    /**
     * Constructed StorageToRedisOptions object using the method PipelineOptionsFactory.fromArgs to read options from command-line
     */
    StorageToRedisOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(StorageToRedisOptions.class);

    Pipeline p = Pipeline.create(options);
    p.apply("Reading Lines...", TextIO.read().from(options.getInputFile()))
            .apply("Transforming data...",
                    ParDo.of(new DoFn<String, String[]>() {
                        @ProcessElement
                        public void TransformData(@Element String line, OutputReceiver<String[]> out) {
                            String[] fields = line.split("\\|");
                            out.output(fields);
                        }
                    }))
            .apply("Processing data...",
                    ParDo.of(new DoFn<String[], KV<String, String>>() {
                        @ProcessElement
                        public void ProcessData(@Element String[] fields, OutputReceiver<KV<String, String>> out) {
                            if (fields[RedisIndex.GUID.getValue()] != null) {

                                out.output(KV.of("firstname:"
                                        .concat(fields[RedisIndex.FIRSTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));

                                out.output(KV.of("lastname:"
                                        .concat(fields[RedisIndex.LASTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));

                                out.output(KV.of("dob:"
                                        .concat(fields[RedisIndex.DOB.getValue()]), fields[RedisIndex.GUID.getValue()]));

                                out.output(KV.of("postalcode:"
                                        .concat(fields[RedisIndex.POSTAL_CODE.getValue()]), fields[RedisIndex.GUID.getValue()]));

                            }
                        }
                    }))
            .apply("Writing field indexes into redis",
            RedisIO.write().withMethod(RedisIO.Write.Method.SADD)
                    .withEndpoint(options.getRedisHost(), options.getRedisPort()));
    p.run();

}

नमूना इनपुट फ़ाइल

xxxxxxxxxxxxxxxx|bruce|wayne|31051989|444444444444
yyyyyyyyyyyyyyyy|selina|thomas|01051989|222222222222
aaaaaaaaaaaaaaaa|clark|kent|31051990|666666666666

पाइपलाइन निष्पादित करने का आदेश

mvn compile exec:java \
  -Dexec.mainClass=com.viveknaskar.DataFlowPipelineForMemStore \
  -Dexec.args="--project=my-project-id \
  --jobName=dataflow-job \
  --inputFile=gs://my-input-bucket/*.txt \
  --redisHost=127.0.0.1 \
  --stagingLocation=gs://pipeline-bucket/stage/ \
  --dataflowJobFile=gs://pipeline-bucket/templates/dataflow-template \
  --runner=DataflowRunner"

मैंने StackOverflow Solution से नीचे दिए गए कोड का उपयोग करने का प्रयास किया है, लेकिन यह मेरे काम नहीं करता है।

PipelineOptions options = ...;
DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);
PCollection<Long> countPC =
    p.apply(TextIO.Read.from("gs://..."))
     .apply(Count.<String>globally());
DirectPipelineRunner.EvaluationResults results = runner.run(p);
long count = results.getPCollection(countPC).get(0);

मैं अपाचे बीम दस्तावेज के माध्यम से भी गया हूं लेकिन कुछ भी उपयोगी नहीं मिला। इस पर किसी भी मदद की वास्तव में सराहना की जाएगी।

3
viveknaskar 17 सितंबर 2020, 21:15

2 जवाब

सबसे बढ़िया उत्तर

पाइपलाइन द्वारा फ़ाइल को पढ़ने के बाद मैंने Count.globally() को जोड़कर और PCollection<String> पर आवेदन करके इस समस्या का समाधान किया।

मैंने नीचे दिया गया कोड जोड़ा है:

PCollection<String> lines = p.apply("Reading Lines...", TextIO.read().from(options.getInputFile()));

 lines.apply(Count.globally()).apply("Count the total records", ParDo.of(new RecordCount()));

जहां मैंने एक नया वर्ग (RecordCount.java) बनाया है जो DoFn को बढ़ाता है जो सिर्फ गिनती को लॉग करता है।

RecordCount.java

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecordCount extends DoFn<Long, Void> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RecordCount.class);

    @ProcessElement
    public void processElement(@Element Long count) {
       LOGGER.info("The total number of records in the input file is: ", count);

        }
    }

}
4
viveknaskar 19 सितंबर 2020, 12:14

ऐसा करने का उचित तरीका है कि एक बीम कनेक्टर (या बीम पारडो का उपयोग करके) का उपयोग करके स्टोरेज सिस्टम में गिनती लिखें। पाइपलाइन परिणाम मुख्य कार्यक्रम के लिए सीधे उपलब्ध नहीं है क्योंकि बीम धावक संगणना को समानांतर कर सकता है और निष्पादन एक ही कंप्यूटर में नहीं हो सकता है।

उदाहरण के लिए (छद्म कोड):

    p.apply(TextIO.Read.from("gs://..."))
     .apply(Count.<String>globally())
     .apply(ParDo(MyLongToStringParDo()))
     .apply(TextIO.Write.to("gs://..."));

यदि आपको मुख्य कार्यक्रम में सीधे आउटपुट को संभालने की आवश्यकता है, तो आप बीम प्रोग्राम समाप्त होने के बाद क्लाइंट लाइब्रेरी का उपयोग करके जीसीएस से पढ़ सकते हैं (इस मामले में p.run().waitUntilFinish() निर्दिष्ट करना सुनिश्चित करें)। वैकल्पिक रूप से, आप अपनी गणना (जिसे गिनती की आवश्यकता है) को बीम PTransform में स्थानांतरित कर सकते हैं और अपनी पाइपलाइन का वह हिस्सा बना सकते हैं।

2
chamikara 18 सितंबर 2020, 22:23