नीचे की तरह एक निर्माता विन्यास के साथ, मैं एक सिंगलटन निर्माता बना रहा हूं जिसका उपयोग पूरे एप्लिकेशन में किया जाता है:
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.consul1:9092,kafka.consul2:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
मैं k8s होस्ट किए गए काफ्का क्लस्टर से जुड़ा हूं। ब्रोकर का advertised.listeners
मुझे आईपी पते वापस करने के लिए कॉन्फ़िगर किया गया है न कि होस्ट नाम के लिए। जबकि आम तौर पर सब कुछ अपेक्षित रूप से काम करता है, समस्या तब होती है जब काफ्का को फिर से शुरू किया जाता है, कभी-कभी आईपी पता बदल जाता है। चूंकि निर्माता केवल पुराने आईपी के बारे में जानता है, वह संदेश भेजने के लिए उस होस्ट से जुड़ने की कोशिश करता रहता है और कोई भी संदेश नहीं जाता है।
मैं देखता हूं कि जब प्रेषण विफल हो जाता है तो एक org.apache.kafka.common.errors.TimeoutException
अपवाद फेंक दिया जाता है। वर्तमान में संदेश एसिंक्स भेजे जाते हैं:
producer.send(data,
(RecordMetadata recordMetadata, Exception e) -> {
if (e != null) {
LOGGER.error("Exception while sending message to kafka", e);
}
});
टाइमआउट अपवाद को अब कैसे संभाला जाना चाहिए? यह देखते हुए कि निर्माता को पूरे एप्लिकेशन में साझा किया गया है, कॉलबैक में बंद करना और फिर से बनाना सही नहीं लगता है।
1 उत्तर
JavaDocs के अनुसार कॉलबैक इंटरफ़ेसपर a> TimeoutException
एक पुन: लाने योग्य अपवाद है जिसे निर्माता के retries
की संख्या बढ़ाकर नियंत्रित किया जा सकता है।
काफ्का दस्तावेज में आपको retries
कॉन्फ़िगरेशन पर विवरण मिलता है:
पुन: प्रयास (डिफ़ॉल्ट 0): शून्य से अधिक मान सेट करने से क्लाइंट किसी भी रिकॉर्ड को फिर से भेज देगा जिसका प्रेषण संभावित क्षणिक त्रुटि के साथ विफल हो जाता है। ध्यान दें कि यदि क्लाइंट त्रुटि प्राप्त करने पर रिकॉर्ड को नाराज करता है तो यह पुनर्प्रयास अलग नहीं है। max.in.flight.requests.per.connection को 1 पर सेट किए बिना पुन: प्रयास की अनुमति देने से रिकॉर्ड का क्रम संभावित रूप से बदल जाएगा क्योंकि यदि दो बैच एक ही विभाजन में भेजे जाते हैं, और पहला विफल हो जाता है और पुन: प्रयास किया जाता है, लेकिन दूसरा सफल होता है, तो रिकॉर्ड दूसरे बैच में पहले दिखाई दे सकता है।
संबंधित सवाल
नए सवाल
apache-kafka
Apache Kafka एक वितरित स्ट्रीमिंग प्लेटफ़ॉर्म है जिसे उच्च-थ्रूपुट डेटा धाराओं को संग्रहीत और संसाधित करने के लिए डिज़ाइन किया गया है।