मेरे पास एक काफ्का विषय है ग्राहक जिसमें मैंने स्ट्रीम किया था a सीएसवी फ़ाइल। अब क्या CSV
गिनती. जैसे अगर गिनती कम 20 से कम है तो इसे विषय A पर भेजें और यदि गिनती अधिक है तो इसे विषय B पर भेजें
मैं काफ्का के लिए नया हूं और मैं इस तरह कोशिश कर रहा था लेकिन यह काम नहीं कर रहा था

 builder.stream("Customer")
            .groupByKey()
            .count()
            .toStream()
            .filter((k,v)-> String(v) > 20)
            .to("test_A");

यह कोड गलत है, मुझे इसके बारे में पूरा यकीन है, लेकिन कृपया कोई मेरी मदद कर सकता है

0
Fury Fazu 1 जिंदा 2021, 14:26

3 जवाब

सबसे बढ़िया उत्तर

सरल उपाय यह है कि एक निर्माता बनाएं और अपनी इनपुट सीएसवी फ़ाइल से एक बार में एक पंक्ति पढ़ें और गणना की स्थिति के अनुसार इसे विशेष विषय पर भेजें।

psuedo code:
while no row left in csv:
    row = readrow()
    if row.counts<20
       producer.send(topicA,row)
    else
       producer.send(topicB,row)

आप https://towardsdatascience.com/kafka को फ़ॉलो कर सकते हैं अजगर के साथ काफ्का को आसानी से समझने के लिए -पायथन-एक्सप्लेन्ड-इन-10-लाइन्स-ऑफ-कोड-800e3e07dad1

1
Mr.Danish Mukhtar Zargar 1 जिंदा 2021, 20:55

अभी आप जो कर रहे हैं वह एक शर्त के आधार पर रिकॉर्ड के केवल एक हिस्से को एक विषय पर भेजना है और दूसरे को छोड़ देना है। यदि आप उस का एक हिस्सा एक विषय पर और दूसरे को दूसरे विषय पर भेजना चाहते हैं तो आपको शाखा संचालक का उपयोग करना चाहिए।

कुछ इस तरह:

KStream<K, Long>[] branches = builder.stream("Customer")
        .groupByKey()
        .count()
        .toStream()
        .branch((k, v) -> v > 20),
                (k, v) -> v <= 20);
branches[0].to("topicB");
branches[1].to("topicA");

एक और बात जिस पर आपको ध्यान देना चाहिए वह यह है कि तुलना संख्याओं के बीच होनी चाहिए, न कि स्ट्रिंग्स के बीच क्योंकि जब आप स्ट्रिंग्स के साथ अधिक तुलनित्र का उपयोग करते हैं तो आप स्ट्रिंग्स की लंबाई और लेक्सिकोग्राफिकल ऑर्डर की तुलना कर रहे होते हैं।

3
JArgente 1 जिंदा 2021, 15:26
   StreamsBuilder builder=new StreamsBuilder();


    KStream<String, String> inputTopic = builder.stream("Trans_Topic");

            inputTopic
                   .filter((k,v)->{
                       return Long.parseLong(v.split(",")[6]) < 20L; //6 is the column id.
                   }).to("trans_topic_result_1");
    inputTopic
            .filter((k,v)->{
                return Long.parseLong(v.split(",")[6]) > 20L;
            }).to("trans_topic_result_2");
0
Fury Fazu 2 जिंदा 2021, 02:42