मैं GenStage का उपयोग कर रहा हूं और इसके माध्यम से कुछ प्रक्रियाएं शुरू कर रहा हूं।

कोड बिट जो प्रक्रिया शुरू कर रहा है:

  defp start_snapshot_extractor(config, id) do
    config = Map.put(config, :id, id)
    case Process.whereis(:snapshot_extractor) do
      nil ->
        {:ok, pid} = GenStage.start_link(EvercamMedia.SnapshotExtractor.CloudExtractor, {}, name: :snapshot_extractor)
        pid
      pid -> pid
    end
    |> GenStage.cast({:snapshot_extractor, config})
  end

और इस मॉड्यूल में EvercamMedia.SnapshotExtractor.CloudExtractor

defmodule EvercamMedia.SnapshotExtractor.CloudExtractor do
  use GenStage
  require Logger
  import Commons
  import EvercamMedia.Snapshot.Storage

  @root_dir Application.get_env(:evercam_media, :storage_dir)

  def init(args) do
    {:producer, args}
  end

  def handle_cast({:snapshot_extractor, config}, state) do
    IO.inspect "here "
    _start_extractor(config)
    {:noreply, [], state}
  end
end

अब मुद्दा है। मैं इस प्रक्रिया को अलग-अलग कॉन्फ़िगरेशन के साथ शुरू करने के लिए एक एंडपॉइंट का उपयोग कर रहा हूं, कभी-कभी एक ही कॉन्फ़िगरेशन के साथ।

जब मैं पहली बार प्रक्रिया शुरू करता हूं तो यह "here " प्रिंट करता है और फिर इस प्रक्रिया के पूरा होने के बाद। यह "here " फिर से आउटपुट करता है, ऐसा क्यों है? दोनों प्रक्रियाओं को समानांतर चलाने के बजाय यह पहले वाले के पूरा होने और फिर चलने का इंतजार क्यों कर रहा है?

अद्यतन:

इस तरह पहली विधि कहा जा रहा है

  extraction_pid = spawn(fn ->
    EvercamMedia.UserMailer.snapshot_extraction_started(full_snapshot_extractor, "Cloud")
    start_snapshot_extractor(config)
  end)
  :ets.insert(:extractions, {exid <> "-cloud-#{full_snapshot_extractor.id}", extraction_pid})
0
Junaid Farooq 8 अगस्त 2019, 10:47

1 उत्तर

सबसे बढ़िया उत्तर

प्रत्येक प्रक्रिया द्वारा किसी भी समय संसाधित किए गए मेलबॉक्स से केवल एक ही संदेश होता है, भले ही handle_cast/2 तुरंत वापस आ जाए। मेलबॉक्स कतार में प्रतीक्षारत संदेश तक पहुँचने से पहले प्रक्रिया अंततः पिछले संदेश को संसाधित करना समाप्त कर देगी।

सबसे आसान समाधान के लिए न्यूनतम संख्या में परिवर्तनों की आवश्यकता होगी, संभवतः प्रक्रिया नाम में id जोड़ना होगा ताकि नई id के लिए नई प्रक्रियाओं को जन्म दिया जा सके। साथ ही, आपको GenStage.start_link/3 आपके लिए ऐसा करने के लिए काफी स्मार्ट है।

defp start_snapshot_extractor(config, id) do
  config = Map.put(config, :id, id)
  name = :"snapshot_extractor_#{id}"

  CloudExtractor
  |> GenStage.start_link({}, name: name)
  |> case do
    {:ok, pid} -> pid
    {:error, {:already_started, pid}} -> pid
  end
  |> GenStage.cast({:snapshot_extractor, config})
end
0
Aleksei Matiushkin 8 अगस्त 2019, 11:27