StreamsBuilder builder = new StreamsBuilder();

    Map<String, ?> serdeConfig = Collections.singletonMap(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

    Serde keySerde= getSerde(keyClass);
    keySerde.configure(serdeConfig,true);

    Serde valueSerde = getSerde(valueClass);
    valueSerde.configure(serdeConfig,false);

    StoreBuilder<KeyValueStore<K,V>> store =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("mystore"),
            keySerde,valueSerde).withCachingEnabled();

    builder.addGlobalStore(store,"mytopic", Consumed.with(keySerde,valueSerde),this::processMessage);

    streams=new KafkaStreams(builder.build(),properties);

    registerShutdownHook();

    streams.start();

    readOnlyKeyValueStore = waitUntilStoreIsQueryable("mystore", QueryableStoreTypes.<Object, V>keyValueStore(), streams);


private <T> T waitUntilStoreIsQueryable(final String storeName,
      final QueryableStoreType<T> queryableStoreType,
      final KafkaStreams streams) {

    // 25 seconds
    long timeout=250;

    while (timeout>0) {
      try {
        timeout--;
        return streams.store(storeName, queryableStoreType);
      } catch (InvalidStateStoreException ignored) {
        // store not yet ready for querying
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          logger.error(e);
        }
      }
    }
    throw new StreamsException("ReadOnlyKeyValueStore is not queryable within 25 seconds");
  }

त्रुटि इस प्रकार है:

19:42:35.049 [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] ERROR org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer - global-stream-thread [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] Updating global state failed. You can restart KafkaStreams to recover from this error.
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {my_component-0=6}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:990) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:491) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239) [kafka-streams-2.3.0.jar:?]
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290) [kafka-streams-2.3.0.jar:?]
19:42:35.169 [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d] Global thread has died. The instance will be in error state and should be closed.
19:42:35.169 [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] ERROR org.apache.zookeeper.server.NIOServerCnxnFactory - Thread Thread[my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread,5,main] died
org.apache.kafka.streams.errors.StreamsException: Updating global state failed. You can restart KafkaStreams to recover from this error.
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:250) ~[kafka-streams-2.3.0.jar:?]
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290) ~[kafka-streams-2.3.0.jar:?]
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {my_component-0=6}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:990) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:491) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239) ~[kafka-streams-2.3.0.jar:?]
    ... 1 more

org.apache.kafka.streams.errors.InvalidStateStoreException: State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.

    at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:60)

मुझे दो अलग-अलग अपवाद दिखाई देते हैं।

  1. InvalidStateStoreException - स्टोर खुला नहीं है

  2. InvalidStateStoreException - Store अब उपलब्ध नहीं है और हो सकता है कि किसी अन्य उदाहरण में माइग्रेट हो गया हो

मेरे पास एप्लिकेशन आईडी के साथ विंडोज़ पर चल रहे स्ट्रीम एप्लिकेशन का केवल एक उदाहरण है।

उपरोक्त कोर से, मैं तब तक प्रतीक्षा कर रहा हूं जब तक स्टोर क्वेरी करने योग्य नहीं है, लेकिन फिर भी मुझे स्टोर नहीं खुला है और स्टोर उपलब्ध नहीं हो सकता है।

अपवाद (और उसके समाधान) के संभावित कारण क्या हैं?

सबसे पहले, क्या उपरोक्त कोड राइट-अप सही है?

2
JavaTechnical 25 जुलाई 2019, 17:22

1 उत्तर

सबसे बढ़िया उत्तर

OffsetOutOfRangeException का अर्थ है कि राज्य में .checkpoint फ़ाइल में संग्रहीत ऑफसेट काफ्का क्लस्टर में विषय के उन ऑफसेट के साथ सीमा से बाहर हैं।

ऐसा तब होता है जब विषय को साफ़ कर दिया जाता है और या फिर से बनाया जाता है। इसमें चेकपॉइंट में दिए गए ऑफ़सेट के जितने संदेश नहीं हो सकते हैं।

मैंने पाया है कि, .checkpoint फ़ाइल को रीसेट करने से मदद मिलेगी। .checkpoint फाइल कुछ इस तरह होगी।

0
1
my_component 0  6
my_component 1  0

यहां, 0 विभाजन है और 6 ऑफसेट है। इसी तरह, 1 विभाजन है और 0 ऑफसेट है।

अपवाद में विवरण my_component-0-6 का अर्थ है कि my_component विषय के 0वें विभाजन का छठा ऑफसेट सीमा से बाहर है।

चूंकि, विषय को फिर से बनाया गया है, छठा ऑफसेट मौजूद नहीं है। इसलिए 6 से 0 बदलें।


यह ध्यान रखना महत्वपूर्ण है कि, काफ्का का परीक्षण करते समय, आपको परीक्षण पूरा होने के बाद राज्य निर्देशिका को साफ करना होगा, क्योंकि परीक्षण पूरा होने के बाद आपका एम्बेडेड काफ्का क्लस्टर और उसके विषय मौजूद नहीं हैं और इसलिए इसे बनाए रखने का कोई मतलब नहीं है आपके राज्य के स्टोर में ऑफ़सेट (क्योंकि वे बासी हो जाएंगे)।

इसलिए, सुनिश्चित करें कि परीक्षण के बाद आपकी राज्य निर्देशिका (आमतौर पर, /tmp/kafka-streams या विंडोज़ C:\tmp\kafka-streams) को साफ कर दिया गया है।

साथ ही, चेकपॉइंट फ़ाइल को रीसेट करना केवल एक समाधान है, और उत्पादन में एक आदर्श समाधान नहीं है।


उत्पादन में, यदि राज्य स्टोर अपने संबंधित विषय के साथ असंगत है (अर्थात ऑफ़सेट सीमा से बाहर हैं), तो इसका मतलब है कि कुछ भ्रष्टाचार है, संभव है कि किसी ने विषय को हटा दिया और फिर से बनाया हो।

ऐसी स्थिति में, मुझे लगता है, सफाई ही एकमात्र संभावित समाधान हो सकता है। क्योंकि, आपके राज्य के स्टोर में पुरानी जानकारी है जो अब मान्य नहीं है (जहां तक ​​​​नए विषय का संबंध है)।

2
JavaTechnical 1 अगस्त 2019, 07:56