मेरे पास एक ऐसी स्थिति है जो निम्न जैसा दिखता है:

let mutable stopped = false

let runAsync() = async {
    while not stopped do
        let! item = fetchItemToProcessAsync
        match item with
        | Some job -> job |> runJobAsync |> Async.Start
        | None -> do! Async.Sleep(1000)
}

let run() = Async.Start runAsync
let stop() =
    stopped <- true

अब जब स्टॉप विधि कहा जाता है, तो मुझे डीबी से आगे की वस्तुओं को पढ़ना बंद करना होगा और इस फ़ंक्शन से लौटने से पहले उन लोगों की प्रतीक्षा करनी होगी जो वर्तमान में समाप्त हो गए हैं।

इसे निष्पादित करने का श्रेष्ठ तरीका क्या है? मैं काउंटर का उपयोग करने के बारे में सोच रहा था, (इंटरलॉक एपीआई के साथ) और काउंटर 0 तक पहुंचने पर स्टॉप विधि से वापस आ गया।

यदि इसे पूरा करने का कोई वैकल्पिक तरीका है, तो मैं मार्गदर्शन की सराहना करता हूं। मुझे लगता है कि मैं यहां एजेंटों का उपयोग कर सकता हूं, लेकिन मुझे यकीन नहीं है कि एजेंट के साथ इसे पूरा करने का कोई उपलब्ध तरीका है या यदि मुझे अभी भी यह निर्धारित करने के लिए अपना कस्टम तर्क लिखना है कि नौकरियों ने निष्पादन पूरा कर लिया है।

1
Charles Prakash Dasari 18 अगस्त 2011, 10:37

2 जवाब

सबसे बढ़िया उत्तर

अभिनेता-आधारित पैटर्न पर एक नज़र डालें और मेलबॉक्सप्रोसेसर

मूल रूप से आप कल्पना कर सकते हैं कि async-queue. यदि आप चलने की सूची का उपयोग करते हैं (Async.StartChild या < से प्रारंभ a href="http://msdn.microsoft.com/en-us/library/ee821132.aspx" rel="nofollow">Async.StartAsTask) मेलबॉक्सप्रोसेसर के अंदर अपने लूप के लिए पैरामीटर के रूप में आप इनायत से कर सकते हैं प्रतीक्षा या रद्दीकरण टोकन के माध्यम से शटडाउन संभालें)

यहां एक त्वरित नमूना है जिसे मैंने एक साथ रखा है:


type Commands = 
    | RunJob of Async
    | JobDone of int
    | Quit of AsyncReplyChannel

type JobRunner() =
    let processor =
        MailboxProcessor.Start (fun inbox ->
            let rec loop (nextId, jobs) = async {
                let! cmd = inbox.Receive()
                match cmd with
                | Quit cb ->
                    if not (Map.isEmpty jobs) 
                    then async {
                            do! Async.Sleep 100
                            inbox.Post (Quit cb)}
                        |> Async.Start
                        return! loop (nextId, jobs)
                    else 
                        cb.Reply()
                        return ()
                | JobDone id ->
                    return! loop (nextId, jobs |> Map.remove id)
                | RunJob job ->
                    let runJob i = async {
                        do! job
                        inbox.Post (JobDone i)
                    }
                    let! child = Async.StartChild (runJob nextId)
                    return! loop (nextId+1, jobs |> Map.add nextId child)
            }
            loop (0, Map.empty))
    member jr.PostJob(job) = processor.Post (RunJob job)
    member jr.Quit() = processor.PostAndReply(fun cb -> Quit cb)

let postWaitJob (jobRunner : JobRunner) time =
    let job = async {
        do! Async.Sleep time
        printfn "sleept for %d ms" time }
    jobRunner.PostJob job

let testRun() =
    let jr = new JobRunner()
    printfn "starting jobs..."
    [10..-1..1] |> List.iter (fun i -> postWaitJob jr (i*1000))
    printfn "sending quit"
    jr.Quit()
    printfn "done!"

हम्म ... यहां संपादक के साथ कुछ समस्याएं हैं: जब मैं पाइप-बैक ऑपरेटर का उपयोग करता हूं तो यह बहुत सारे कोड को मारता है ... grrr

संक्षिप्त व्याख्या: जैसा कि आप देख सकते हैं कि मैं हमेशा अगली मुफ्त जॉब-आईडी और आईडी का नक्शा-> AsyncChild नौकरियों के साथ आंतरिक लूप प्रदान करता हूं। (आप निश्चित रूप से अन्य/बेहतर समाधानों को लागू कर सकते हैं - इस उदाहरण में मानचित्र आवश्यक नहीं है, लेकिन आप "नौकरी रद्द करें" या इस तरह से जो कुछ भी आदेश दे सकते हैं) नौकरी किए गए संदेश का उपयोग केवल इस मानचित्र से नौकरियों को हटाने के लिए आंतरिक रूप से किया जाता है छोड़ो बस जांचता है कि नक्शा खाली है या नहीं - अगर यह कोई अतिरिक्त काम नहीं है और मेलबॉक्सप्रोसेसर छोड़ देता है (वापसी ()) - अगर यह खाली नहीं है तो एक नया एसिंक-चाइल्ड शुरू किया जाता है जो बस 100ms प्रतीक्षा करता है और फिर क्विट-मैसेज रनजॉब भेजता है इसके बजाय यह आसान है - यह केवल जॉबडोन की पोस्ट के साथ दिए गए जॉब को मेसाबेबॉक्सप्रोसेसर में चेन करता है और रिकर्सिवली अपडेटेड वैल्यू के साथ लूप को कॉल करता है (नेक्स्टआईड एक ऊपर है, और एक नया जॉब पुराने नेक्स्ट आईडी में मैप किया गया है)

1
Random Dev 18 अगस्त 2011, 11:35
उत्तर के लिए धन्यवाद - मैं भी उसी रास्ते पर जा रहा हूं। हालांकि, आपके उदाहरण को देखते हुए मेरे पास दो प्रश्न हैं: 1) जब हम Async.StartChild : Async<'u> के साथ एक एसिंक शुरू करते हैं तो क्या हमें 'u from Async<'u> (e.g. let! x = child or do! child) को पुनः प्राप्त करने के लिए एक और कॉल करने की आवश्यकता नहीं है? 2) यह देखते हुए कि ये कार्य अलग-अलग समय में समाप्त हो सकते हैं, क्या हमें मानचित्र कार्यों के लिए किसी प्रकार के सिंक्रनाइज़ेशन का उपयोग करने की आवश्यकता नहीं है?
 – 
Charles Prakash Dasari
18 अगस्त 2011, 12:08
1
नमस्ते - हाँ यदि आप परिणाम चाहते हैं तो आप एक और लेट नेट करेंगे! बच्चे के लिए - लेकिन जैसा कि मुझे उत्तर की परवाह नहीं है (कोई नहीं है) आपको यह करने की आवश्यकता नहीं है। दूसरा: नहीं, क्योंकि मैं कभी भी एक ही मानचित्र तक नहीं पहुंचता हूं, लेकिन हमेशा नए उत्पन्न करता हूं (मानचित्र अपरिवर्तनीय है) मुझे संगामिति के बारे में ज्यादा सोचने की ज़रूरत नहीं है - इसके अलावा केवल वह धागा जहां लूप वर्तमान में जारी है/चल रहा है, इसे एक्सेस करेगा ( यही कारण है कि मैं जॉबडोन संदेशों को वैश्विक मानचित्र/शब्दकोश या जो कुछ भी बदलने के बजाय प्रोसेसर को पोस्ट करता हूं)
 – 
Random Dev
18 अगस्त 2011, 12:24
मैं सुझाव दूंगा कि आप वहां कुछ "प्रिंटफ" डालें और कोड के साथ खेलें - हमेशा उन परिस्थितियों में मेरी सहायता करें (यदि आप टेस्टरुन फ़ंक्शन चलाते हैं (आप इसे एफ # इंटरैक्टिव में पेस्ट कर सकते हैं) तो आप उदाहरण के लिए देख सकते हैं कि छोड़ने को कहा जाता है किसी भी कार्य के समाप्त होने से पहले लेकिन फ़ंक्शन तभी वापस आएगा जब सभी 10 कार्य पूरे हो जाएंगे)
 – 
Random Dev
18 अगस्त 2011, 12:28
धन्यवाद, ऐसा लगता है कि समाधान मेरे लिए काम करेगा! एक बार फिर धन्यवाद!
 – 
Charles Prakash Dasari
18 अगस्त 2011, 13:00

Fssnip.net पर इस स्निपेट को देखें। यह जेनेरिक जॉब प्रोसेसर है जिसका आप उपयोग कर सकते हैं।

1
Ankur 18 अगस्त 2011, 17:07
लगभग मेरे समाधान के समान - लेकिन मुझे "छोड़ने" चरण में विभाजन पसंद है।
 – 
Random Dev
18 अगस्त 2011, 17:23