मैं बस से संदेश शेड्यूल करने के लिए MassTransit + RabbitMq बस का उपयोग करना चाहता हूं। मैं दो सी # कंसोल एप्लिकेशन लिखता हूं, एक संदेश निर्माता के लिए और शेड्यूलर को संदेश भेजता है, और दूसरा संदेश उपभोक्ता के लिए।

बस में शेड्यूलिंग के लिए निम्न कोड, ताकि शेड्यूलर को एक प्रति सेकंड संदेश भेजें और फिर शेड्यूलर उपभोक्ता को 10 सेकंड की देरी से भेजे। मेरी समस्या यह है कि खरगोश एमक्यू क्लाइंट में उपभोक्ता या उपभोक्ता कतार में कोई संदेश नहीं भेजा जाता है। मेरी गलती कहाँ है?

नोट: UseInMemoryScheduler ठीक काम करता है लेकिन UseMessageScheduler काम नहीं करता है।

बस संदेश निर्माता

class Program
{
    public static void Main(string[] args)
    {
        MainAsync(args).GetAwaiter().GetResult();

        Console.ReadKey();
    }

    static async Task MainAsync(string[] args)
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
        {
            var host = rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
            {
                settings.Username("guest");
                settings.Password("guest");
            });

            //rabbit.UseInMemoryScheduler(); // This works
            rabbit.UseMessageScheduler(new Uri("rabbitmq://localhost/quartz"));// This doesn't work,
        });

        busControl.Start();

        var sendEndpoint = await busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/quartz"));

        for (int i = 0; i < 1000000; i++)
        {
            await sendEndpoint.ScheduleSend(new Uri("rabbitmq://localhost/publisher"), 
                DateTime.Now.AddSeconds(10), 
                new MessageCreated()
                {
                    Text = $"message {i}"
                });

            Thread.Sleep(1000);
        }

        Console.ReadKey();

        busControl.Stop();
    }
}

संदेश उपभोक्ता।

class Program
{
    static void Main(string[] args)
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
        {
            var host = rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
            {
                settings.Password("guest");
                settings.Username("guest");
            });

            rabbit.ReceiveEndpoint(host, "publisher", conf =>
            {
                conf.Consumer<Consumer>();
            });
        });

        busControl.Start();
        Console.ReadKey();
        busControl.Stop();
    }
}

public class Consumer : IConsumer<MessageCreated>
{
    public Task Consume(ConsumeContext<MessageCreated> context)
    {
        MessageCreated message = context.Message;
        Console.WriteLine(message.Text);

        context.Publish(new MessagePublished
        {
            Text = message.Text,
        });

        return Task.FromResult(context.Message);
    }
}

अपडेट किया गया @maldworth उत्तर के आधार पर मैंने निम्नलिखित के लिए अपना कोड बदल दिया। लेकिन समस्या का समाधान नहीं हुआ।

class Program
{
    public static void Main(string[] args)
    {
        MainAsync(args).GetAwaiter().GetResult();

        Console.ReadKey();
    }

    private static async Task<IScheduler> CreateSchedulerAsync()
    {
        var schedulerFactory = new StdSchedulerFactory();

        var scheduler = await schedulerFactory.GetScheduler();

        return scheduler;
    }

    static async Task MainAsync(string[] args)
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(async cfg =>
        {
            var host = cfg.Host(new Uri("rabbitmq://localhost:5672"), settings =>
            {
                settings.Password("guest");
                settings.Username("guest");
            });

            var scheduler = await CreateSchedulerAsync();

            cfg.ReceiveEndpoint("quartz", e =>
            {
                cfg.UseMessageScheduler(e.InputAddress);

                e.Consumer(() => new ScheduleMessageConsumer(scheduler));
                e.Consumer(() => new CancelScheduledMessageConsumer(scheduler));
            });

            cfg.ReceiveEndpoint(host, "publisher", conf =>
            {
                conf.Consumer<PublisherConsumer>();
            });

            cfg.ReceiveEndpoint(host, "subscriber", conf =>
            {
                conf.Consumer<SubscriberConsumer>();
            });
        });

        busControl.Start();

        for (int i = 0; i < 1000000; i++)
        {
            var text = $"message {i}";

            Console.WriteLine($"Schedule: {text}");

            await busControl.ScheduleSend(new Uri("rabbitmq://localhost/publisher"),
                DateTime.Now.AddSeconds(30),
                new ScheduleMessage()
                {
                    Text = text
                });

            Thread.Sleep(10000);
        }

        Console.ReadKey();

        busControl.Stop();
    }
}

public class PublisherConsumer : IConsumer<ScheduleMessage>
{
    public Task Consume(ConsumeContext<ScheduleMessage> context)
    {
        Console.WriteLine($"In Publisher: {context.Message.Text}");

        context.Publish(new PublishMessage
        {
            Text = context.Message.Text,
        });

        return Task.FromResult(context.Message);
    }
}

public class SubscriberConsumer : IConsumer<PublishMessage>
{
    public Task Consume(ConsumeContext<PublishMessage> context)
    {
        Console.WriteLine($"In Subscriber: {context.Message.Text}");

        return Task.FromResult(context.Message);
    }
}

और App.config फ़ाइल सामग्री है:

  <configSections>
    <section name="quartz" type="System.Configuration.NameValueSectionHandler, System, Version=1.0.5000.0,Culture=neutral, PublicKeyToken=b77a5c561934e089" />
  </configSections>
    <startup> 
        <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6.2" />
    </startup>
    <quartz>
      <add key="quartz.scheduler.instanceName" value="MassTransit-Quartz" />
      <add key="quartz.scheduler.instanceId" value="AUTO" />
      <add key="quartz.threadPool.type" value="Quartz.Simpl.SimpleThreadPool, Quartz" />
      <add key="quartz.threadPool.threadCount" value="4" />
      <add key="quartz.jobStore.misfireThreshold" value="60000" />
      <add key="quartz.serializer.type" value="binary" />
      <add key="quartz.jobStore.type" value="Quartz.Impl.AdoJobStore.JobStoreTX, Quartz" />
      <add key="quartz.jobStore.useProperties" value="false" />
      <add key="quartz.jobStore.driverDelegateType" value="Quartz.Impl.AdoJobStore.SqlServerDelegate, Quartz" />
      <add key="quartz.jobStore.clustered" value="true" />
      <add key="quartz.jobStore.tablePrefix" value="QRTZ_" />
      <add key="quartz.jobStore.dataSource" value="quartzDS" />
      <add key="quartz.dataSource.quartzDS.connectionString" value="Server=.;Database=QuartzDB;Integrated Security=SSPI" />
      <add key="quartz.dataSource.quartzDS.provider" value="SqlServer" />
    </quartz>
1
Mohammad Akbari 24 जुलाई 2018, 09:16

2 जवाब

आप लगभग वहां हैं, बस इसके साथ तर्क अपडेट करें:

busControl.Start();
scheduler.JobFactory = new MassTransitJobFactory(busControl);
scheduler.Start().Wait();

Console.ReadKey();
busControl.Stop();
1
One of many 10 सितंबर 2018, 06:42

तो आप उल्लेख नहीं करते हैं कि आपके पास तीसरा उपभोक्ता चल रहा है (तीसरे कंसोल ऐप में)। क्वार्ट्ज के साथ काम करने के लिए शेड्यूलिंग के लिए, आपको विशेष रूप से केवल क्वार्ट्ज के लिए तीसरे उपभोक्ता की आवश्यकता है। इसे चलाना होगा, और इस मामले में क्वार्ट्ज प्राप्त समापन बिंदु "क्वार्ट्ज" कतार को सुन रहा होगा।

[अपडेट किया गया]

तीसरे कंसोल ऐप (क्वार्ट्ज सेवा) के लिए आप जो कॉन्फ़िगरेशन चाहते हैं उसका एक नमूना यहां दिया गया है:

var scheduler = CreateScheduler();
configurator.ReceiveEndpoint("quartz", e =>
{
    configurator.UseMessageScheduler(e.InputAddress);

    e.Consumer(() => new ScheduleMessageConsumer(scheduler));
    e.Consumer(() => new CancelScheduledMessageConsumer(scheduler));
});

...

private static IScheduler CreateScheduler()
{
    ISchedulerFactory schedulerFactory = new StdSchedulerFactory();

    var scheduler = schedulerFactory.GetScheduler();

    return scheduler;
}

और आपको क्वार्ट्ज को हमारे लिए एक स्टोर (SQLite, MSSql, RAM यदि आप स्मृति में परीक्षण करना चाहते हैं) को कॉन्फ़िगर करने की भी आवश्यकता होगी। कृपया देखें

[/अपडेटेड]

[अपडेट 2]

किसी ने समूहों में ऐसा ही प्रश्न पोस्ट किया है। सौभाग्य से, वे एक नमूना जीथब प्रदान करते हैं जिसमें विभिन्न एमटी कार्यक्षमता का एक समूह होता है, जिनमें से एक है अलग अनुसूचक। कृपया एक बार देख लें, इसमें वह सब कुछ होना चाहिए जो आपको चाहिए।

[अपडेट 2]

यदि आप पूर्ण विकसित क्वार्ट्ज शेड्यूलर चलाए बिना परीक्षण करना चाहते हैं, तो आप इनमेमोरी शेड्यूलर। लेकिन यह केवल परीक्षण उद्देश्यों के लिए है, आपको इसे उत्पादन में उपयोग नहीं करना चाहिए।

साथ ही, आपके पहले कोड स्निपेट में, आपको शेड्यूलर के लिए प्रेषण समापन बिंदु प्राप्त करने की आवश्यकता नहीं है: var sendEndpoint = await busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/quartz"));

क्योंकि, बस कॉन्फ़िगरेशन में, rabbit.UseMessageScheduler(new Uri("rabbitmq://localhost/quartz")); इंगित करता है कि जब भी आप शेड्यूलसेंड (ConsumContext, या IBus/IBusControl से) को कॉल करते हैं, तो यह हमेशा उस /क्वार्ट्ज पते का उपयोग करेगा।

और अंत में, यह लाइन await sendEndpoint.ScheduleSend(new Uri("rabbitmq://localhost/publisher"),, फिर आप busControl.ScheduleSend(...) में बदल सकते हैं

0
maldworth 7 सितंबर 2018, 15:39
आपके उत्तर के लिए धन्यवाद। UseInMemoryScheduler द्वारा सब कुछ ठीक है। समस्या तब होती है जब मैं UseMessageScheduler का उपयोग करता हूं
 – 
Mohammad Akbari
25 जुलाई 2018, 13:28
मैंने आपके समाधान के आधार पर अपना प्रोजेक्ट बदल दिया लेकिन समस्या हल नहीं हुई। मैं प्रश्न के अद्यतन खंड में नया कोड जोड़ता हूं। कृपया इसे जांचें, और मेरी मदद करें।
 – 
Mohammad Akbari
30 जुलाई 2018, 08:08