मैंने KafkaEmbedded (और KafkaTemplate) का उपयोग करके एक इकाई परीक्षण किया, लेकिन संदेश क्रम यादृच्छिक है। क्या किसी को पता है कि क्या यह तार्किक है, और यदि यह संभव है तो गारंटी आदेश?

यहाँ मेरा कोड है:

public class KafkaTest {

  private static String TOPIC = "test.topic";

  @ClassRule
  public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TOPIC);

  @Test
  public void testEmbeddedKafkaSendOrder() throws Exception {
    Map<String, Object> producerConfig = new HashMap<>();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

    KafkaTemplate<String, byte[]> kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig));
    kafkaTemplate.send(TOPIC, "TEST1".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST2".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST3".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST4".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST5".getBytes()).get();

    Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-test-group");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    consumerConfig.put("auto.offset.reset", "earliest");

    final Consumer<String, byte[]> consumer = new KafkaConsumer<>(consumerConfig);
    embeddedKafka.consumeFromAnEmbeddedTopic(consumer, TOPIC);
    ConsumerRecords<String, byte[]> records = consumer.poll(100L);

    // Tests
    final Iterator<ConsumerRecord<String, byte[]>> recordIterator = records.iterator();
    while (recordIterator.hasNext()) {
      System.out.println("received:" + new String(recordIterator.next().value()));
    }
  }

यह कोड उदाहरण के लिए प्रिंट करता है (लेकिन ऑर्डर बदल सकता है):

received:TEST2
received:TEST4
received:TEST1
received:TEST3
received:TEST5
0
Juh_ 4 जुलाई 2019, 18:54

1 उत्तर

सबसे बढ़िया उत्तर

काफ्का में, आप यह सुनिश्चित कर सकते हैं कि संदेशों का क्रम उसी विभाजन पर समान है, लेकिन विषय पर नहीं।

Note that as a topic typically has multiple partitions, there is
no guarantee of message time-ordering across the entire topic, just within a single
partition

Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale किताब का उद्धरण। आप इसके बारे में क्या कर सकते हैं और संदेशों को क्रम में कैसे प्राप्त करें? विकल्प 1:

        kafkaTemplate.send(TOPIC,"1", "TEST1".getBytes()).get();
        kafkaTemplate.send(TOPIC,"1", "TEST2".getBytes()).get();
        kafkaTemplate.send(TOPIC,"1", "TEST3".getBytes()).get();
        kafkaTemplate.send(TOPIC,"1", "TEST4".getBytes()).get();
        kafkaTemplate.send(TOPIC,"1", "TEST5".getBytes()).get();

इस तरह, प्रत्येक मान के लिए, आप एक ही कुंजी "1" भेजते हैं। काफ्का आपकी कुंजी के आधार पर विभाजन का चयन करेगा। चूंकि सभी कुंजियां समान हैं, सभी संदेश एक ही विभाजन में जाएंगे और आपको अपने रिकॉर्ड क्रम में प्राप्त होंगे।

विकल्प 2: काफ्का को इस तरह से इनिशियलाइज़ करें:

new KafkaEmbedded(1, true,1, TOPIC);

इस तरह आप काफ्का को बता रहे हैं कि इस विषय के लिए आप केवल एक विभाजन रखना चाहेंगे, इसलिए प्रत्येक रिकॉर्ड उस विभाजन में जाएगा।

2
Spasoje Petronijević 4 जुलाई 2019, 21:05