Building a pool manager, part 2.1

Posted on September 8, 2018

 This article is part of a series on writing a process pool manager in Elixir.

Preparing for multiple pools

When we left off at the end of the last post, we had a working single pool: a pool supervisor overseeing a (named) pool manager and a (named) worker supervisor, with the Doubler worker processes running under the latter.

“Unnaming” processes

We can’t have multiple pools with processes named the same way, so let’s start by removing the name from the PoolSup process:

lib/pool_toy/pool_sup.ex:

defmodule PoolToy.PoolSup do
  use Supervisor

  def start_link(args) when is_list(args) do
    Supervisor.start_link(__MODULE__, args)
  end

  # edited for brevity
end

For the pool manager, instead of simply removing the process name, we're going to name the process with a pool name provided upon creation. This will let clients consistently refer to the same pool across restarts (whereas the pool's pid would change on each restart).

First things first, we need to pass in the pool name when we start it (lib/pool_toy/application.ex):

defmodule PoolToy.Application do
  use Application

  def start(_type, _args) do
    children = [
      {PoolToy.PoolSup, [name: :poolio, size: 3]}
    ]

    opts = [strategy: :one_for_one]
    Supervisor.start_link(children, opts)
  end
end

In the PoolToy.PoolSup child spec, we're now providing the name key-value pair, which was previously absent. With that in place, we need to forward all arguments in the pool supervisor, so that they reach the pool manager (lib/pool_toy/pool_sup.ex):

def init(args) do
  children = [
    PoolToy.WorkerSup,
    {PoolToy.PoolMan, args}
  ]

  Supervisor.init(children, strategy: :one_for_all)
end

Let’s now modify the pool manager. We’ve got a handful of things to do: store its name within its state, properly initialize the state, and update the checkin/checkout functions to take a pool name (lib/pool_toy/pool_man.ex):

defmodule PoolToy.PoolMan do
  use GenServer

  defmodule State do
    defstruct [
      :name, :size, :monitors,
      worker_sup: PoolToy.WorkerSup, worker_spec: Doubler, workers: []
    ]
  end

  def start_link(args) do
    name = Keyword.fetch!(args, :name)
    GenServer.start_link(__MODULE__, args, name: name)
  end

  def checkout(pool) do
    GenServer.call(pool, :checkout)
  end

  def checkin(pool, worker) do
    GenServer.cast(pool, {:checkin, worker})
  end

  def init(args), do: init(args, %State{})

  defp init([{:name, name} | rest], %State{} = state) when is_atom(name) do
    init(rest, %{state | name: name})
  end

  defp init([{:name, _name} | _], _state) do
    {:stop, {:invalid_args, {:name, "must be an atom"}}}
  end

  defp init([{:size, size} | rest], %State{} = state) when is_integer(size) and size > 0 do
    init(rest, %{state | size: size})
  end

  defp init([{:size, _size} | _], _state) do
    {:stop, {:invalid_args, {:size, "must be a positive integer"}}}
  end

  defp init([], %State{name: nil}) do
    {:stop, {:missing_args, {:name, "atom `name` is required"}}}
  end

  defp init([], %State{size: nil}) do
    {:stop, {:missing_args, {:size, "positive integer `size` is required"}}}
  end

  defp init([], %State{name: _, size: _} = state) do
    Process.flag(:trap_exit, true)
    send(self(), :start_workers)
    monitors = :ets.new(:monitors, [:protected, :named_table])
    {:ok, %{state | monitors: monitors}}
  end

  defp init([_ | t], state), do: init(t, state)

  # edited for brevity
end

Here’s a breakdown of what’s going on:

  • the State struct gains a :name field;
  • start_link/1 uses the provided value to name the GenServer process (you’ll note that we made name a mandatory value, since we’re using Keyword.fetch!/2, which will raise if the key isn’t found);
  • the checkout/checkin functions now take a pool name (or pid) as their first argument;
  • init/1 delegates to init/2, which initializes the server’s state by reducing over the list of arguments, one clause per option.
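
Fire the project up with iex -S mix and, as a quick sanity check, try checking a worker in and out of the renamed pool. A sketch of what that should look like (pids will differ, and we’re assuming checkout still returns a bare worker pid, as in the previous post):

iex> worker = PoolToy.PoolMan.checkout(:poolio)
#PID<0.142.0>
iex> PoolToy.PoolMan.checkin(:poolio, worker)
:ok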

So far, so good: everything runs as expected. But we’re still left with a named WorkerSup. Let’s get to that next (lib/pool_toy/worker_sup.ex):

defmodule PoolToy.WorkerSup do
  use DynamicSupervisor

  def start_link(args) do
    DynamicSupervisor.start_link(__MODULE__, args)
  end

  # edited for brevity
end

Since the processes are no longer named, we should remove them from the list of registered processes (mix.exs):

def application do
  [
    mod: {PoolToy.Application, []},
    # the `registered` key has been removed
    extra_applications: [:logger]
  ]
end

With that change in place, here’s what greets us in IEx:

08:54:07.526 [error] GenServer :poolio terminating
** (stop) exited in: GenServer.call(PoolToy.WorkerSup, {:start_child, {{Doubler, :start_link, [[]]}, :temporary, 5000, :worker, [Doubler]}}, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (elixir) lib/gen_server.ex:914: GenServer.call/3
    (pool_toy) lib/pool_toy/pool_man.ex:119: PoolToy.PoolMan.new_worker/2
    (pool_toy) lib/pool_toy/pool_man.ex:81: anonymous fn/4 in PoolToy.PoolMan.handle_info/2
    (elixir) lib/enum.ex:2967: Enum.reduce_range_inc/4
    (pool_toy) lib/pool_toy/pool_man.ex:80: PoolToy.PoolMan.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: :start_workers
State: %PoolToy.PoolMan.State{monitors: :monitors, name: :poolio, size: 3, worker_spec: Doubler, worker_sup: PoolToy.WorkerSup, workers: []}

The issue is that in lib/pool_toy/pool_man.ex:119 we’re trying to call the (previously) named PoolToy.WorkerSup process, but, as the error puts it, “the process is not alive or there's no process currently associated with the given name”.

If you look at our pool manager (lib/pool_toy/pool_man.ex), you can see what’s going on:

defmodule State do
    defstruct [
      :name, :size, :monitors,
      worker_sup: PoolToy.WorkerSup, worker_spec: Doubler, workers: []
    ]
  end

We use the worker_sup value in our state when communicating with the worker supervisor, but its default value is still the name the process used to have, rather than a pid.

So how do we fix this? We need our pool manager to know which pid the worker supervisor has. Right now, the pool supervisor (lib/pool_toy/pool_sup.ex) starts the worker supervisor, then the pool manager. So we need some way to find the worker supervisor’s pid from within the pool manager so that we can store it within the pool manager’s state.

One possibility would be to use Supervisor.which_children/1 to get all of the pool supervisor’s children and then iterate through them until we locate the worker supervisor. And of course, we’d need some mechanism to update that information if the worker supervisor crashes (and therefore has a different pid after restarting).
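
For reference, here’s a minimal sketch of what that lookup could look like (find_worker_sup/1 is a hypothetical helper, and we won’t actually go this route):

defp find_worker_sup(pool_sup) do
  # which_children/1 returns {id, pid_or_status, type, modules} tuples;
  # we want the entry whose id is our worker supervisor module
  pool_sup
  |> Supervisor.which_children()
  |> Enum.find_value(fn
    {PoolToy.WorkerSup, pid, :supervisor, _} when is_pid(pid) -> pid
    _ -> nil
  end)
end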

Instead, we’ll have the pool manager be responsible for (re)starting the worker supervisor, since our idea is to have the pool manager be the brains of our operation.

Managing the worker supervisor from within the pool manager

We can use Supervisor.start_child/2 to make a supervisor start a child process. Why bother, when we just said that the pool manager would be responsible for restarting the worker supervisor? Because having it under the pool supervisor gives us a way to cleanly shut everything down and prevent memory leaks.
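
The call we’re aiming for looks roughly like this (a sketch: pool_sup stands for the pool supervisor’s pid, which we don’t have yet):

{:ok, worker_sup} = Supervisor.start_child(pool_sup, PoolToy.WorkerSup)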

In order for the pool manager to be able to start the worker supervisor, it needs to know the pid for the pool supervisor (so it can later use it as an argument to start_child/2). Let’s pass it in as an argument (lib/pool_toy/pool_sup.ex):

def init(args) do
  children = [
    PoolToy.WorkerSup,
    {PoolToy.PoolMan, [{:pool_sup, self()} | args]}
  ]

  Supervisor.init(children, strategy: :one_for_all)
end

Now that the pool supervisor’s pid is provided within the keyword list given to PoolMan.start_link/1, let’s modify our pool manager to add it to the state (lib/pool_toy/pool_man.ex):

defmodule State do
  defstruct [
    :name, :size, :pool_sup,
    :monitors,
    worker_sup: PoolToy.WorkerSup, worker_spec: Doubler, workers: []
  ]
end

# edited for brevity

defp init([{:size, _size} | _], _state) do
  {:stop, {:invalid_args, {:size, "must be a positive integer"}}}
end

defp init([{:pool_sup, pid} | rest], %State{} = state) when is_pid(pid) do
  init(rest, %{state | pool_sup: pid})
end

defp init([{:pool_sup, _} | _], _state) do
  {:stop, {:invalid_args, {:pool_sup, "must be provided"}}}
end

Again, pretty straightforward: we added :pool_sup to the State struct, then added new init/2 clauses to initialize the value within the state.

What now? Since the pool manager is going to start the worker supervisor, the pool supervisor should NOT start it. Let’s take care of that (lib/pool_toy/pool_sup.ex):

def init(args) do
  children = [
    {PoolToy.PoolMan, [{:pool_sup, self()} | args]}
  ]

  Supervisor.init(children, strategy: :one_for_all)
end

We’ve simply removed PoolToy.WorkerSup from the list of children. Since the worker supervisor is no longer started by the pool supervisor, we need to start it within the pool manager now.

We already have code to start workers from within the pool manager, so we’ll repurpose it to first start the worker supervisor (and only then the actual workers). Here we go (lib/pool_toy/pool_man.ex):

defp init([], %State{name: _, size: _} = state) do
  Process.flag(:trap_exit, true)
  send(self(), :start_worker_sup)
  monitors = :ets.new(:monitors, [:protected, :named_table])
  {:ok, %{state | monitors: monitors}}
end

# edited for brevity

def handle_info(:start_worker_sup, %State{pool_sup: sup} = state) do
  {:ok, worker_sup} = Supervisor.start_child(sup, PoolToy.WorkerSup)

  state =
    state
    |> Map.put(:worker_sup, worker_sup)
    |> start_workers()

  {:noreply, state}
end

# edited for brevity

def start_workers(%State{worker_sup: sup, worker_spec: spec, size: size} = state) do
  workers =
    for _ <- 1..size do
      new_worker(sup, spec)
    end

  %{state | workers: workers}
end

defp new_worker(sup, spec) do
  # edited for brevity

In the final init/2 clause, we now send ourselves :start_worker_sup instead of :start_workers. The handler for :start_workers has been replaced with one for :start_worker_sup, which starts the worker supervisor, stores its pid in the state, and then calls the start_workers/1 function (which is just our previous :start_workers handler in disguise).

Since we’re now setting the worker supervisor pid in the state, we don’t need a fixed value for it so let’s remove that from our state (lib/pool_toy/pool_man.ex):

defmodule State do
  defstruct [
    :name, :size, :pool_sup,
    :monitors, :worker_sup,
    worker_spec: Doubler, workers: []
  ]
end

And with that, our code is back to (kind of) working! We no longer have statically named processes, but we’ve still got some issues to iron out: try killing the worker supervisor in the Observer. If you just look at the Observer UI, you’ll be under the impression that everything is working fine: our pool processes get restarted as expected. But if you peek at the IEx console, you’ll see a bunch of errors like:

07:38:24.786 [error] GenServer :poolio terminating
** (MatchError) no match of right hand side value: {:error, {:already_started, #PID<0.181.0>}}
    (pool_toy) lib/pool_toy/pool_man.ex:90: PoolToy.PoolMan.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: :start_worker_sup
State: %PoolToy.PoolMan.State{monitors: :monitors, name: :poolio, pool_sup: #PID<0.149.0>, size: 3, worker_spec: Doubler, worker_sup: PoolToy.WorkerSup, workers: []}

In effect, the pool manager is complaining that when it attempts to start the worker supervisor, there’s already one up and running. What gives? Here’s what’s going on, step by step:

  1. We kill the worker supervisor;
  2. The supervisor kills the pool manager (because the supervisor is one_for_all);
  3. The supervisor restarts its child processes (pool manager and worker supervisor);
  4. The restarted pool manager tries to start a worker supervisor via Supervisor.start_child/2, but the pool supervisor already has a running child with that id (the worker supervisor it just restarted), so the call returns {:error, {:already_started, pid}} and our {:ok, worker_sup} match raises the error.

It’s important to note in the above that supervisors will take care of restarting all of their children, regardless of when they were started. So the worker supervisor will be restarted by the pool supervisor even though it isn’t among the children declared in the pool supervisor’s initialization callback.
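
If you want to convince yourself of this, here’s a quick standalone IEx experiment (not PoolToy code; Agent children are just a convenient stand-in):

iex> child = fn id -> %{id: id, start: {Agent, :start_link, [fn -> 0 end]}} end
iex> {:ok, sup} = Supervisor.start_link([child.(:a)], strategy: :one_for_all)
iex> [{:a, a, _, _}] = Supervisor.which_children(sup)
iex> {:ok, _} = Supervisor.start_child(sup, child.(:b))
iex> Process.exit(a, :kill)
iex> Supervisor.which_children(sup)
# => both :a and :b are listed again, each with a fresh pid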

Our changes so far

Making the worker supervisor temporary

So how do we fix this? We need a couple of changes to pull this off:

  • make the pool supervisor stop worrying about restarting the worker supervisor;
  • get the pool manager to know when the worker supervisor dies, so it can do something about it.

Handling the first point is pretty easy: if a child specification sets its restart option to :temporary, the supervisor will essentially ignore that child terminating (whether successfully or not) and will never restart it. The other restart options are :permanent (the default behavior we’ve come to know, where the child is always restarted) and :transient, where the supervisor will restart the process only if it terminates abnormally (e.g. due to an error).

So let’s specify that our worker supervisor should be a temporary process (lib/pool_toy/worker_sup.ex):

defmodule PoolToy.WorkerSup do
  use DynamicSupervisor, restart: :temporary

  # edited for brevity
end

Easy breezy, right? When we use DynamicSupervisor, we can provide options that will be used when defining the child specification. Of course, there are other ways to ensure your child specification is set up the way you want, as we’ve covered previously.
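
For example, one alternative would be to override the restart value at the call site, when handing the child spec to the supervisor (just a sketch, not what we’re doing here; pool_sup stands for the pool supervisor’s pid):

Supervisor.start_child(
  pool_sup,
  Supervisor.child_spec(PoolToy.WorkerSup, restart: :temporary)
)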

With that bit out of the way, we still need our pool manager to be made aware of the worker supervisor going down, so it can react to the situation. We can achieve this easily by linking the processes, just like we did before (lib/pool_toy/pool_man.ex):

def handle_info(:start_worker_sup, %State{pool_sup: sup} = state) do
  {:ok, worker_sup} = Supervisor.start_child(sup, PoolToy.WorkerSup)
  Process.link(worker_sup)

  # edited for brevity
end

We’ve already got our pool manager trapping exits, so that’s taken care of for us. (Otherwise, we’d need to add that process flag!) But what should we do when the worker supervisor goes down? Just restart it? We decided earlier on that the pool manager and worker supervisor processes were closely related, and that if one crashed the other should be killed as well.

So in this case, we’ll have the pool manager stop itself if the worker supervisor crashes, since it can’t imagine going on in life without its buddy. The pool supervisor will notice the pool manager’s death (it won’t react to the worker supervisor’s death, because we just made it :temporary) and restart it, which in turn will start a new worker supervisor.

Making the pool manager stop itself is pretty straightforward: just return {:stop, reason, new_state} from one of its callbacks (lib/pool_toy/pool_man.ex):

def handle_info({:EXIT, pid, reason}, %State{worker_sup: pid} = state) do
  {:stop, {:worker_sup_exit, reason}, state}
end

def handle_info({:EXIT, pid, _reason}, %State{workers: workers, monitors: monitors} = state) do
  # edited for brevity
end

We match the pid in the exit message to our worker supervisor’s pid, and forward the exit reason when stopping the pool manager.

Our changes so far

Providing the worker spec dynamically

Our project still relies on a static worker spec. Instead, clients should be able to provide the worker spec they want to use in the pool. Let’s change our code to provide that spec dynamically.

This change will require a 3-step process:

  1. remove the static worker_spec value in the pool manager’s state;
  2. add clauses to PoolMan.init/2 to add the worker spec value to the state (and fail if it’s missing);
  3. add the worker spec value to the arguments provided to the pool supervisor when our OTP app starts.

Here we go:

lib/pool_toy/pool_man.ex

defmodule State do
  defstruct [
    :name, :size, :worker_spec, :pool_sup,
    :monitors, :worker_sup,
    workers: []
  ]
end

# edited for brevity

defp init([{:worker_spec, spec} | rest], %State{} = state) do
  init(rest, %{state | worker_spec: spec})
end

defp init([{:pool_sup, _} | _], _state) do
  # edited for brevity
end

# edited for brevity

defp init([], %State{worker_spec: nil}) do
  {:stop, {:missing_args, {:worker_spec, "child spec `worker_spec` is required"}}}
end

defp init([], %State{name: _, size: _} = state) do
  # edited for brevity

lib/pool_toy/application.ex:

def start(_type, _args) do
  children = [
    {PoolToy.PoolSup, [name: :poolio, worker_spec: Doubler, size: 3]}
  ]

  opts = [strategy: :one_for_one]
  Supervisor.start_link(children, opts)
end
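
The payoff is that any module with a valid child spec can now back a pool, without touching PoolToy itself. For instance (a sketch: HardWorker is a hypothetical worker module you’d define yourself):

children = [
  {PoolToy.PoolSup, [name: :hard_pool, worker_spec: HardWorker, size: 5]}
]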

Our changes so far

Now that our single pool no longer relies on statically defined names, we can start working on enabling multiple pools to be managed concurrently. That’s next!

 This article is part of a series on writing a process pool manager in Elixir.


Would you like to see more Elixir content like this? Sign up to my mailing list so I can gauge how much interest there is in this type of content.