मैं काफ्का-धाराओं का उपयोग करके शब्द गणना की समस्या को क्रियान्वित कर रहा था। काफ्का का इस्तेमाल किया संस्करण 2.3.1 था, दो विषयों शब्द-गणना-इनपुट और शब्द-गणना-आउटपुट बनाया। उसी के लिए कोड है

public static void main( String[] args )
{
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();

    KStream<String, String> textLines = builder.stream("word-count-input");

    KTable<String, Long> wordCounts = textLines.mapValues(value -> value.toLowerCase())
                                               .flatMapValues(value -> Arrays.asList(value.split(" ")))
                                               .selectKey((key,value) -> value)
                                               .groupByKey()
                                               .count();

    // Writing back to the kafka topic
    wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));

    KafkaStreams streams = new KafkaStreams(builder.build(), properties);
    streams.start();


    //printing the topology
    System.out.println(streams.toString());

    // shutdown hook to correctly close the stream application
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

}   

कमांड प्रॉम्प्ट का उपयोग करके विषय वर्ड-काउंट-आउटपुट के लिए एक उपभोक्ता को लॉन्च करने पर, यह key.deserializer और value.deserializer के गुणों को सेट करने के लिए क्लास नॉट फाउंड अपवाद फेंकता है।

आप कमांड प्रॉम्प्ट यहां देख सकते हैं।

1
Saajan Kumar Jha 7 अप्रैल 2020, 11:27

1 उत्तर

सबसे बढ़िया उत्तर

समस्या काफ्का संस्करण में अंतर के साथ थी। सिस्टम पर स्थापित संस्करण 2.3.1 था और मावेन में 2.1.1 था। आशा है कि यह किसी की मदद करता है।

0
Saajan Kumar Jha 9 अप्रैल 2020, 15:19