मैं एक कार्यकर्ता इंजन को संगामिति की ऊपरी सीमा के साथ लागू कर रहा हूं। मैं एक सेमाफोर का उपयोग तब तक प्रतीक्षा करने के लिए कर रहा हूं जब तक कि संगामिति अधिकतम से कम न हो जाए, फिर Task.Factory.StartNew
का उपयोग try
/catch
में async हैंडलर को लपेटने के लिए करें, एक finally
के साथ जो रिलीज करता है सेमाफोर।
मुझे एहसास है कि यह थ्रेड पूल पर धागे बनाता है - लेकिन मेरा सवाल यह है कि, जब उन कार्य-चलने वाले धागे में से एक वास्तव में इंतजार कर रहा है (असली आईओ कॉल या प्रतीक्षा हैंडल पर), थ्रेड पूल में वापस आ गया है, जैसा कि मुझे उम्मीद है यह होने वाला?
यदि सीमित संगामिति के साथ कार्य शेड्यूलर को लागू करने का एक बेहतर तरीका है जहां कार्य हैंडलर एक एसिंक विधि है (रिटर्न Task
), मुझे यह भी सुनना अच्छा लगेगा। या, आदर्श रूप से कहें, यदि एक एसिंक विधि को कतारबद्ध करने का कोई तरीका है (फिर से, यह एक Task
है - एसिंक विधि लौटाना) जो इसे एक सिंक्रोनस प्रतिनिधि में लपेटने और इसे पास करने से कम डोडी महसूस करता है Task.Factory.StartNew
, यह एकदम सही लगेगा..?
(इससे मुझे यह भी लगता है कि यहां दो प्रकार की समानताएं हैं: कुल मिलाकर कितने कार्यों को संसाधित किया जा रहा है, लेकिन यह भी कि अलग-अलग धागे पर एक साथ कितनी निरंतरता चल रही है। दोनों के लिए विन्यास योग्य विकल्प होने के लिए अच्छा हो सकता है, हालांकि निश्चित आवश्यकता नहीं है ..)
संपादित करें: स्निपेट:
concurrencySemaphore.Wait(cancelToken);
deferRelease = false;
try
{
var result = GetWorkItem();
if (result == null)
{ // no work, wait for new work or exit signal
signal = WaitHandle.WaitAny(signals);
continue;
}
deferRelease = true;
tasks.Add(Task.Factory.StartNew(() =>
{
try
{
DoWorkHereAsync(result); // guess I'd think to .GetAwaiter().GetResult() here.. not run this yet
}
finally
{
concurrencySemaphore.Release();
}
}, cancelToken));
}
finally
{
if (!deferRelease)
{
concurrencySemaphore.Release();
}
}
2 जवाब
यहां टास्कवर्कर का एक उदाहरण है, जो अनगिनत वर्कर थ्रेड्स का उत्पादन नहीं करेगा।
जादू SemaphoreSlim.WaitAsync()
की प्रतीक्षा करके किया जाता है जो एक आईओ कार्य है (और कोई धागा नहीं है)।
class TaskWorker
{
private readonly SemaphoreSlim _semaphore;
public TaskWorker(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
}
_semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
}
public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
// No ConfigureAwait(false) here to keep the SyncContext if any
// for the real task
await _semaphore.WaitAsync(cancellationToken);
try
{
await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
await _semaphore.WaitAsync(cancellationToken);
try
{
return await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
}
और परीक्षण करने के लिए एक सरल कंसोल ऐप
class Program
{
static void Main(string[] args)
{
var worker = new TaskWorker(1);
var cts = new CancellationTokenSource();
var token = cts.Token;
var tasks = Enumerable.Range(1, 10)
.Select(e => worker.RunAsync(() => SomeWorkAsync(e, token), token))
.ToArray();
Task.WhenAll(tasks).GetAwaiter().GetResult();
}
static async Task SomeWorkAsync(int id, CancellationToken cancellationToken)
{
Console.WriteLine($"Some Started {id}");
await Task.Delay(2000, cancellationToken).ConfigureAwait(false);
Console.WriteLine($"Some Finished {id}");
}
}
अपडेट करें
TaskWorker
लागू करना IDisposable
class TaskWorker : IDisposable
{
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly SemaphoreSlim _semaphore;
private readonly int _maxDegreeOfParallelism;
public TaskWorker(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
}
_maxDegreeOfParallelism = maxDegreeOfParallelism;
_semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
}
public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
ThrowIfDisposed();
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
{
// No ConfigureAwait(false) here to keep the SyncContext if any
// for the real task
await _semaphore.WaitAsync(cts.Token);
try
{
await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
}
public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
ThrowIfDisposed();
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
{
await _semaphore.WaitAsync(cts.Token);
try
{
return await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
}
private void ThrowIfDisposed()
{
if (disposedValue)
{
throw new ObjectDisposedException(this.GetType().FullName);
}
}
#region IDisposable Support
private bool disposedValue = false;
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
_cts.Cancel();
// consume all semaphore slots
for (int i = 0; i < _maxDegreeOfParallelism; i++)
{
_semaphore.WaitAsync().GetAwaiter().GetResult();
}
_semaphore.Dispose();
_cts.Dispose();
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
#endregion
}
आप सोच सकते हैं कि धागा ThreadPool
पर वापस आ गया है, यहां तक कि सोचा कि यह वास्तव में वापसी नहीं है। जब एसिंक्स ऑपरेशन शुरू होता है तो थ्रेड बस अगली कतारबद्ध वस्तु चुनता है।
मेरा सुझाव है कि आप Task.Factory.StartNew
के बजाय Task.Run
देखें टास्क.रन बनाम टास्क.Factory.StartNew।
और TPL DataFlow पर भी एक नज़र डालें. मुझे लगता है कि यह आपकी आवश्यकताओं से मेल खाएगा।
संबंधित सवाल
नए सवाल
c#
C # (उच्चारण "तेज देखें") Microsoft द्वारा विकसित एक उच्च स्तरीय, सांख्यिकीय रूप से टाइप किया हुआ, बहु-प्रतिमान प्रोग्रामिंग भाषा है। C # कोड आमतौर पर Microsoft के .NET परिवार के टूल और रन-टाइम को लक्षित करता है, जिसमें .NET फ्रेमवर्क, .NET कोर और Xamarin अन्य शामिल हैं। C # या C # के औपचारिक विनिर्देश में लिखे गए कोड के बारे में प्रश्नों के लिए इस टैग का उपयोग करें।