दबाने () प्रलेखन पर पढ़ते समय, मैंने देखा कि समय खिड़की तब तक आगे नहीं बढ़ेगी जब तक कि विषय पर रिकॉर्ड प्रकाशित नहीं किए जा रहे हों, क्योंकि यह घटना के समय पर आधारित है। अभी, मेरा कोड प्रत्येक कुंजी के लिए अंतिम मान को आउटपुट कर रहा है, क्योंकि विषय पर ट्रैफ़िक स्थिर है, लेकिन जब उस सिस्टम को नीचे लाया जाता है, तो डाउनटाइम होते हैं, जिससे राज्य के स्टोर में मौजूदा रिकॉर्ड "जमे हुए" हो जाते हैं। मैं सोच रहा था कि कम करने () के बजाय () को कम करने के बीच क्या अंतर है। दबाएं ()। क्या कम () दबाने की तरह कार्य करता है () जिसमें वे दोनों घटना समय संचालित होते हैं? मेरी समझ यह है कि दोनों एक ही काम कर रहे हैं, एक निश्चित समय खिड़की के भीतर चाबियों को एकत्रित करना।

मेरी टोपोलॉजी निम्नलिखित है:

    final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
    final Serde<EligibilityKey> keySpecificAvroSerde = new SpecificAvroSerde<EligibilityKey>();
    keySpecificAvroSerde.configure(serdeConfig, true);
    final Serde<Eligibility> valueSpecificAvroSerde = new SpecificAvroSerde<Eligibility>();
    valueSpecificAvroSerde.configure(serdeConfig, false);

    // KStream<EligibilityKey, Eligibility>
    KStream<EligibilityKey, Eligibility> kStreamInput = builder.stream(input,
            Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde));

    // KStream<EligibilityKey, String>
    KStream<EligibilityKey, String> kStreamMapValues = kStreamInput
            .mapValues((key, value) -> Processor.process(key, value));

    // WindowBytesStoreSupplier
    WindowBytesStoreSupplier windowBytesStoreSupplier = Stores.inMemoryWindowStore("in-mem",
            Duration.ofSeconds(retentionPeriod), Duration.ofSeconds(windowSize), false);

    // Materialized
    Materialized<EligibilityKey, String, WindowStore<Bytes, byte[]>> materialized = Materialized
            .as(windowBytesStoreSupplier);
    materialized = Materialized.with(keySpecificAvroSerde, Serdes.String());

    // TimeWindows
    TimeWindows timeWindows = TimeWindows.of(Duration.ofSeconds(size)).advanceBy(Duration.ofSeconds(advance))
            .grace(Duration.ofSeconds(afterWindowEnd));

    // KTable<Windowed<EligibilityKey>, String>
    KTable<Windowed<EligibilityKey>, String> kTable = kStreamMapValues
            .groupByKey(Grouped.with(keySpecificAvroSerde, Serdes.String())).windowedBy(timeWindows)
            .reduce((a, b) -> b, materialized.withLoggingDisabled().withRetention(Duration.ofSeconds(retention)))
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withLoggingDisabled()));

    // KStream<Windowed<EligibilityKey>, String>
    KStream<Windowed<EligibilityKey>, String> kStreamOutput = kTable.toStream();
0
mikeayonguyen 18 नवम्बर 2020, 19:16

1 उत्तर

सबसे बढ़िया उत्तर

बिना दबाने के reduce() का उपयोग करके, एकत्रीकरण के परिणाम को लगातार अपडेट किया जाता है, यानी, KTable के अपडेट जो reduce() के परिणाम रखते हैं, उन्हें विंडो के सभी रिकॉर्ड संसाधित होने से पहले डाउनस्ट्रीम भी भेजा जाता है।

एक कमी मान लें जो अवधि 3 की विंडो में ग्रेस 0 और निम्नलिखित इनपुट रिकॉर्ड्स (कुंजी, मान, टाइमस्टैम्प) से reduce() तक मानों को सारांशित करता है:

  • W1 का इनपुट रिकॉर्ड (A, 1, 1) -> आउटपुट रिकॉर्ड ((W1,A), 1) डाउनस्ट्रीम भेजा जाता है
  • W1 का इनपुट रिकॉर्ड (A, 2, 2) -> आउटपुट रिकॉर्ड ((W1,A), 3) डाउनस्ट्रीम भेजा जाता है
  • W1 का इनपुट रिकॉर्ड (A, 3, 3) -> आउटपुट रिकॉर्ड ((W1,A), 6) डाउनस्ट्रीम भेजा जाता है
  • W2 का इनपुट रिकॉर्ड (A, 4, 4) -> आउटपुट रिकॉर्ड ((W2,A), 4) डाउनस्ट्रीम भेजा जाता है

reduce().suppress() के साथ, विंडो बंद होने तक परिणाम बफ़र किए जाते हैं। परिणाम होगा:

  • W1 का इनपुट रिकॉर्ड (A, 1, 1) -> कोई आउटपुट नहीं
  • W1 का इनपुट रिकॉर्ड (A, 2, 2) -> कोई आउटपुट नहीं
  • W1 का इनपुट रिकॉर्ड (A, 3, 3) -> कोई आउटपुट नहीं
  • W2 का इनपुट रिकॉर्ड (A, 4, 4) -> आउटपुट रिकॉर्ड ((W1,A), 6) डाउनस्ट्रीम भेजा जाता है

ध्यान दें कि बिना suppress() के मामले के लिए मैंने मान लिया था कि कैशे को cache.max.bytes.buffering = 0 के साथ बंद कर दिया गया है। cache.max.bytes.buffering > 0 (डिफ़ॉल्ट 10 एमबी है) के साथ, कैश एक केटेबल के आउटपुट रिकॉर्ड को बफर करेगा और एक बार कैश भर जाने के बाद, यह उस कुंजी के साथ रिकॉर्ड को आउटपुट करेगा जिसे कम से कम हाल ही में अपडेट किया गया था।

0
Bruno Cadonna 20 नवम्बर 2020, 13:31