मैंने अपने स्ट्रीम निष्पादन वातावरण में स्रोत के रूप में flinkkafkaconsumer जोड़ा है। जब किसी विशिष्ट समय (काफ्का पोलटाइम के समान) के लिए कोई नया संदेश प्राप्त नहीं होता है, तो मैं डेटा का उपभोग करने से फ्लिंक को बंद/बंद करना चाहता हूं। वर्तमान में यह अनिश्चित काल तक चल रहा है और निष्पादन को अगले चरण (संदेशों को मान्य करना) पर जाने से रोक रहा है। कृपया सुझाव दें कि क्या कोई उपाय है।

नोट: मैंने deserialization से endofstream के साथ प्रयास किया और यह काम नहीं करेगा क्योंकि स्ट्रीम व्यावहारिक रूप से अनिश्चित है।

अग्रिम में धन्यवाद।

0
dv123 5 पद 2019, 21:47

1 उत्तर

सबसे बढ़िया उत्तर

यदि यह परीक्षण के लिए है, तो एक तरीका यह है कि आप अपना स्वयं का कस्टम स्रोत बनाएं जो FlinkKafkaConsumer को "रैप" करता है। आपके स्रोत की run() विधि एक धागे से काफ्का स्रोत की run() विधि को कॉल करेगी, एक कलेक्टर में गुजरती है जो असली कलेक्टर को लपेटती है, और जब भी कुछ भी एकत्र किया जाता है तो "अंतिम एकत्रित समय" अपडेट करता है। अपने स्रोत की run() विधि में आप इस पर मतदान करेंगे, और बहुत अधिक समय बीत जाने पर काक्फा स्रोत की cancel() विधि को कॉल करें, फिर बाहर निकलें।

यह सब कहने के बाद, आम तौर पर यूनिट परीक्षण के लिए आप एक नकली स्रोत का उपयोग करना चाहते हैं जो आपको वास्तव में क्या उत्पन्न हो रहा है, और कब, एक काफ्का प्रणाली को कताई के विरुद्ध नियंत्रित करने देता है।

2
kkrugler 6 पद 2019, 06:19