मैं रैबिटएमक्यू के साथ मासट्रांसिट का उपयोग कर रहा हूं। कस्टम मिडलवेयर के लिए आधिकारिक दस्तावेज़ीकरण पृष्ठ पर उदाहरण के बाद, मैं कोशिश कर रहा हूं संदेश खपत पाइपलाइन पर एक फ़िल्टर बनाएं जो एक निश्चित स्थिति के आधार पर कुछ संदेशों को फ़िल्टर-आउट कर देगा। मेरा फ़िल्टर इस तरह दिखता है:

public class MyCustomFilter<T> : IFilter<T>
    where T : class, ConsumeContext
{
    public void Probe(ProbeContext context) { }

    public async Task Send(T context, IPipe<T> next)
    {
        if (/* certain condition */)
        {
            await next.Send(context);
        }
    }
}

समस्या यह है कि जब संदेश पाइपलाइन से नीचे नहीं जाता है (यानी await next.Send(context) को नहीं कहा जाता है), तो संदेश _स्किप किए गए उपभोक्ता RabbitMQ कतार में समाप्त हो जाता है। संदेश को उस कतार में जाने से रोकने का कोई तरीका है?

0
ripe_bananas 16 मार्च 2020, 17:23

1 उत्तर

सबसे बढ़िया उत्तर

skipped (मृत-अक्षर) कतार को DeadLetterFilter आह्वान द्वारा संदेश मिलता है। यहाँ कोड है:

async Task IFilter<ReceiveContext>.Send(ReceiveContext context, IPipe<ReceiveContext> next)
{
    await next.Send(context).ConfigureAwait(false);

    if (context.IsDelivered || context.IsFaulted)
        return;

    context.LogSkipped();

    await _deadLetterPipe.Send(context).ConfigureAwait(false);
}

तो, आप कल्पना कर सकते हैं कि यदि संदर्भ में IsDelivered या IsFaulted को true पर सेट किया गया है, तो आपके संदेश मृत-अक्षर कतार में समाप्त नहीं होंगे।

यदि आप फ़िल्टर डालते हैं, तो आपके संदेश ज़हर (error) कतार में समाप्त हो जाते हैं, इसलिए मुझे लगता है कि यह कोई विकल्प नहीं है।

आप अपने फ़िल्टर में फ़िल्टर किए गए संदेशों के लिए कुछ इस तरह से वितरित किए जा रहे अपने संदेशों का अनुकरण कर सकते हैं:

public Task Send(T context, IPipe<T> next)
    => condition
        ? next.Send(context)
        : context.NotifyConsumed(context as ConsumeContext<MyMessage>, TimeSpan.Zero, "Filtered");
2
Alexey Zimarev 16 मार्च 2020, 16:15