StreamsBuilder builder = new StreamsBuilder();
Map<String, ?> serdeConfig = Collections.singletonMap(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
Serde keySerde= getSerde(keyClass);
keySerde.configure(serdeConfig,true);
Serde valueSerde = getSerde(valueClass);
valueSerde.configure(serdeConfig,false);
StoreBuilder<KeyValueStore<K,V>> store =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("mystore"),
keySerde,valueSerde).withCachingEnabled();
builder.addGlobalStore(store,"mytopic", Consumed.with(keySerde,valueSerde),this::processMessage);
streams=new KafkaStreams(builder.build(),properties);
registerShutdownHook();
streams.start();
readOnlyKeyValueStore = waitUntilStoreIsQueryable("mystore", QueryableStoreTypes.<Object, V>keyValueStore(), streams);
private <T> T waitUntilStoreIsQueryable(final String storeName,
final QueryableStoreType<T> queryableStoreType,
final KafkaStreams streams) {
// 25 seconds
long timeout=250;
while (timeout>0) {
try {
timeout--;
return streams.store(storeName, queryableStoreType);
} catch (InvalidStateStoreException ignored) {
// store not yet ready for querying
try {
Thread.sleep(100);
} catch (InterruptedException e) {
logger.error(e);
}
}
}
throw new StreamsException("ReadOnlyKeyValueStore is not queryable within 25 seconds");
}
त्रुटि इस प्रकार है:
19:42:35.049 [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] ERROR org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer - global-stream-thread [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] Updating global state failed. You can restart KafkaStreams to recover from this error.
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {my_component-0=6}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:990) ~[kafka-clients-2.2.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:491) ~[kafka-clients-2.2.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269) ~[kafka-clients-2.2.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200) ~[kafka-clients-2.2.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) ~[kafka-clients-2.2.1.jar:?]
at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239) [kafka-streams-2.3.0.jar:?]
at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290) [kafka-streams-2.3.0.jar:?]
19:42:35.169 [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d] Global thread has died. The instance will be in error state and should be closed.
19:42:35.169 [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] ERROR org.apache.zookeeper.server.NIOServerCnxnFactory - Thread Thread[my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread,5,main] died
org.apache.kafka.streams.errors.StreamsException: Updating global state failed. You can restart KafkaStreams to recover from this error.
at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:250) ~[kafka-streams-2.3.0.jar:?]
at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290) ~[kafka-streams-2.3.0.jar:?]
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {my_component-0=6}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:990) ~[kafka-clients-2.2.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:491) ~[kafka-clients-2.2.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269) ~[kafka-clients-2.2.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200) ~[kafka-clients-2.2.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) ~[kafka-clients-2.2.1.jar:?]
at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239) ~[kafka-streams-2.3.0.jar:?]
... 1 more
org.apache.kafka.streams.errors.InvalidStateStoreException: State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.
at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:60)
मुझे दो अलग-अलग अपवाद दिखाई देते हैं।
InvalidStateStoreException - स्टोर खुला नहीं है
InvalidStateStoreException - Store अब उपलब्ध नहीं है और हो सकता है कि किसी अन्य उदाहरण में माइग्रेट हो गया हो
मेरे पास एप्लिकेशन आईडी के साथ विंडोज़ पर चल रहे स्ट्रीम एप्लिकेशन का केवल एक उदाहरण है।
उपरोक्त कोर से, मैं तब तक प्रतीक्षा कर रहा हूं जब तक स्टोर क्वेरी करने योग्य नहीं है, लेकिन फिर भी मुझे स्टोर नहीं खुला है और स्टोर उपलब्ध नहीं हो सकता है।
अपवाद (और उसके समाधान) के संभावित कारण क्या हैं?
सबसे पहले, क्या उपरोक्त कोड राइट-अप सही है?
1 उत्तर
OffsetOutOfRangeException
का अर्थ है कि राज्य में .checkpoint
फ़ाइल में संग्रहीत ऑफसेट काफ्का क्लस्टर में विषय के उन ऑफसेट के साथ सीमा से बाहर हैं।
ऐसा तब होता है जब विषय को साफ़ कर दिया जाता है और या फिर से बनाया जाता है। इसमें चेकपॉइंट में दिए गए ऑफ़सेट के जितने संदेश नहीं हो सकते हैं।
मैंने पाया है कि, .checkpoint
फ़ाइल को रीसेट करने से मदद मिलेगी। .checkpoint
फाइल कुछ इस तरह होगी।
0
1
my_component 0 6
my_component 1 0
यहां, 0 विभाजन है और 6 ऑफसेट है। इसी तरह, 1 विभाजन है और 0 ऑफसेट है।
अपवाद में विवरण my_component-0-6
का अर्थ है कि my_component
विषय के 0वें विभाजन का छठा ऑफसेट सीमा से बाहर है।
चूंकि, विषय को फिर से बनाया गया है, छठा ऑफसेट मौजूद नहीं है। इसलिए 6 से 0 बदलें।
यह ध्यान रखना महत्वपूर्ण है कि, काफ्का का परीक्षण करते समय, आपको परीक्षण पूरा होने के बाद राज्य निर्देशिका को साफ करना होगा, क्योंकि परीक्षण पूरा होने के बाद आपका एम्बेडेड काफ्का क्लस्टर और उसके विषय मौजूद नहीं हैं और इसलिए इसे बनाए रखने का कोई मतलब नहीं है आपके राज्य के स्टोर में ऑफ़सेट (क्योंकि वे बासी हो जाएंगे)।
इसलिए, सुनिश्चित करें कि परीक्षण के बाद आपकी राज्य निर्देशिका (आमतौर पर, /tmp/kafka-streams
या विंडोज़ C:\tmp\kafka-streams
) को साफ कर दिया गया है।
साथ ही, चेकपॉइंट फ़ाइल को रीसेट करना केवल एक समाधान है, और उत्पादन में एक आदर्श समाधान नहीं है।
उत्पादन में, यदि राज्य स्टोर अपने संबंधित विषय के साथ असंगत है (अर्थात ऑफ़सेट सीमा से बाहर हैं), तो इसका मतलब है कि कुछ भ्रष्टाचार है, संभव है कि किसी ने विषय को हटा दिया और फिर से बनाया हो।
ऐसी स्थिति में, मुझे लगता है, सफाई ही एकमात्र संभावित समाधान हो सकता है। क्योंकि, आपके राज्य के स्टोर में पुरानी जानकारी है जो अब मान्य नहीं है (जहां तक नए विषय का संबंध है)।
संबंधित सवाल
जुड़े हुए प्रश्न
नए सवाल
apache-kafka
Apache Kafka एक वितरित स्ट्रीमिंग प्लेटफ़ॉर्म है जिसे उच्च-थ्रूपुट डेटा धाराओं को संग्रहीत और संसाधित करने के लिए डिज़ाइन किया गया है।