मैं kafka_python==2.0.0 पुस्तकालय का उपयोग करता हूं,

नीचे दिए गए कोड के टुकड़े के साथ, अगर मुझे 1 घंटे के लिए संदेश नहीं मिलता है, तो काफ्का विषय में धक्का दिया गया अगला संदेश उपभोक्ता द्वारा संसाधित नहीं किया जाता है, हालांकि लूप बंद नहीं होता है।

मैं चाहूंगा कि मेरा श्रोता बिना कनेक्शन खोए 24/24 चलाए

 consumer = KafkaConsumer(
    os.environ.get('MY_TOPIC'),
    bootstrap_servers=broker,
    api_version=my_version,
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='GSSAPI',
    sasl_kerberos_service_name=service_name,
    group_id='MY_GRP_ID',
    max_poll_records=1
    
)

try:
    for msg in consumer:
        ##PROCESS function ... 
        consumer.commit()
       
finally:
    consumer.close()
0
L. Quastana 10 नवम्बर 2020, 14:04

1 उत्तर

सबसे बढ़िया उत्तर

मैं अंत में मतदान विधि का उपयोग करता हूं:

from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
while True:
    # Response format is {TopicPartiton('topic1', 1): [msg1, msg2]}
    msg_pack = consumer.poll(timeout_ms=500)

    for tp, messages in msg_pack.items():
        # message value and key are raw bytes -- decode if necessary!
        # e.g., for unicode: `message.value.decode('utf-8')`
        print ("%s:%d:%d: key=%s value=%s" % (tp.topic, tp.partition,
                                              message.offset, message.key,
                                              message.value))

इस वाक्य रचना के लिए मेरी बात का लाभ संदेशों को पुनः प्राप्त करने के तरीके पर बेहतर दृश्यता है। यह मेरे उदाहरण में नहीं है, लेकिन मैं सिग्टर्म सिग्नल की तलाश करके प्रोग्राम को रोकने का बेहतर प्रबंधन कर सकता हूं

0
L. Quastana 11 नवम्बर 2020, 12:16