मैं एक कार्यकर्ता इंजन को संगामिति की ऊपरी सीमा के साथ लागू कर रहा हूं। मैं एक सेमाफोर का उपयोग तब तक प्रतीक्षा करने के लिए कर रहा हूं जब तक कि संगामिति अधिकतम से कम न हो जाए, फिर Task.Factory.StartNew का उपयोग try/catch में async हैंडलर को लपेटने के लिए करें, एक finally के साथ जो रिलीज करता है सेमाफोर।

मुझे एहसास है कि यह थ्रेड पूल पर धागे बनाता है - लेकिन मेरा सवाल यह है कि, जब उन कार्य-चलने वाले धागे में से एक वास्तव में इंतजार कर रहा है (असली आईओ कॉल या प्रतीक्षा हैंडल पर), थ्रेड पूल में वापस आ गया है, जैसा कि मुझे उम्मीद है यह होने वाला?

यदि सीमित संगामिति के साथ कार्य शेड्यूलर को लागू करने का एक बेहतर तरीका है जहां कार्य हैंडलर एक एसिंक विधि है (रिटर्न Task), मुझे यह भी सुनना अच्छा लगेगा। या, आदर्श रूप से कहें, यदि एक एसिंक विधि को कतारबद्ध करने का कोई तरीका है (फिर से, यह एक Task है - एसिंक विधि लौटाना) जो इसे एक सिंक्रोनस प्रतिनिधि में लपेटने और इसे पास करने से कम डोडी महसूस करता है Task.Factory.StartNew, यह एकदम सही लगेगा..?

(इससे मुझे यह भी लगता है कि यहां दो प्रकार की समानताएं हैं: कुल मिलाकर कितने कार्यों को संसाधित किया जा रहा है, लेकिन यह भी कि अलग-अलग धागे पर एक साथ कितनी निरंतरता चल रही है। दोनों के लिए विन्यास योग्य विकल्प होने के लिए अच्छा हो सकता है, हालांकि निश्चित आवश्यकता नहीं है ..)

संपादित करें: स्निपेट:

                    concurrencySemaphore.Wait(cancelToken);
                    deferRelease = false;
                    try
                    {
                        var result = GetWorkItem();
                        if (result == null)
                        { // no work, wait for new work or exit signal
                            signal = WaitHandle.WaitAny(signals);
                            continue;
                        }

                        deferRelease = true;
                        tasks.Add(Task.Factory.StartNew(() =>
                        {
                            try
                            {
                                DoWorkHereAsync(result); // guess I'd think to .GetAwaiter().GetResult() here.. not run this yet
                            }
                            finally
                            {
                                concurrencySemaphore.Release();
                            }
                        }, cancelToken));
                    }
                    finally
                    {
                        if (!deferRelease)
                        {
                            concurrencySemaphore.Release();
                        }
                    }
3
Kieren Johnstone 3 नवम्बर 2017, 14:26

2 जवाब

सबसे बढ़िया उत्तर

यहां टास्कवर्कर का एक उदाहरण है, जो अनगिनत वर्कर थ्रेड्स का उत्पादन नहीं करेगा।

जादू SemaphoreSlim.WaitAsync() की प्रतीक्षा करके किया जाता है जो एक आईओ कार्य है (और कोई धागा नहीं है)।

class TaskWorker
{
    private readonly SemaphoreSlim _semaphore;

    public TaskWorker(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
        }

        _semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
    }

    public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        // No ConfigureAwait(false) here to keep the SyncContext if any
        // for the real task
        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            await taskFactory().ConfigureAwait(false);
        }
        finally
        {
            _semaphore.Release(1);
        }
    }

    public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            return await taskFactory().ConfigureAwait(false);
        }
        finally
        {
            _semaphore.Release(1);
        }
    }
}

और परीक्षण करने के लिए एक सरल कंसोल ऐप

class Program
{
    static void Main(string[] args)
    {
        var worker = new TaskWorker(1);
        var cts = new CancellationTokenSource();
        var token = cts.Token;

        var tasks = Enumerable.Range(1, 10)
            .Select(e => worker.RunAsync(() => SomeWorkAsync(e, token), token))
            .ToArray();

        Task.WhenAll(tasks).GetAwaiter().GetResult();
    }

    static async Task SomeWorkAsync(int id, CancellationToken cancellationToken)
    {
        Console.WriteLine($"Some Started {id}");
        await Task.Delay(2000, cancellationToken).ConfigureAwait(false);
        Console.WriteLine($"Some Finished {id}");
    }
}

अपडेट करें

TaskWorker लागू करना IDisposable

class TaskWorker : IDisposable
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly SemaphoreSlim _semaphore;
    private readonly int _maxDegreeOfParallelism;

    public TaskWorker(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
        }

        _maxDegreeOfParallelism = maxDegreeOfParallelism;
        _semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
    }

    public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        ThrowIfDisposed();

        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
        {
            // No ConfigureAwait(false) here to keep the SyncContext if any
            // for the real task
            await _semaphore.WaitAsync(cts.Token);
            try
            {
                await taskFactory().ConfigureAwait(false);
            }
            finally
            {
                _semaphore.Release(1);
            }
        }
    }

    public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        ThrowIfDisposed();

        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
        {
            await _semaphore.WaitAsync(cts.Token);
            try
            {
                return await taskFactory().ConfigureAwait(false);
            }
            finally
            {
                _semaphore.Release(1);
            }
        }
    }

    private void ThrowIfDisposed()
    {
        if (disposedValue)
        {
            throw new ObjectDisposedException(this.GetType().FullName);
        }
    }

    #region IDisposable Support
    private bool disposedValue = false;

    protected virtual void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                _cts.Cancel();
                // consume all semaphore slots
                for (int i = 0; i < _maxDegreeOfParallelism; i++)
                {
                    _semaphore.WaitAsync().GetAwaiter().GetResult();
                }
                _semaphore.Dispose();
                _cts.Dispose();
            }
            disposedValue = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
    }
    #endregion
}
2
Sir Rufo 3 नवम्बर 2017, 17:06

आप सोच सकते हैं कि धागा ThreadPool पर वापस आ गया है, यहां तक ​​​​कि सोचा कि यह वास्तव में वापसी नहीं है। जब एसिंक्स ऑपरेशन शुरू होता है तो थ्रेड बस अगली कतारबद्ध वस्तु चुनता है।

मेरा सुझाव है कि आप Task.Factory.StartNew के बजाय Task.Run देखें टास्क.रन बनाम टास्क.Factory.StartNew

और TPL DataFlow पर भी एक नज़र डालें. मुझे लगता है कि यह आपकी आवश्यकताओं से मेल खाएगा।

3
Andrii Litvinov 3 नवम्बर 2017, 14:39