erlang 如何使genserver以频率值Elixir运行

a8jjtwal  于 2022-12-08  发布在  Erlang
关注(0)|答案(1)|浏览(132)

I have seen many GenServer implementations, I am trying to create one with such specifications, But I am not sure its GenServer's use case.
I have a state such as

%{url: "abc.com/jpeg", name: "Camera1", id: :camera_one, frequency: 10}

I have such 100 states, with different values, my use case contains on 5 steps.

  1. Start Each state as a Gen{?}.
  2. Send an HTTP request to that URL.
  3. Get results.
  4. Send another HTTP request with the data came from the first request.
  5. Put the Process to sleep. if the frequency is 10 then for 10 seconds and so on and after 10 seconds It will start from 1 Step again.
    Now when I will start 100 such workers, there are going to be 100 * 2 HTTP requests with frequency. I am not sure about either I am going to use GenServer or GenStage or Flow or even Broadway?
    I am also concerned the HTTP requests won't collapse such as one worker, with a state, will send a request and if the frequency is 1 Second before the first request comes back, the other request would have been sent, would GenServer is capable enough to handle those cases? which I think are called back pressure?
    I have been asking and looking at this use case of so long, I have been guided towards RabbitMQ as well for my use case.
    Any guidance would be so helpful, or any minimal example would be so grateful.

? GenServer/ GenStage / GenStateMachine

jfewjypa

jfewjypa1#

Your problem comes down to reducing the number of concurrent network requests at a given time.
A simple approach would be to have a GenServer which keeps track of the count of outgoing requests. Then, for each client (Up to 200 in your case), it can check to see if there's an open request, and then act accordingly. Here's what the server could look like:

defmodule Throttler do
  use GenServer

  #server
  @impl true
  def init(max_concurrent: max_concurrent) do
    {:ok, %{count: 0, max_concurrent: max_concurrent}}
  end

  @impl true
  def handle_call(:run, _from, %{count: count, max_concurrent: max_concurrent} = state) when count < max_concurrent, do: {:reply, :ok, %{state | count: count + 1}}
  @impl true
  def handle_call(:run, _from, %{count: count, max_concurrent: max_concurrent} = state) when count >= max_concurrent, do: {:reply, {:error, :too_many_requests}, state}

  @impl true
  def handle_call(:finished, _from, %{count: count} = state) when count > 0, do: {:reply, :ok, %{state | count: count - 1}}
end

Okay, so now we have a server where we can call handle_call(pid, :run) and it will tell us whether or not we've exceeded the count. Once the task (getting the URL) is complete, we need to call handle_call(pid, :finished) to let the server know we've completed the task.
On the client side, we can wrap that in a convenient helper function. (Note this is still within the Throttler module so __MODULE__ works)

defmodule Throttler do
  #client
  def start_link(max_concurrent: max_concurrent) when max_concurrent > 0 do
    GenServer.start_link(__MODULE__, [max_concurrent: max_concurrent])
  end
  
  def execute_async(pid, func) do
    GenServer.call(pid, :run)
    |> case do
      :ok ->
        task = Task.async(fn -> 
          try do
            func.()
          after
            GenServer.call(pid, :finished)
          end
        end)
        {:ok, task}
      {:error, reason} -> {:error, reason, func}
    end
  end
end

Here we pass in a function that we want to asynchronously execute on the client side, and do the work of calling :run and :finished on the server side before executing. If it succeeds, we get a task back, otherwise we get a failure.
Putting it all together, and you get code that looks like this:

{:ok, pid} = Throttler.start_link(max_concurrent: 3)
results = Enum.map(1..5, fn num -> 
  Throttler.execute(pid, fn ->
    IO.puts("Running command #{num}")
    :timer.sleep(:5000)
    IO.puts("Sleep complete for #{num}")
    num * 10
  end)
end)
valid_tasks = Enum.filter(results, &(match?({:ok, _func}, &1))) |> Enum.map(&elem(&1, 1))

Now you have a bunch of tasks that either succeeded, or failed and you can act appropriately.
What do you do upon failure? That's the interesting part of backpressure :) The simplest thing would be to have a timeout and retry, under the assumption that you will eventually clear the pressure downstream. Otherwise you can fail out the requests entirely and keep pushing the problem upstream.

相关问题