मेरे पास एक एपीआई है जो प्रति मिनट 20 अनुरोध स्वीकार करता है, उसके बाद, मुझे इसे पूछने से पहले 1 मिनट तक इंतजार करना होगा। मेरे पास वस्तुओं की एक सूची है (आमतौर पर 1000+) जिनके विवरण मुझे एपीआई से पूछने की ज़रूरत है, मेरा विचार था कि मैं अपनी सूची को 20 वस्तुओं/अनुरोधों में विभाजित करने के लिए Partitioner का उपयोग कर सकता हूं लेकिन जल्द ही मुझे एहसास हुआ Partitioner उस तरह काम नहीं करता है, मेरा दूसरा विचार विभाजन में delay जोड़ रहा था लेकिन वह भी एक बुरा विचार है, मेरी समझ से यह हर अनुरोध के बाद देरी जोड़ता है जिसकी आवश्यकता नहीं है, इसके बजाय, मुझे देरी की आवश्यकता है प्रत्येक Partition के बाद। नीचे मेरा कोड है:

public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
    [Optional] int delay)
{
    var whenAll = await Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run(async delegate {
            var allResponses = new List<V>();
            using (partition)
                while (partition.MoveNext())
                {
                    allResponses.Add(await body(partition.Current));
                    await Task.Delay(TimeSpan.FromSeconds(delay));
                }
            return allResponses;
        }, token));
    return whenAll.SelectMany(x => x);
}

क्या कोई जानता है कि मैं इसे कैसे पूरा कर सकता हूं?

1
Nemo 21 जिंदा 2021, 13:41
1
क्या अनुरोधों की अवधि को अनुरोध-प्रति-मिनट नीति में गिना जाता है? दूसरे शब्दों में क्या आपको प्रति मिनट 20 अनुरोध शुरू करने की अनुमति है (उनकी अवधि से स्वतंत्र), या आपको पिछले 20 अनुरोधों के पूर्ण होने के बाद एक मिनट तक प्रतीक्षा करनी होगी?
 – 
Theodor Zoulias
21 जिंदा 2021, 14:30
नहीं, ऐसा नहीं है, क्या मायने रखता है; प्रति मिनट 20 कॉल।
 – 
Nemo
21 जिंदा 2021, 14:32
 – 
Theodor Zoulias
21 जिंदा 2021, 15:20
एक साइड नोट के रूप में, इस बात से अवगत रहें कि ForEachAsync का आपका वर्तमान कार्यान्वयन (जो संभवत: यह लेख), अपवादों को गैर-आदर्श तरीके से संभालता है। कारणों को इस उत्तर की टिप्पणियों में समझाया गया है।
 – 
Theodor Zoulias
21 जिंदा 2021, 16:10

1 उत्तर

सबसे बढ़िया उत्तर

यहां एक RateLimiter वर्ग है जिसका उपयोग आप अतुल्यकालिक संचालन की आवृत्ति को सीमित करने के लिए कर सकते हैं। यह RateLimiter वर्ग का एक सरल कार्यान्वयन है जो यह उत्तर।

/// <summary>
/// Limits the number of workflows that can access a resource during the
/// specified time span.
/// </summary>
public class RateLimiter : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;
    private readonly CancellationTokenSource _disposeCts;
    private readonly CancellationToken _disposeToken;
    private bool _disposed;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
        _disposeCts = new CancellationTokenSource();
        _disposeToken = _disposeCts.Token;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        ScheduleSemaphoreRelease();
    }

    private async void ScheduleSemaphoreRelease()
    {
        try { await Task.Delay(_timeUnit, _disposeToken).ConfigureAwait(false); }
        catch (OperationCanceledException) { } // Ignore
        lock (_semaphore) { if (!_disposed) _semaphore.Release(); }
    }

    /// <summary>Call Dispose when you are finished using the RateLimiter.</summary>
    public void Dispose()
    {
        lock (_semaphore)
        {
            if (_disposed) return;
            _semaphore.Dispose();
            _disposed = true;
            _disposeCts.Cancel();
            _disposeCts.Dispose();
        }
    }
}

उपयोग उदाहरण:

List<string> urls = GetUrls();

using var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));

string[] documents = await Task.WhenAll(urls.Select(async url =>
{
    await rateLimiter.WaitAsync();
    return await _httpClient.GetStringAsync(url);
}));

नोट: मैंने एक Dispose विधि जोड़ी है ताकि RateLimiter वर्ग द्वारा आंतरिक रूप से शुरू किए गए अतुल्यकालिक संचालन को रद्द किया जा सके। इस जब आप RateLimiter का उपयोग करना समाप्त कर लें, तो विधि को कॉल किया जाना चाहिए, अन्यथा लंबित अतुल्यकालिक संचालन रोक देगा RateLimiter सक्रिय Task.Delay कार्यों से जुड़े संसाधनों का उपभोग करने के शीर्ष पर, समय पर ढंग से एकत्रित कचरा होने से। मूल बहुत ही सरल लेकिन लीक से हटकर कार्यान्वयन इस उत्तर के दूसरा संशोधन में पाया जा सकता है।


मैं RateLimiter वर्ग का एक वैकल्पिक कार्यान्वयन जोड़ रहा हूं, जो अधिक जटिल है, जो एक Stopwatch के बजाय SemaphoreSlim। इसका लाभ यह है कि इसे डिस्पोजेबल होने की आवश्यकता नहीं है, क्योंकि यह पृष्ठभूमि में छिपे हुए एसिंक्रोनस ऑपरेशंस को लॉन्च नहीं कर रहा है। नुकसान यह है कि WaitAsync विधि CancellationToken तर्क का समर्थन नहीं करती है, और यह कि जटिलता के कारण बग की संभावना अधिक है।

public class RateLimiter
{
    private readonly Stopwatch _stopwatch;
    private readonly Queue<TimeSpan> _queue;
    private readonly int _maxActionsPerTimeUnit;
    private readonly TimeSpan _timeUnit;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        // Arguments validation omitted
        _stopwatch = Stopwatch.StartNew();
        _queue = new Queue<TimeSpan>();
        _maxActionsPerTimeUnit = maxActionsPerTimeUnit;
        _timeUnit = timeUnit;
    }

    public Task WaitAsync()
    {
        var delay = TimeSpan.Zero;
        lock (_stopwatch)
        {
            var currentTimestamp = _stopwatch.Elapsed;
            while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
            {
                _queue.Dequeue();
            }
            if (_queue.Count >= _maxActionsPerTimeUnit)
            {
                var refTimestamp = _queue
                    .Skip(_queue.Count - _maxActionsPerTimeUnit).First();
                delay = refTimestamp - currentTimestamp;
                Debug.Assert(delay >= TimeSpan.Zero);
                if (delay < TimeSpan.Zero) delay = TimeSpan.Zero; // Just in case
            }
            _queue.Enqueue(currentTimestamp + delay + _timeUnit);
        }
        if (delay == TimeSpan.Zero) return Task.CompletedTask;
        return Task.Delay(delay);
    }
}
4
Theodor Zoulias 15 अगस्त 2021, 00:58
1
यह आश्चर्यजनक रूप से अच्छी तरह से काम करता है और मेरे काम को सरल करता है, विभाजन की कोई आवश्यकता नहीं है। धन्यवाद।
 – 
Nemo
22 जिंदा 2021, 08:45