संदर्भ:

"स्टेटफुल रिट्री" में डॉक्स (https://docs.spring .io/spring-kafka/reference/html/#stateful-retry) और "सीक टू करंट" (https://docs.spring.io/spring-kafka/reference/html/#seek-to-current) इसे इस तरह ध्वनि दें उपयोगकर्ता, मुझे SeekToCurrentErrorHandler में बैकऑफ फ़ंक्शन का उपयोग करने के लिए RetryTemplate से दूर माइग्रेट करना चाहिए।

मेरे पास वर्तमान में कुछ अपवादों के लिए अनंत लूप के साथ RetryTemplate का मिश्रण है + SeekToCurrentErrorHandler 3 बार की एक निश्चित पुनर्प्रयास के साथ जो अन्य सभी अपवादों के लिए काम करता है।

अब मैं इस प्रयास को handler.setBackOffFunction((record, ex) -> { ... }); से बदलना चाहता हूं, लेकिन मुझे निम्नलिखित समस्या का सामना करना पड़ रहा है

लेकिन मुझे यकीन नहीं है कि यह इरादा है, मैं गलत कॉन्फ़िगर कर रहा हूं या यदि यह एक बग है।

  • स्प्रिंग बूट 2.4.0
  • स्प्रिंग-काफ्का 2.6.3

प्रश्न:

जब मैं बड़े अंतराल के साथ SeekToCurrentErrorHandler का उपयोग कर रहा हूं, तो अंतराल के बाद "अरे आपके श्रोता ने अपवाद फेंक दिया" के लिए त्रुटि-संदेश लॉग करने के लिए प्रकट होता है। क्या यह जानबूझकर है? मेरा कोड एक अपवाद फेंक रहा है और एक लॉग-एंट्री बहुत बाद में दिखाई दे सकती है।

यहाँ हम पंक्ति १ को 22:59:14 पर क्रियान्वित करते हैं। कुछ ही समय बाद एक अपवाद फेंका जाता है, लेकिन लॉग में 10 के बाद 22:59:24 पर दिखाई देता है। ExponentialBackOff का उपयोग करते समय, वह समय-सीमा बड़ी और बड़ी हो जाती है।

foo1 at 2020-11-20T22:59:14.850721600
2020-11-20 22:59:24.976  INFO 21456 --- [  kgh1235-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-kgh1235-1, groupId=kgh1235] Seeking to offset 21 for partition kgh1235-0
2020-11-20 22:59:24.976 ERROR 21456 --- [  kgh1235-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void de.roppelt.Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: Thrown at 2020-11-20T22:59:14.850721600; nested exception is java.lang.RuntimeException: Thrown at 2020-11-20T22:59:14.850721600
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:157) ~[spring-kafka-2.6.3.jar:2.6.3]

निम्नलिखित के साथ पुन: उत्पन्न करने योग्य:

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @KafkaListener(id = "so64937593", topics = "so64937593")
    public void listen(String in) {
        LocalDateTime now = LocalDateTime.now();
        System.out.println(in + " at " + now);
        throw new RuntimeException("Thrown at " + now);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler((rec,ex) -> System.out.println("I am the dlq"), new ExponentialBackOff());
        factory.setErrorHandler(eh);
        return factory;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so64937593", 1, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("so64937593", "foo" + i));
        };
    }

}

मुझे इसके ठीक बाद लॉगिंग देने के लिए निम्नलिखित मिला है:

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setListeners(new RetryListener[]{new RetryListener() {
    @Override
    public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
        return true;
    }
    @Override
    public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
    }
    @Override
    public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
        System.out.println(LocalDateTime.now() + "Oops happened at " + context);
    }
}});
retryTemplate.setRetryPolicy(new NeverRetryPolicy());
factory.setRetryTemplate(retryTemplate);

तो मैं एक "नेवरट्री" नीति के साथ एक पुनः प्रयास टेम्पलेट जोड़ रहा हूं, इसलिए मैं लॉगर का उपयोग कर सकता हूं। , जहां अब मेरे पास तत्काल प्रतिक्रिया है कि एक त्रुटि हुई है।

foo1 at 2020-11-20T23:05:57.769425100
2020-11-20T23:05:57.769425100Oops happened at [RetryContext: count=1, lastException=org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void de.roppelt.Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: Thrown at 2020-11-20T23:05:57.769425100, exhausted=false]
2020-11-20 23:06:07.897  INFO 22608 --- [  kgh1235-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-kgh1235-1, groupId=kgh1235] Seeking to offset 21 for partition kgh1235-0
2020-11-20 23:06:07.897 ERROR 22608 --- [  kgh1235-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

मुझे लगता है कि यह एक बग है, लेकिन मैं जीथब मुद्दा जमा करने से पहले यहां रुकना और पोस्ट करना चाहता था।

0
DRoppelt 21 नवम्बर 2020, 01:08

2 जवाब

सबसे बढ़िया उत्तर

लॉग कंटेनर द्वारा लिखा जाता है के बाद त्रुटि हैंडलर बाहर निकलता है (हमारे पास इसके बारे में कोई विकल्प नहीं है)।

हालांकि, आप SeekToCurrentErrorHandler पर लॉग स्तर को बदलकर उन लॉग को दबा सकते हैं। यह अपवाद पर स्तर सेट करता है और कंटेनर इसे उस स्तर पर लॉग करेगा।

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    Assert.notNull(logLevel, "'logLevel' cannot be null");
    this.logLevel = logLevel;
}

यह 2.5 से उपलब्ध है।

https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers

आम तौर पर, फ्रेमवर्क द्वारा प्रदान किए गए त्रुटि हैंडलर एक अपवाद फेंक देंगे जब त्रुटि "हैंडल" नहीं होती है (उदाहरण के लिए एक खोज ऑपरेशन करने के बाद)। डिफ़ॉल्ट रूप से, ऐसे अपवादों को कंटेनर द्वारा ERROR स्तर पर लॉग किया जाता है। संस्करण 2.5 से शुरू होकर, सभी फ्रेमवर्क त्रुटि हैंडलर KafkaExceptionLogLevelAware का विस्तार करते हैं जो आपको उस स्तर को नियंत्रित करने की अनुमति देता है जिस पर इन अपवादों को लॉग किया जाता है।

यदि आप बैक ऑफ से पहले मूल कारण (मूल अपवाद) को लॉग करना चाहते हैं, तो आप त्रुटि हैंडलर को उपवर्गित कर सकते हैं और super.handle(...) को कॉल करने से पहले लॉग इन कर सकते हैं।

यदि आप फ्रेमवर्क के लिए त्रुटि लॉग करने का विकल्प चाहते हैं, तो बैक ऑफ करने से पहले, गिटहब समस्या को खोलने के लिए स्वतंत्र महसूस करें।

0
Gary Russell 21 नवम्बर 2020, 02:17

गैरी के उत्तर के आधार पर, मैं निम्नलिखित के साथ आया।

यह कुछ संदर्भ (प्रयास, विषय, ऑफसेट) के साथ तुरंत अपवाद को लॉग करता है। जो कुछ हद तक वैसा ही है जैसा कि RetryTemplate में LoggingListener ऑफ़र करता है।

public class LoggingSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

    public LoggingSeekToCurrentErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> handler, BackOff backOff) {
        super(handler, backOff);
    }

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        ConsumerRecord<?, ?> record = records.get(0);
        TopicPartitionOffset recordPosition = new TopicPartitionOffset(record.topic(), record.partition(), record.offset());
        String delivery = container.getContainerProperties().isDeliveryAttemptHeader() ? ""+deliveryAttempt(recordPosition) : "NA";
        log.error("Listener threw an exception, attempt {}. On partition {} at offset {}. Record timestamp {}", delivery, recordPosition.getTopicPartition(), recordPosition.getOffset(), record.timestamp(), thrownException);

        super.handle(thrownException, records, consumer, container);
    }
}

कारखाने में जोड़ा गया:

factory.getContainerProperties().setDeliveryAttemptHeader(true); // optional, see https://docs.spring.io/spring-kafka/reference/html/#delivery-header
errorHandler.setLogLevel(KafkaException.Level.DEBUG); // since our handler already logs. Prevent double logging of stacktrace

परिणामी नमूना:

@Slf4j
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @KafkaListener(id = "so64937593", topics = "so64937593")
    public void listen(String in) {
        log.info("Processing {}", in);
        throw new RuntimeException("Thrown at " + LocalDateTime.now());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setDeliveryAttemptHeader(true); // optional, see https://docs.spring.io/spring-kafka/reference/html/#delivery-header
        SeekToCurrentErrorHandler eh = new LoggingSeekToCurrentErrorHandler((rec,ex) -> log.error("I am the dlq"), new FixedBackOff(10_000, Long.MAX_VALUE));
        eh.setLogLevel(KafkaException.Level.DEBUG); // since our handler already logs. Prevent double logging of stacktrace
        factory.setErrorHandler(eh);
        return factory;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so64937593", 1, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("so64937593", "foo" + i));
        };
    }


    public static class LoggingSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

        public LoggingSeekToCurrentErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> handler, BackOff backOff) {
            super(handler, backOff);
        }

        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
            ConsumerRecord<?, ?> record = records.get(0);
            TopicPartitionOffset recordPosition = new TopicPartitionOffset(record.topic(), record.partition(), record.offset());
            String delivery = container.getContainerProperties().isDeliveryAttemptHeader() ? ""+deliveryAttempt(recordPosition) : "NA";
            log.error("Listener threw an exception, attempt {}. On partition {} at offset {}. Record timestamp {}", delivery, recordPosition.getTopicPartition(), recordPosition.getOffset(), record.timestamp(), thrownException);

            super.handle(thrownException, records, consumer, container);
        }
    }
}
0
DRoppelt 21 नवम्बर 2020, 10:38