Starting pools
(This post is part of a series on writing a process pool manager in Elixir.)
Previously, we got pretty excited about the possibility of starting multiple pools, only to have our dreams crushed by bleak reality. Here’s a quick reminder of the problem:
iex(1)> PoolToy.start_pool(name: :poolio, worker_spec: Doubler, size: 3)
{:ok, #PID<0.195.0>}
iex(2)> PoolToy.start_pool(name: :pooly, worker_spec: Doubler, size: 3)
{:error,
{:shutdown,
{:failed_to_start_child, PoolToy.PoolMan,
{:badarg,
[
{:ets, :new, [:monitors, [:protected, :named_table]], []},
{PoolToy.PoolMan, :init, 2, [file: 'lib/pool_toy/pool_man.ex', line: 70]},
{:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 374]},
{:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 342]},
{:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}
]}}}}
After putting on your thinking hat, you probably noticed that line 10 of the output above gives us a pretty big clue regarding the problem at hand: the crash originates in the pool manager’s init function. Here’s the offending code (lib/pool_toy/pool_man.ex):
defp init([], %State{name: _, size: _} = state) do
Process.flag(:trap_exit, true)
send(self(), :start_worker_sup)
monitors = :ets.new(:monitors, [:protected, :named_table])
{:ok, %{state | monitors: monitors}}
end
The name we’re using for the ETS table must be unique, but anytime a new pool starts up it tries registering a new ETS table using the same name. So the first pool is able to register it, and the subsequent pools fail to create an ETS table (because they’re trying with a name that is already in use) which causes them to crash.
The fix is easy: our pool names have to be unique, so let’s just include them in the ETS table name as well (lib/pool_toy/pool_man.ex):
defp init([], %State{name: name, size: _} = state) do
Process.flag(:trap_exit, true)
send(self(), :start_worker_sup)
monitors = :ets.new(:"monitors_#{name}", [:protected, :named_table])
{:ok, %{state | monitors: monitors}}
end
On line 1, we pattern match the pool’s name, which we then use in the atom for the ETS table name on line 4. Remember that prepending a quoted string with a colon (:) creates an atom (which is what :ets.new/2 expects).
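To see the naming fix in isolation, here is a minimal sketch that can be pasted into IEx or run as a script. The table_name helper is hypothetical (it only mirrors the interpolation used in init above), and the pool names are the ones from the earlier session.

```elixir
# Hypothetical helper mirroring the naming scheme used in init/2 above.
table_name = fn pool_name -> :"monitors_#{pool_name}" end

poolio_table = table_name.(:poolio)
pooly_table = table_name.(:pooly)

# Interpolating into a quoted atom literal yields an ordinary atom.
IO.inspect(poolio_table == :monitors_poolio, label: "atom built by interpolation")

# The names are distinct, so both named tables can exist side by side.
:ets.new(poolio_table, [:protected, :named_table])
:ets.new(pooly_table, [:protected, :named_table])

# Reusing a name raises ArgumentError (the :badarg seen in the stack trace).
duplicate =
  try do
    :ets.new(poolio_table, [:protected, :named_table])
  rescue
    ArgumentError -> :name_already_taken
  end

IO.inspect(duplicate, label: "second table with the same name")
```

Keep in mind that atoms are never garbage collected, so building them dynamically is only reasonable here because pool names are already atoms and there are few of them.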
Stopping pools
Stopping a pool is easy: just call DynamicSupervisor.terminate_child/2:
iex(1)> {:ok, pid} = PoolToy.start_pool(name: :poolio, worker_spec: Doubler, size: 3)
{:ok, #PID<0.152.0>}
iex(2)> DynamicSupervisor.terminate_child(PoolToy.PoolsSup, pid)
:ok
We start a pool with the first statement, then use the returned pid (corresponding to the pool supervisor) when stopping the pool.
There’s a problem, though: what if the pool needed to be restarted? Its pid would have changed, and we would no longer be able to stop it!
Let’s try that out. Unfortunately, we can’t do so by directly killing the pool supervisor in the Observer, because that causes a race condition that wouldn’t happen in reality. In summary, since we’re suddenly killing the pool supervisor, it doesn’t have the chance to properly terminate its children, so when it starts up again it crashes because the pool manager process from before is still running (i.e. we get an “already started” error). You can see that for yourself by starting the IEx session with iex --logger-sasl-reports true -S mix and killing the pool supervisor in the Observer. Having the old pool manager process still around wouldn’t happen in a normal situation because, in the process of crashing, the pool supervisor would first terminate all of its children.
So let’s instead test our case by triggering too many failures in the pool manager, which will cause the pool supervisor to exceed its restart limit, shut down, and get restarted by its own supervisor.
iex(1)> {:ok, pid} = PoolToy.start_pool(name: :poolio, worker_spec: Doubler, size: 3)
{:ok, #PID<0.152.0>}
iex(2)> for _ <- 1..15 do
...(2)> Process.sleep(100)
...(2)> :poolio |> Process.whereis() |> Process.exit(:kill)
...(2)> end
[true, true, true, true, true, true, true, true, true, true, true, true, true,
true, true]
iex(3)> DynamicSupervisor.terminate_child(PoolToy.PoolsSup, pid)
{:error, :not_found}
On lines 3-6, we’re simply continuously killing the :poolio process we originally started on line 1. This will eventually cause the pool supervisor itself to be restarted in an attempt to fix the error (feel free to open the Observer to see the changing pid).
After all this, when we try to stop the pool on line 9, we get the error shown on line 10 because the pid we got on line 1 is no longer alive.
Clearly, we need a better method for indicating which pool we want to stop since we can’t rely on a pid that could change at any given moment.
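If you’d like to reproduce the stale pid outside of PoolToy, here is a rough sketch. All RestartDemo module names are made up for the illustration: the inner supervisor stands in for a pool supervisor, the worker for the pool manager, and the outer supervisor for the pools supervisor. It assumes the default restart intensity (at most 3 restarts within 5 seconds) for the inner supervisor.

```elixir
defmodule RestartDemo.Worker do
  use Agent

  # Registered under its module name so it can be found after each restart.
  def start_link(_arg), do: Agent.start_link(fn -> nil end, name: __MODULE__)
end

defmodule RestartDemo.InnerSup do
  use Supervisor

  def start_link(_arg), do: Supervisor.start_link(__MODULE__, [], name: __MODULE__)

  @impl true
  def init(_arg) do
    # Default restart intensity: at most 3 restarts within 5 seconds.
    Supervisor.init([RestartDemo.Worker], strategy: :one_for_all)
  end
end

# The outer supervisor restarts the inner one when it gives up.
# max_restarts is raised so the outer supervisor comfortably survives the demo.
{:ok, _outer} =
  Supervisor.start_link([RestartDemo.InnerSup], strategy: :one_for_one, max_restarts: 100)

old_sup_pid = Process.whereis(RestartDemo.InnerSup)

# The fourth kill within 5 seconds exceeds the inner supervisor's limit.
for _ <- 1..5 do
  Process.sleep(100)
  RestartDemo.Worker |> Process.whereis() |> Process.exit(:kill)
end

Process.sleep(100)
new_sup_pid = Process.whereis(RestartDemo.InnerSup)

IO.inspect(Process.alive?(old_sup_pid), label: "old supervisor alive?")
IO.inspect(new_sup_pid != old_sup_pid, label: "supervisor pid changed?")
```

The registered name keeps working across the restart, while the pid captured at the start does not. That is the same situation a client is in when holding on to the pid returned by start_pool.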
Identifying pools to stop by name
Using a pid isn’t a good idea, but the pool’s name is a great way to indicate which pool we want to stop. The client already provides it as an argument, and it’s guaranteed to be unique. All we have to do, then, is make sure we have some way to match a given pool name to the pid of its pool supervisor process!
Since we don’t want clients to rely on the initial pool supervisor’s pid, let’s no longer return it (lib/pool_toy/pools_sup.ex):
def start_pool(args) do
case DynamicSupervisor.start_child(@name, {PoolToy.PoolSup, args}) do
{:ok, _} -> :ok
{:error, _} = error -> error
end
end
Now clients will just receive :ok when a pool is successfully started: there’ll be no pid they might try to use incorrectly.
So how are we going to find a pid based on a pool name? We’ll use the Registry (introduced in Elixir 1.4) since, according to its documentation:
“It allows developers to lookup one or more processes with a given key.”
Perfect for our use, right? So let’s get started: in order to be able to use the registry, we’ll need to start it in lib/pool_toy/application.ex:
def start(_type, _args) do
children = [
{Registry, [keys: :unique, name: PoolToy.Registry]},
PoolToy.PoolsSup
]
opts = [strategy: :rest_for_one]
Supervisor.start_link(children, opts)
end
On line 3, we start an instance of Registry, configure it to use :unique keys (so every key will match 0 or 1 process), and give it the PoolToy.Registry name. Notice that because the rest of the application is going to depend on the registry, we have the application supervisor start it first. We’ve also changed the strategy on line 6 to :rest_for_one:
- if the registry crashes, it will lose the information about which pids correspond to the pool names, so we’d need to restart the rest of the application to ensure a consistent state;
- if the pools supervisor crashes, on the other hand, the registry can continue to do its job.
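The two cases above can be observed with a small sketch. The OrderDemo modules are hypothetical stand-ins: First plays the role of the registry (started first) and Second the role of the pools supervisor.

```elixir
defmodule OrderDemo.First do
  use Agent
  def start_link(_arg), do: Agent.start_link(fn -> nil end, name: __MODULE__)
end

defmodule OrderDemo.Second do
  use Agent
  def start_link(_arg), do: Agent.start_link(fn -> nil end, name: __MODULE__)
end

# Children start in list order. With :rest_for_one, a crash restarts the
# crashed child plus every child listed after it.
{:ok, _sup} =
  Supervisor.start_link([OrderDemo.First, OrderDemo.Second], strategy: :rest_for_one)

pids = fn -> {Process.whereis(OrderDemo.First), Process.whereis(OrderDemo.Second)} end

{first_0, second_0} = pids.()

# Killing the last child leaves the earlier one untouched.
Process.exit(second_0, :kill)
Process.sleep(100)
{first_1, second_1} = pids.()

# Killing the first child also restarts the one listed after it.
Process.exit(first_1, :kill)
Process.sleep(100)
{first_2, second_2} = pids.()

IO.inspect({first_1 == first_0, second_1 != second_0}, label: "after killing Second")
IO.inspect({first_2 != first_1, second_2 != second_1}, label: "after killing First")
```

This is why the order of the children list matters with this strategy: whatever others depend on should be listed first.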
And since we’ve now got another statically named process, let’s register it (mix.exs):
def application do
[
mod: {PoolToy.Application, []},
registered: [
PoolToy.PoolsSup,
PoolToy.Registry
],
extra_applications: [:logger]
]
end
Now that we’ve got a registry available for our use, we need to register pool supervisor pids using the pool name as the key, and then look up the pid when we want to stop a given pool by name. So where should we actually register the pool supervisor pid in the registry? Well, according to the docs:
“Each entry in the registry is associated to the process that has registered the key. If the process crashes, the keys associated to that process are automatically removed.”
So if we register the pool supervisor pid from that process itself, the keys will automatically get cleaned up if it crashes. Nice, right? Here we go (lib/pool_toy/pool_sup.ex):
def init(args) do
pool_name = Keyword.fetch!(args, :name)
{:ok, _} = Registry.register(PoolToy.Registry, pool_name, self())
children = [
{PoolToy.PoolMan, [{:pool_sup, self()} | args]}
]
Supervisor.init(children, strategy: :one_for_all)
end
We simply grab the pool name from the arguments on line 2, then register it on line 3. Let’s now implement the other part of the equation: looking up the pool supervisor pid when stopping a pool (lib/pool_toy/pools_sup.ex):
def start_pool(args) do
# edited for brevity
end
def stop_pool(pool_name) when is_atom(pool_name) do
[{_, pool_sup}] = Registry.lookup(PoolToy.Registry, pool_name)
DynamicSupervisor.terminate_child(@name, pool_sup)
end
Line 5 calls Registry.lookup/2 to fetch the value we previously stashed with the pool name key. Note that the return value is a list of matches, where each match is a tuple consisting of “pid that registered the value” and “value registered”. In our case, both will be the same (since the value we’re storing is the pid from which we’re registering it), but we might as well use the actual value we’re storing for consistency.
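Here is a small standalone sketch of the register/lookup round trip, including the automatic cleanup mentioned in the docs. The RegistryDemo name is an assumption made to avoid clashing with PoolToy.Registry, and a plain spawned process stands in for the pool supervisor.

```elixir
# RegistryDemo is a hypothetical registry name used only for this sketch.
{:ok, _} = Registry.start_link(keys: :unique, name: RegistryDemo)

parent = self()

# The spawned process registers itself, as the pool supervisor does in init/1.
owner =
  spawn(fn ->
    {:ok, _} = Registry.register(RegistryDemo, :poolio, self())
    send(parent, :registered)
    Process.sleep(:infinity)
  end)

receive do
  :registered -> :ok
end

# One match: {pid that registered the key, value stored under the key}.
found = Registry.lookup(RegistryDemo, :poolio)
IO.inspect(found == [{owner, owner}], label: "lookup returns {owner, value}")

# Once the owner dies, the registry drops its keys.
ref = Process.monitor(owner)
Process.exit(owner, :kill)

receive do
  {:DOWN, ^ref, :process, _pid, _reason} -> :ok
end

# Give the registry a moment to process the exit before looking again.
Process.sleep(100)
after_death = Registry.lookup(RegistryDemo, :poolio)
IO.inspect(after_death, label: "lookup after owner died")
```

Because an unknown or cleaned-up key yields an empty list, code that pattern matches on a single-element list will raise a MatchError in that case.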
Let’s give our code another spin:
iex(1)> PoolToy.start_pool(name: :poolio, worker_spec: Doubler, size: 3)
:ok
iex(2)> for _ <- 1..15 do
...(2)> Process.sleep(100)
...(2)> :poolio |> Process.whereis() |> Process.exit(:kill)
...(2)> end
[true, true, true, true, true, true, true, true, true, true, true, true, true,
true, true]
iex(3)> PoolToy.PoolsSup.stop_pool(:poolio)
:ok
Awesome, it worked! Before we forget, let’s quickly add a function to our public API to stop pools, as well as check processes in/out (lib/pool_toy.ex):
defmodule PoolToy do
defdelegate start_pool(args), to: PoolToy.PoolsSup
defdelegate stop_pool(pool_name), to: PoolToy.PoolsSup
end
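One possible refinement, sketched here with made-up StopDemo modules rather than the real PoolToy ones: stop_pool could return an error tuple for an unknown pool name instead of raising a MatchError. The sketch assumes a pool supervisor without children, which is enough to exercise the lookup and termination.

```elixir
defmodule StopDemo.PoolSup do
  use Supervisor

  def start_link(args), do: Supervisor.start_link(__MODULE__, args)

  @impl true
  def init(args) do
    # Register this supervisor under the pool name, as in PoolToy.PoolSup.
    {:ok, _} = Registry.register(StopDemo.Registry, Keyword.fetch!(args, :name), self())
    Supervisor.init([], strategy: :one_for_all)
  end
end

defmodule StopDemo.PoolsSup do
  # Hypothetical variant of stop_pool/1 that tolerates unknown pool names.
  def stop_pool(pool_name) when is_atom(pool_name) do
    case Registry.lookup(StopDemo.Registry, pool_name) do
      [{_owner, pool_sup}] -> DynamicSupervisor.terminate_child(__MODULE__, pool_sup)
      [] -> {:error, :not_found}
    end
  end
end

{:ok, _} = Registry.start_link(keys: :unique, name: StopDemo.Registry)
{:ok, _} = DynamicSupervisor.start_link(strategy: :one_for_one, name: StopDemo.PoolsSup)

{:ok, _pid} =
  DynamicSupervisor.start_child(StopDemo.PoolsSup, {StopDemo.PoolSup, [name: :poolio]})

known = StopDemo.PoolsSup.stop_pool(:poolio)
unknown = StopDemo.PoolsSup.stop_pool(:nope)

IO.inspect(known, label: "known pool")
IO.inspect(unknown, label: "unknown pool")
```

Returning {:error, :not_found} mirrors what DynamicSupervisor.terminate_child/2 itself returns for an unknown pid, so callers only have one error shape to handle.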
Great, we now have a functional pool manager. I hope you’ve learned a lot about “thinking in supervised processes” and some Elixir 1.6 features. If you’d like to see more in the same vein, sign up to my mailing list so I can gauge how much interest there is in this type of content: there are still a few things we could explore with PoolToy to further our understanding of process management.