जैसा कि हम काफ्का स्ट्रीम में जानते हैं डॉक, पीक, फिल्टर, ब्रांच स्टेटलेस ऑपरेशन हैं? हालांकि, मैं इस प्रोसेसर में कुछ स्टेटफुल ऑपरेशन करना चाहता हूं? उदाहरण के लिए, मैं कुछ क्वेरी करना चाहता हूं, और संदेशों को परिणामों के आधार पर फ़िल्टर करना चाहता हूं, क्या मैं ऐसा कर सकता हूं?

3
zydzjy 8 अप्रैल 2020, 14:37

2 जवाब

संचालन peek(), filter(), और branch() स्वाभाविक रूप से स्टेटलेस हैं। जब आप कहें:

मैं कुछ क्वेरी करना चाहता हूं, और संदेशों को फ़िल्टर करना परिणामों को आधार बनाता है

से यह निर्भर करता है कि आप क्या पूछना चाहते हैं? "बाहरी" एपीआई से पूछताछ करना संभव है (लेकिन अनुशंसित नहीं)। हालांकि, इसके लिए कोई अंतर्निहित समर्थन नहीं है, और इसे मजबूत बनाने के लिए विचार करने के लिए कई कोने के मामले हैं। ध्यान दें कि बाहरी सिस्टम को क्वेरी करने से ऑपरेशन स्टेटफुल नहीं हो जाता है।

यदि आप राज्य के साथ काम करना चाहते हैं, तो आप transform() (और भाई-बहन) का उपयोग कर सकते हैं और कस्टम ऑपरेटर बना सकते हैं। यदि आप अपने सभी डाउनस्ट्रीम ऑपरेटरों का नाम लेते हैं (Named और इसी तरह के माध्यम से) तो आप कस्टम शाखा को लागू करने के लिए context.forward(..., To.child(...)) का उपयोग कर सकते हैं। फ़िल्टर करने के लिए आप कुछ भी अग्रेषित न करने के लिए null वापस कर सकते हैं।

सुनिश्चित नहीं है कि एक स्टेटफुल पीक() का उपयोग किस लिए किया जाएगा, लेकिन आप ऐसा भी कर सकते हैं।

उपयोग-मामले के आधार पर, स्ट्रीम-टेबल जॉइन या स्ट्रीम-ग्लोबलटेबल जॉइन के माध्यम से "स्टेटफुल फ़िल्टर" को कार्यान्वित करना भी संभव है।

1
Matthias J. Sax 9 अप्रैल 2020, 07:04

IMO, ऐसा करने का सबसे अच्छा तरीका है टेबल लुकअप का उपयोग करना KStream#...join का उपयोग करना या Processor API का उपयोग करना अंतर्निहित स्टेट स्टोर तक पहुंच प्राप्त करने के लिए (KStream#transformValues का उपयोग करके)।

आप ऐसा कर सकते हैं, लेकिन कोड बहुत बुरा होगा (इसकी अनुशंसा नहीं करेंगे), लेकिन स्ट्रीम स्थिति के REBALANCING से RUNNING में स्थानांतरित होने के बाद आप केवल ReadOnlyKeyValueStore तक केवल पढ़ने के लिए पहुंच प्राप्त कर सकते हैं:

kafkaStreams.setStateListener((newState, oldState) -> {
    if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
        ReadOnlyKeyValueStore<Object, Object> kvStore = kafkaStreams.store("stateStore", QueryableStoreTypes.keyValueStore());
        //assign this kvStore to some place so you can later using this referrer access this in filter or in peek
    }
});
0
Tuyen Luong 9 अप्रैल 2020, 07:01
KStream#transformValuesया KStream#transform?ब्यूकॉज, बाद वाला शून्य वापस आ सकता है, जो संदेशों को अग्रेषित करना समाप्त कर सकता है, जो कि फ़िल्टर फ़ंक्शन है ?!
 – 
zydzjy
9 अप्रैल 2020, 04:30
हां, KStream#transform एक फिल्टर के रूप में भी काम करेगा जब आप नल कीवैल्यू लौटाते हैं जैसे आपने कहा था, लेकिन सावधान रहें कि KStream#transform डेटा री-पार्टीशन को चिह्नित करेगा और जब आप कुछ ऑपरेटर जैसे ज्वाइन, ग्रुपबायकी, को कॉल करेंगे तो ट्रिगर हो जाएगा।
 – 
Tuyen Luong
9 अप्रैल 2020, 04:50
"इंटरएक्टिव क्वेरीज़" का उपयोग करना एक कार्यशील समाधान के रूप में नहीं लगता है।
 – 
Matthias J. Sax
9 अप्रैल 2020, 06:03
हां, सुरुचिपूर्ण समाधान अभी भी प्रोसेसर एपीआई या टेबल लुकअप का उपयोग कर रहा है
 – 
Tuyen Luong
9 अप्रैल 2020, 07:02
तो मैं ट्रांसफॉर्म वैल्यू को फ़िल्टर फ़ंक्शन के रूप में कैसे उपयोग कर सकता हूं? धन्यवाद!
 – 
zydzjy
9 अप्रैल 2020, 08:25