Preparing for multiple pools
(This post is part of a series on writing a process pool manager in Elixir.)
When we left off at the end of the last post, we had a working single pool that looked something like this:
The thing is, all these processes with pretty names are displayed that way by the Observer because they’re named processes. The problem with named processes is that there can be only one process with a given name within the node: as soon as we try to create another process with the same name, an error will be raised. This means that we currently can’t start another pool, because in doing so we’d be trying to start duplicate instances of PoolSup, PoolMan, and WorkerSup.
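To make the name clash concrete, here’s a minimal sketch that uses plain Agent processes rather than our pool processes (the names :demo_pool and :other_demo_pool are made up for the illustration). With the usual start_link functions, the failure is reported as an error tuple:

```elixir
# Start a process registered under a name (`:demo_pool` is a made-up name).
{:ok, first} = Agent.start_link(fn -> 0 end, name: :demo_pool)

# A second process cannot claim the same name on this node: the start fails
# with {:error, {:already_started, pid}}, where pid is the existing process.
duplicate = Agent.start_link(fn -> 0 end, name: :demo_pool)

# A different name is fine.
{:ok, second} = Agent.start_link(fn -> 0 end, name: :other_demo_pool)
```

When a supervisor is the one starting the duplicate, that same error tuple is what makes the child fail to start.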
“Unnaming” processes
We can’t have multiple pools with processes named the same way, so let’s start by removing the name from the PoolSup process:
lib/pool_toy/pool_sup.ex:
defmodule PoolToy.PoolSup do
  use Supervisor

  def start_link(args) when is_list(args) do
    Supervisor.start_link(__MODULE__, args)
  end

  # edited for brevity
end
For the pool manager, what we’re going to do instead of just removing the process’ name is to name it with a pool name provided upon creation. This will let clients consistently refer to the same pool across restarts (because the pool’s pid would change on each restart).
First things first, we need to pass in the pool name when we start it (lib/pool_toy/application.ex):
defmodule PoolToy.Application do
  use Application

  def start(_type, _args) do
    children = [
      {PoolToy.PoolSup, [name: :poolio, size: 3]}
    ]

    opts = [strategy: :one_for_one]
    Supervisor.start_link(children, opts)
  end
end
On line 6, we’re now providing the name key-value pair, which was previously absent. With that in place, we need to forward all arguments in the pool supervisor, so that they reach the pool manager (lib/pool_toy/pool_sup.ex, line 4):
def init(args) do
  children = [
    PoolToy.WorkerSup,
    {PoolToy.PoolMan, args}
  ]

  Supervisor.init(children, strategy: :one_for_all)
end
Let’s now modify the pool manager. We’ve got a handful of things to do: store its name within its state, properly initialize the state, and update the checkin/checkout functions to take a pool name (lib/pool_toy/pool_man.ex):
defmodule PoolToy.PoolMan do
  use GenServer

  defmodule State do
    defstruct [
      :name, :size, :monitors,
      worker_sup: PoolToy.WorkerSup, worker_spec: Doubler, workers: []
    ]
  end

  def start_link(args) do
    name = Keyword.fetch!(args, :name)
    GenServer.start_link(__MODULE__, args, name: name)
  end

  def checkout(pool) do
    GenServer.call(pool, :checkout)
  end

  def checkin(pool, worker) do
    GenServer.cast(pool, {:checkin, worker})
  end

  def init(args), do: init(args, %State{})

  defp init([{:name, name} | rest], %State{} = state) when is_atom(name) do
    init(rest, %{state | name: name})
  end

  defp init([{:name, _name} | _], _state) do
    {:stop, {:invalid_args, {:name, "must be an atom"}}}
  end

  defp init([{:size, size} | rest], %State{} = state) when is_integer(size) and size > 0 do
    init(rest, %{state | size: size})
  end

  defp init([{:size, _size} | _], _state) do
    {:stop, {:invalid_args, {:size, "must be a positive integer"}}}
  end

  defp init([], %State{name: nil}) do
    {:stop, {:missing_args, {:name, "atom `name` is required"}}}
  end

  defp init([], %State{size: nil}) do
    {:stop, {:missing_args, {:size, "positive integer `size` is required"}}}
  end

  defp init([], %State{name: _, size: _} = state) do
    Process.flag(:trap_exit, true)
    send(self(), :start_workers)
    monitors = :ets.new(:monitors, [:protected, :named_table])
    {:ok, %{state | monitors: monitors}}
  end

  defp init([_ | t], state), do: init(t, state)

  # edited for brevity
end
Here’s a breakdown of what’s going on:
- line 6: add the name to the server’s state;
- lines 12-13: use the provided value to name the process (you’ll note that we made name a mandatory value, since we’re using Keyword.fetch!/2, which will raise an error if the key isn’t found);
- lines 16-22: update the checkin/checkout functions to use a pool name;
- lines 24-57: initialize the server’s state by reduction (read more about this technique).
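If the “initialize by reduction” idea is new to you, here’s a stripped-down sketch of it outside of any GenServer. The OptsSketch module and its parse/1 function are hypothetical names used only for this illustration: each clause consumes one key-value pair and folds it into the struct, and the clauses matching the empty list check that nothing mandatory is missing.

```elixir
defmodule OptsSketch do
  # Hypothetical module: folds a keyword list into a validated struct.
  defstruct [:name, :size]

  def parse(args), do: parse(args, %__MODULE__{})

  # One pair of clauses per known key: the first accepts a valid value,
  # the second rejects anything else given for that key.
  defp parse([{:name, name} | rest], acc) when is_atom(name) do
    parse(rest, %{acc | name: name})
  end

  defp parse([{:name, _} | _], _acc), do: {:error, {:invalid_args, :name}}

  defp parse([{:size, size} | rest], acc) when is_integer(size) and size > 0 do
    parse(rest, %{acc | size: size})
  end

  defp parse([{:size, _} | _], _acc), do: {:error, {:invalid_args, :size}}

  # Once the list is exhausted, make sure the mandatory values were provided.
  defp parse([], %__MODULE__{name: nil}), do: {:error, {:missing_args, :name}}
  defp parse([], %__MODULE__{size: nil}), do: {:error, {:missing_args, :size}}
  defp parse([], acc), do: {:ok, acc}

  # Unknown keys are skipped.
  defp parse([_unknown | rest], acc), do: parse(rest, acc)
end

valid = OptsSketch.parse(name: :poolio, size: 3)
missing_name = OptsSketch.parse(size: 3)
bad_size = OptsSketch.parse(name: :poolio, size: 0)
with_unknown_key = OptsSketch.parse(colour: :red, name: :poolio, size: 3)
```

Clause order matters here: the clauses matching the empty list have to come before the catch-all that skips unknown keys, and the final clause that builds the result should only match the empty list.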
So far, so good: if you run our project with iex -S mix, it’ll run as expected. But we’re still left with a named WorkerSup. Let’s get to that next (lib/pool_toy/worker_sup.ex):
defmodule PoolToy.WorkerSup do
  use DynamicSupervisor

  def start_link(args) do
    DynamicSupervisor.start_link(__MODULE__, args)
  end

  # edited for brevity
end
Since the processes are no longer named, we should remove them from the list of registered processes (mix.exs):
def application do
  [
    mod: {PoolToy.Application, []},
    # the `registered` key has been removed
    extra_applications: [:logger]
  ]
end
With that change in place, here’s what greets us in IEx:
08:54:07.526 [error] GenServer :poolio terminating
** (stop) exited in: GenServer.call(PoolToy.WorkerSup, {:start_child, {{Doubler, :start_link, [[]]}, :temporary, 5000, :worker, [Doubler]}}, :infinity)
** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
(elixir) lib/gen_server.ex:914: GenServer.call/3
(pool_toy) lib/pool_toy/pool_man.ex:119: PoolToy.PoolMan.new_worker/2
(pool_toy) lib/pool_toy/pool_man.ex:81: anonymous fn/4 in PoolToy.PoolMan.handle_info/2
(elixir) lib/enum.ex:2967: Enum.reduce_range_inc/4
(pool_toy) lib/pool_toy/pool_man.ex:80: PoolToy.PoolMan.handle_info/2
(stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:711: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: :start_workers
State: %PoolToy.PoolMan.State{monitors: :monitors, name: :poolio, size: 3, worker_spec: Doubler, worker_sup: PoolToy.WorkerSup, workers: []}
The issue is that in lib/pool_toy/pool_man.ex:119 we’re trying to call the (previously) named PoolToy.WorkerSup process, but the process is not alive or there's no process currently associated with the given name.
If you look at our pool manager (lib/pool_toy/pool_man.ex), you can see what’s going on:
defmodule State do
  defstruct [
    :name, :size, :monitors,
    worker_sup: PoolToy.WorkerSup, worker_spec: Doubler, workers: []
  ]
end
We use the worker_sup value in our state when communicating with the worker supervisor, but we’re using the name it used to have instead of a pid.
So how do we fix this? We need our pool manager to know which pid the worker supervisor has. Right now, the pool supervisor (lib/pool_toy/pool_sup.ex) starts the worker supervisor, then the pool manager. So we need some way to find the worker supervisor’s pid from within the pool manager so that we can store it within the pool manager’s state.
One possibility would be to use Supervisor.which_children/1 to get all of the pool supervisor’s children and then iterate through them until we locate the worker supervisor. And of course, we’d need some mechanism to update that information if the worker supervisor crashes (and therefore has a different pid after restarting).
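For reference, that lookup could be sketched roughly as follows. ChildLookupSketch and child_pid/2 are hypothetical names, and a plain Agent stands in for the worker supervisor; the only thing taken from the Supervisor API is that which_children/1 returns a list of {id, child, type, modules} tuples, where child may be :undefined or :restarting instead of a pid.

```elixir
defmodule ChildLookupSketch do
  # Hypothetical helper: returns the pid of the child registered under `id`,
  # or nil if there is no such child or it isn't currently running.
  def child_pid(sup, id) do
    sup
    |> Supervisor.which_children()
    |> Enum.find_value(fn
      {^id, pid, _type, _modules} when is_pid(pid) -> pid
      _other -> nil
    end)
  end
end

# A throwaway supervisor with a single child; the Agent is a stand-in
# for the worker supervisor.
children = [
  %{id: :worker_sup, start: {Agent, :start_link, [fn -> :ok end]}}
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)

worker_sup = ChildLookupSketch.child_pid(sup, :worker_sup)
not_found = ChildLookupSketch.child_pid(sup, :no_such_child)
```

The lookup itself is short, but the pid it returns goes stale as soon as the child restarts, which is the bookkeeping problem mentioned above.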
Instead, we’ll have the pool manager be responsible for (re)starting the worker supervisor, since our idea is to have the pool manager be the brains of our operation.
Managing the worker supervisor from within the pool manager
We can use Supervisor.start_child/2 to make a supervisor start a child process. Why bother, when we just said that the pool manager would be responsible for restarting the worker supervisor? Because having it under the pool supervisor gives us a way to cleanly shut everything down and prevent memory leaks.
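Here’s a small sketch of that clean shutdown, using a throwaway supervisor and an Agent as the dynamically started child (the :demo_child id is made up for the example). Stopping the supervisor also terminates the child that was added after the fact:

```elixir
# A supervisor that starts out with no children at all.
{:ok, sup} = Supervisor.start_link([], strategy: :one_for_one)

# Ask the supervisor to start (and from then on supervise) a new child.
spec = %{id: :demo_child, start: {Agent, :start_link, [fn -> 0 end]}}
{:ok, child} = Supervisor.start_child(sup, spec)

alive_before_stop = Process.alive?(child)

# Supervisor.stop/1 waits until the supervisor has terminated,
# which includes shutting down its children.
:ok = Supervisor.stop(sup)

alive_after_stop = Process.alive?(child)
```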
In order for the pool manager to be able to start the worker supervisor, it needs to know the pid for the pool supervisor (so it can later use it as an argument to start_child/2). Let’s pass it in as an argument (lib/pool_toy/pool_sup.ex):
def init(args) do
  children = [
    PoolToy.WorkerSup,
    {PoolToy.PoolMan, [{:pool_sup, self()} | args]}
  ]

  Supervisor.init(children, strategy: :one_for_all)
end
Now that the pool supervisor’s pid is provided within the keyword list given to PoolMan.start_link/1, let’s modify our pool manager to add it to the state (lib/pool_toy/pool_man.ex):
defmodule State do
  defstruct [
    :name, :size, :pool_sup,
    :monitors,
    worker_sup: PoolToy.WorkerSup, worker_spec: Doubler, workers: []
  ]
end

# edited for brevity

defp init([{:size, _size} | _], _state) do
  {:stop, {:invalid_args, {:size, "must be a positive integer"}}}
end

defp init([{:pool_sup, pid} | rest], %State{} = state) when is_pid(pid) do
  init(rest, %{state | pool_sup: pid})
end

defp init([{:pool_sup, _} | _], _state) do
  {:stop, {:invalid_args, {:pool_sup, "must be provided"}}}
end
Again, pretty straightforward: we added :pool_sup to the state on line 3, then added new function heads on lines 15-21 to initialize the value within the state.
What now? Since the pool manager is going to start the worker supervisor, the pool supervisor should NOT start it. Let’s take care of that (lib/pool_toy/pool_sup.ex):
def init(args) do
  children = [
    {PoolToy.PoolMan, [{:pool_sup, self()} | args]}
  ]

  Supervisor.init(children, strategy: :one_for_all)
end
We’ve simply removed PoolToy.WorkerSup from line 3. Since the worker supervisor is no longer started by the pool supervisor, we need to start it within the pool manager now.
At this time, we already have code to start workers from within the pool manager, so we’ll just replace that to instead start the worker sup (and then the actual workers). Here we go (lib/pool_toy/pool_man.ex):
defp init([], %State{name: _, size: _} = state) do
  Process.flag(:trap_exit, true)
  send(self(), :start_worker_sup)
  monitors = :ets.new(:monitors, [:protected, :named_table])
  {:ok, %{state | monitors: monitors}}
end

# edited for brevity

def handle_info(:start_worker_sup, %State{pool_sup: sup} = state) do
  {:ok, worker_sup} = Supervisor.start_child(sup, PoolToy.WorkerSup)

  state =
    state
    |> Map.put(:worker_sup, worker_sup)
    |> start_workers()

  {:noreply, state}
end

# edited for brevity

def start_workers(%State{worker_sup: sup, worker_spec: spec, size: size} = state) do
  workers =
    for _ <- 1..size do
      new_worker(sup, spec)
    end

  %{state | workers: workers}
end

defp new_worker(sup, spec) do
  # edited for brevity
On line 3, we send ourselves :start_worker_sup instead of :start_workers. On lines 10-19 we’ve replaced the handler for :start_workers with one for :start_worker_sup, which in turn calls the start_workers/1 function defined on lines 23-30 (which is just our previous :start_workers handler in disguise).
Since we’re now setting the worker supervisor pid in the state, we don’t need a fixed value for it, so let’s remove that from our state (lib/pool_toy/pool_man.ex):
defmodule State do
  defstruct [
    :name, :size, :pool_sup,
    :monitors, :worker_sup,
    worker_spec: Doubler, workers: []
  ]
end
And with that, our code is back to (kind of) working! We no longer have statically named processes, but we’ve still got some issues to iron out: try killing the worker supervisor in the Observer. If you just look at the Observer UI, you’ll be under the impression that everything is working fine: our pool processes get restarted as expected. But if you peek at the IEx console, you’ll see a bunch of errors like:
07:38:24.786 [error] GenServer :poolio terminating
** (MatchError) no match of right hand side value: {:error, {:already_started, #PID<0.181.0>}}
(pool_toy) lib/pool_toy/pool_man.ex:90: PoolToy.PoolMan.handle_info/2
(stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:711: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: :start_worker_sup
State: %PoolToy.PoolMan.State{monitors: :monitors, name: :poolio, pool_sup: #PID<0.149.0>, size: 3, worker_spec: Doubler, worker_sup: PoolToy.WorkerSup, workers: []}
In effect, the pool manager is complaining that when it attempts to start the worker supervisor, there’s already one up and running. What gives? Here’s what’s going on, step by step:
- We kill the worker supervisor;
- The supervisor kills the pool manager (because the supervisor is one_for_all);
- The supervisor restarts its child processes (pool manager and worker supervisor);
- The pool manager tries to start a worker supervisor, but the pool supervisor already has a running child with that id, so start_child/2 returns {:error, {:already_started, pid}} and our {:ok, worker_sup} match fails, raising the error.
It’s important to note in the above that supervisors will take care of restarting all of their children, regardless of when they were started. So the worker supervisor will be restarted by the pool supervisor even though it isn’t among the children declared in the pool supervisor initialization callback.
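The :already_started part is easy to reproduce in isolation. In this minimal sketch, a throwaway supervisor and an Agent child stand in for our pool supervisor and worker supervisor (the :demo_child id is made up); asking the supervisor to start a child whose id it already knows about fails in the same way as in the log above:

```elixir
{:ok, sup} = Supervisor.start_link([], strategy: :one_for_one)

spec = %{id: :demo_child, start: {Agent, :start_link, [fn -> 0 end]}}

# The first request succeeds and the supervisor remembers the child spec.
{:ok, pid} = Supervisor.start_child(sup, spec)

# The second request uses an id the supervisor already has a running child
# for, so it is refused and the existing pid is returned in the error.
second_attempt = Supervisor.start_child(sup, spec)
```

Matching that second result against {:ok, worker_sup} is what produces the MatchError.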
Making the worker supervisor temporary
So how do we fix this? We need a couple of changes to pull this off:
- make the supervisor stop worrying about restarting the worker supervisor;
- get the pool manager to know when the worker supervisor dies, so it can do something about it.
Handling the first point is pretty easy: if a child specification sets its restart option to :temporary, the supervisor will essentially ignore the process terminating (whether successful or not). The other restart options are :permanent (which is the default behavior we’ve come to know), and :transient, where the supervisor will restart the process only if it terminates unsuccessfully (e.g. due to an error).
So let’s specify that our worker supervisor should be a temporary process (lib/pool_toy/worker_sup.ex):
defmodule PoolToy.WorkerSup do
  use DynamicSupervisor, restart: :temporary

  # edited for brevity
end
Easy breezy, right? When we use the DynamicSupervisor, we can provide options that will be used when defining the child specification. Of course, there are other ways to ensure your child specification is set up as you want it to be, as we’ve covered previously.
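To see what that option does, you can inspect the child specification generated for a module. The sketch below uses a hypothetical TempSupSketch module rather than PoolToy.WorkerSup, so it can run on its own:

```elixir
defmodule TempSupSketch do
  # Hypothetical stand-in for our worker supervisor.
  use DynamicSupervisor, restart: :temporary

  def start_link(args) do
    DynamicSupervisor.start_link(__MODULE__, args)
  end

  @impl true
  def init(_args) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

# `use DynamicSupervisor` defines child_spec/1 for us, and the options
# given to `use` are merged into the spec it returns.
spec = TempSupSketch.child_spec([])

restart = spec.restart
# restart is :temporary
```

This child_spec/1 function is what a supervisor calls when it’s handed just the module name, as in Supervisor.start_child(sup, PoolToy.WorkerSup).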
With that bit out of the way, we still need our pool manager to be made aware of the worker supervisor going down, so that it can react to the situation. We can achieve this easily by linking the processes, just like we did before (lib/pool_toy/pool_man.ex):
def handle_info(:start_worker_sup, %State{pool_sup: sup} = state) do
  {:ok, worker_sup} = Supervisor.start_child(sup, PoolToy.WorkerSup)
  Process.link(worker_sup)

  # edited for brevity
end
We’ve already got our pool manager trapping exits, so that’s taken care of for us. (Otherwise, we’d need to add that process flag!) But what should we do when the worker supervisor goes down? Just restart it? We decided earlier on that the pool manager and worker supervisor processes were closely related, and if one crashed the other should be killed also.
So in this case, what we’ll do is have the pool manager kill itself if the worker supervisor crashes, since it can’t imagine going on in life without its buddy. The pool supervisor will notice the pool manager’s death (it won’t react to the worker supervisor’s death, because we just said it was :temporary) and restart it, which in turn will start a new worker supervisor.
Making the pool manager stop itself is pretty straightforward: just return {:stop, reason, new_state} from one of its callbacks (lib/pool_toy/pool_man.ex):
def handle_info({:EXIT, pid, reason}, %State{worker_sup: pid} = state) do
  {:stop, {:worker_sup_exit, reason}, state}
end

def handle_info({:EXIT, pid, _reason}, %State{workers: workers, monitors: monitors} = state) do
  # edited for brevity
end
We match the pid in the exit message to our worker supervisor’s pid, and forward the exit reason when stopping the pool manager.
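The whole “link, trap exits, stop when the buddy dies” dance can be tried out in isolation. In this sketch, LinkedManagerSketch is a hypothetical stand-in for the pool manager and a plain Agent plays the worker supervisor; GenServer.start/2 is used instead of start_link/2 only so that the calling process isn’t taken down along with the manager.

```elixir
defmodule LinkedManagerSketch do
  # Hypothetical stand-in for the pool manager.
  use GenServer

  def start(buddy), do: GenServer.start(__MODULE__, buddy)

  @impl true
  def init(buddy) do
    # Trapping exits turns the buddy's death into an {:EXIT, pid, reason}
    # message instead of killing this process outright.
    Process.flag(:trap_exit, true)
    Process.link(buddy)
    {:ok, %{buddy: buddy}}
  end

  @impl true
  def handle_info({:EXIT, pid, reason}, %{buddy: pid} = state) do
    # Same pid in the message and in the state: our buddy is gone, so stop.
    {:stop, {:buddy_exit, reason}, state}
  end

  def handle_info(_msg, state), do: {:noreply, state}
end

{:ok, buddy} = Agent.start(fn -> :ok end)
{:ok, manager} = LinkedManagerSketch.start(buddy)

# Monitor the manager so we can observe why it goes down.
ref = Process.monitor(manager)
Process.exit(buddy, :kill)

exit_reason =
  receive do
    {:DOWN, ^ref, :process, ^manager, reason} -> reason
  after
    1_000 -> :timeout
  end

# exit_reason is {:buddy_exit, :killed}
```

Since the manager stops with a reason other than :normal, expect an error report in the console when you run this, much like the ones shown earlier.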
Providing the worker spec dynamically
Our project still relies on a static worker spec. Instead, clients should be able to provide the worker spec they want to use in the pool. Let’s change our code to provide that spec dynamically.
This change will require a 3-step process:
- remove the static worker_spec value in the pool manager’s state;
- add clauses to PoolMan.init/2 to add the worker spec value to the state (and fail if it’s missing);
- add the worker spec value to the arguments provided to the pool supervisor when our OTP app starts.
Here we go:
lib/pool_toy/pool_man.ex:
defmodule State do
  defstruct [
    :name, :size, :worker_spec, :pool_sup,
    :monitors, :worker_sup,
    workers: []
  ]
end

# edited for brevity

defp init([{:worker_spec, spec} | rest], %State{} = state) do
  init(rest, %{state | worker_spec: spec})
end

defp init([{:pool_sup, _} | _], _state) do
  # edited for brevity
end

# edited for brevity

defp init([], %State{worker_spec: nil}) do
  {:stop, {:missing_args, {:worker_spec, "child spec `worker_spec` is required"}}}
end

defp init([], %State{name: _, size: _} = state) do
  # edited for brevity
lib/pool_toy/application.ex:
def start(_type, _args) do
  children = [
    {PoolToy.PoolSup, [name: :poolio, worker_spec: Doubler, size: 3]}
  ]

  opts = [strategy: :one_for_one]
  Supervisor.start_link(children, opts)
end
Now that our single pool no longer has statically defined names, we can start working on enabling multiple pools to be managed concurrently. That’s next!