मैंने KafkaEmbedded
(और KafkaTemplate
) का उपयोग करके एक इकाई परीक्षण किया, लेकिन संदेश क्रम यादृच्छिक है। क्या किसी को पता है कि क्या यह तार्किक है, और यदि यह संभव है तो गारंटी आदेश?
यहाँ मेरा कोड है:
public class KafkaTest {
private static String TOPIC = "test.topic";
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TOPIC);
@Test
public void testEmbeddedKafkaSendOrder() throws Exception {
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
KafkaTemplate<String, byte[]> kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig));
kafkaTemplate.send(TOPIC, "TEST1".getBytes()).get();
kafkaTemplate.send(TOPIC, "TEST2".getBytes()).get();
kafkaTemplate.send(TOPIC, "TEST3".getBytes()).get();
kafkaTemplate.send(TOPIC, "TEST4".getBytes()).get();
kafkaTemplate.send(TOPIC, "TEST5".getBytes()).get();
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-test-group");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfig.put("auto.offset.reset", "earliest");
final Consumer<String, byte[]> consumer = new KafkaConsumer<>(consumerConfig);
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, TOPIC);
ConsumerRecords<String, byte[]> records = consumer.poll(100L);
// Tests
final Iterator<ConsumerRecord<String, byte[]>> recordIterator = records.iterator();
while (recordIterator.hasNext()) {
System.out.println("received:" + new String(recordIterator.next().value()));
}
}
यह कोड उदाहरण के लिए प्रिंट करता है (लेकिन ऑर्डर बदल सकता है):
received:TEST2
received:TEST4
received:TEST1
received:TEST3
received:TEST5
1 उत्तर
काफ्का में, आप यह सुनिश्चित कर सकते हैं कि संदेशों का क्रम उसी विभाजन पर समान है, लेकिन विषय पर नहीं।
Note that as a topic typically has multiple partitions, there is
no guarantee of message time-ordering across the entire topic, just within a single
partition
Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale
किताब का उद्धरण। आप इसके बारे में क्या कर सकते हैं और संदेशों को क्रम में कैसे प्राप्त करें? विकल्प 1:
kafkaTemplate.send(TOPIC,"1", "TEST1".getBytes()).get();
kafkaTemplate.send(TOPIC,"1", "TEST2".getBytes()).get();
kafkaTemplate.send(TOPIC,"1", "TEST3".getBytes()).get();
kafkaTemplate.send(TOPIC,"1", "TEST4".getBytes()).get();
kafkaTemplate.send(TOPIC,"1", "TEST5".getBytes()).get();
इस तरह, प्रत्येक मान के लिए, आप एक ही कुंजी "1" भेजते हैं। काफ्का आपकी कुंजी के आधार पर विभाजन का चयन करेगा। चूंकि सभी कुंजियां समान हैं, सभी संदेश एक ही विभाजन में जाएंगे और आपको अपने रिकॉर्ड क्रम में प्राप्त होंगे।
विकल्प 2: काफ्का को इस तरह से इनिशियलाइज़ करें:
new KafkaEmbedded(1, true,1, TOPIC);
इस तरह आप काफ्का को बता रहे हैं कि इस विषय के लिए आप केवल एक विभाजन रखना चाहेंगे, इसलिए प्रत्येक रिकॉर्ड उस विभाजन में जाएगा।