मैं फ़ाइलों को संसाधित करने के लिए वसंत एकीकरण का उपयोग कर रहा हूं। मैं पंक्ति से फ़ाइल पंक्ति पढ़ना चाहता हूं, फ़िल्टर करना, कनवर्ट करना और काफ्का को भेजना चाहता हूं। फ़ाइल को एसिंक्रोनस गेटवे के माध्यम से प्रसंस्करण के लिए पास किया जाता है। प्रसंस्करण के अंत में मैंने 2 उद्देश्यों के लिए एग्रीगेटर जोड़ा है:

  • मैं पूरी फाइल संसाधित होने तक प्रतीक्षा करना चाहता हूं। यही है, ठीक से संसाधित सभी पंक्तियों को काफ्का को भेज दिया गया है और त्रुटियों का कारण बनने वाली सभी पंक्तियों को प्रवाह द्वारा नियंत्रित किया गया है जो त्रुटि चैनल से पढ़ता है।
  • गेटवे के लिए लौटाए गए मान के रूप में मैं फ़ाइल प्रोसेसिंग के लिए समेकित परिणाम प्राप्त करना चाहता हूं। फ़ाइल से कितनी पंक्तियाँ पढ़ी गईं, कितनी त्रुटियों के बिना संसाधित हुईं आदि।

मैंने अब तक जो किया है, मैंने एग्रीगेटर बनाया है जो फ़ाइल मार्कर START और END + सभी संसाधित पंक्तियों की प्रतीक्षा करता है। मैंने कस्टम संदेश समूह बनाया है जो संसाधित पंक्तियों की गणना करता है और replyChannel को भी संग्रहीत करता है जो फ़ाइल START मार्कर संदेश और लाइनों की गणना से लिया जाता है जो फ़ाइल END मार्कर संदेश से लिया जाता है। समूह के लिए 5s टाइमआउट भी निर्धारित है। जब समूह पूरा करता है तो यह उन आंकड़ों के साथ संदेश उत्सर्जित करता है जो मुझे चाहिए और replyChannel START मार्कर से लिए गए मान के साथ हेडर। मेरी समझ यह है कि गेटवे इस replyChannel पर प्रतिक्रिया की प्रतीक्षा कर रहा है। संदेश समूह के लिए कोड यहां दिया गया है:

public void add(Message<?> messageToAdd) {
        if(messageToAdd.getPayload() instanceof FileMarker) {
            FileMarker marker = (FileMarker) messageToAdd.getPayload();
            switch (marker.getMark()) {
                case START:
                    replyChannel = messageToAdd.getHeaders().get(MessageHeaders.REPLY_CHANNEL);
                    break;
                case END:
                    expectedRowCount.set(marker.getLineCount());
                    break;
                default:
                    throw new IllegalStateException("Unexpected file mark");
            }
        } else {
            ProcessingResult processingResult = messageToAdd.getHeaders()
                    .get(ProcessingConstants.PROCESSING_RESULT_HEADER, ProcessingResult.class);
            assert processingResult != null;
            rowCount.incrementAndGet();
            switch (processingResult) {
                case FILTERED:
                    filteredCount.incrementAndGet();
                    break;
                case PROCESSED:
                    processedCount.incrementAndGet();
                    break;
                case PROCESSING_ERROR:
                    processingErrorCount.incrementAndGet();
                    break;
                case KAFKA_ERROR:
                    kafkaErrorCount.incrementAndGet();
                    break;
                default:
                    throw new IllegalStateException("Unrecognized processing result: " + processingResult);
            }
        }
    }

मेरे पास समस्या यह है कि मेरे गेटवे को कभी भी प्रतिक्रिया नहीं मिलती है और अनंत प्रतीक्षा करता है। मैं एसिंक्रोनस गेटवे को एग्रीगेटर द्वारा अपना संदेश भेजने और गेटवे प्रोसेसिंग के परिणामस्वरूप इस संदेश का पेलोड प्राप्त करने के लिए कैसे प्रतीक्षा कर सकता हूं?

समस्या को फिर से बनाने का परीक्षण यहां पाया जा सकता है https://github.com/hawk1234/spring-integration- उदाहरण प्रतिबद्ध 9f121f0729d8076872e6fbdcd7b1b91ca9ea8cb4। जब आप परीक्षण चलाते हैं तो एप्लिकेशन लॉग build/logs/spring-integration-example.log पथ के अंतर्गत उपलब्ध होते हैं। वर्तमान में टेस्ट हैंग हो जाता है क्योंकि गेटवे को कभी प्रतिक्रिया नहीं मिलती है। साथ ही पूरा प्रवाह कार्य प्रगति पर है इसलिए समूह समय समाप्त होने के बाद ही रिलीज करता है।

0
MAREK 20 मार्च 2020, 12:52

2 जवाब

सबसे बढ़िया उत्तर

मैं समस्या को ठीक करने में कामयाब रहा हूं। ऐसा करने के लिए मुझे MessageHeaders.REPLY_CHANNEL को मैन्युअल रूप से एकत्रित संदेश भेजना पड़ा, मैंने विधि BaseIntegrationFlowDefinition#logAndReply का उपयोग किया। फिक्स, साथ ही समाप्त उदाहरण, कमिट eefd5f472891ded39f363ff71ec530ada800f704 के साथ उपलब्ध है।

0
MAREK 23 मार्च 2020, 17:44

मैंने कस्टम संदेश समूह बनाया है कि

यह कुछ गलत है।

आपको सभी आवश्यक तर्क करने होंगे जब आपके पास पहले से ही सभी संदेश (FileMarker सहित) समूह, मानक समूह में हों। एक MessageGroupProcessor है, जिसे 3 तब कहा जाता है जब ReleaseStrategy, true लौटाता है। तो, यहां आपके पास समूह के लिए सभी संदेश हैं और आप वांछित उत्तर देने के लिए सभी आवश्यक आंकड़े एकत्र कर सकते हैं।

आपका आवेदन जल्दी पचने के लिए बहुत जटिल है। मैं देख रहा हूँ कि आप किसी एग्रीगेटर को त्रुटियाँ नहीं भेजते हैं। और उसे शीर्ष प्रवाह पर कहीं होना चाहिए, न कि केवल FileMarker उप-प्रवाह में। मेरा मतलब है कि अलग-अलग शीर्ष-स्तरीय प्रवाह में एक एग्रीगेटर होना बेहतर होगा और वास्तव में FileMarker सीधे उसी को भेजें, लेकिन बाकी sendSuccessChannel() से और त्रुटियों को संसाधित करने के बाद भी।

0
Artem Bilan 20 मार्च 2020, 17:35