OTP 21 introduces handle_continue callback to GenServer

When using GenServers, it is sometimes necessary to perform long-running code during the initialization. The common pattern for handling this is to send a message to self() from within the init callback, and then perform the long-running work within the handle_info function for the message we just sent:

defmodule Server do
  use GenServer

  def init(arg) do
    send(self(), :finish_init)
    state = do_quick_stuff(arg)
    {:ok, state}
  end

  def handle_info(:finish_init, state) do
    state = do_slow_stuff(state)
    {:noreply, state}
  end

  # edited for brevity
end

By performing only quick operations within init/1 itself, the GenServer can return sooner to the caller (i.e. the caller won’t be blocked unnecessarily). Then, the GenServer will process the :finish_init message it sent itself and finish its initialization in parallel. Future messages it will receive will then be processed with a fully initialized state.

There is a risk for race conditions, however: clients could have sent messages to the GenServer that arrive before the :finish_init call. This case is even more likely to happen with named GenServers, as clients could send messages at any time (using only the process’ name as they won’t require to know the pid).

OTP 21 introduces the handle_continue/2 callback. With this concept, when a :continue response is returned by a callback, the corresponding handle_continue/2 function is called, but no messages from the inbox are processed until it has been executed.

Therefore, when using OTP 21, the code above could be changed to

defmodule Server do
  use GenServer

  def init(arg) do
    state = do_quick_stuff(arg)
    {:ok, state, {:continue, :finish_init}}
  end

  def handle_continue(:finish_init, state) do
    state = do_slow_stuff(state)
    {:noreply, state}
  end

  # edited for brevity
end

This way, messages in the process’ mailbox will be ignored until `handle_continue(:finish_init, state)` is done executing and our state is properly initialized. No race conditions, yay!

This is an OTP feature, therefore being able to use it depends on the OTP version you’re running. However, if you want to be able to @impl the handle_continue/2 callback implementation, you’ll need to be using Elixir 1.7 as the GenServer behaviour from earlier versions doesn’t specify this callback.

Posted in Elixir | Comments Off on OTP 21 introduces handle_continue callback to GenServer

PoolToy: A (toy) process pool manager in Elixir 1.6 (part 2.3)

Starting pools

(This post is part of a series on writing a process pool manager in Elixir.)

Previously, we got pretty excited about the possibility of starting multiple pools, only to have our dreams crushed by bleak reality. Here’s a quick reminder of the problem:

iex(1)> PoolToy.start_pool(name: :poolio, worker_spec: Doubler, size: 3)
{:ok, #PID<0.195.0>}
iex(2)> PoolToy.start_pool(name: :pooly, worker_spec: Doubler, size: 3) 
{:error,
 {:shutdown,
  {:failed_to_start_child, PoolToy.PoolMan,
   {:badarg,
    [
      {:ets, :new, [:monitors, [:protected, :named_table]], []},
      {PoolToy.PoolMan, :init, 2, [file: 'lib/pool_toy/pool_man.ex', line: 70]},
      {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 374]},
      {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 342]},
      {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}
    ]}}}}

After putting on your thinking hat, you probably noticed that on line 10 above we’re given a pretty big clue regarding the problem at hand. Here’s that line of code (lib/pool_toy/pool_man.ex):

defp init([], %State{name: _, size: _} = state) do
  Process.flag(:trap_exit, true)
  send(self(), :start_worker_sup)
  monitors = :ets.new(:monitors, [:protected, :named_table])
  {:ok, %{state | monitors: monitors}}
end

The name we’re using for the ETS table must be unique, but anytime a new pool starts up it tries registering a new ETS table using the same name. So the first pool is able to register it, and the subsequent pools fail to create an ETS table (because they’re trying with a name that is already in use) which causes them to crash.

The fix is easy: our pool names have to be unique, so let’s just stick them in the ETS table name also. lib/pool_toy/pool_man.ex:

defp init([], %State{name: name, size: _} = state) do
  Process.flag(:trap_exit, true)
  send(self(), :start_worker_sup)
  monitors = :ets.new(:"monitors_#{name}", [:protected, :named_table])
  {:ok, %{state | monitors: monitors}}
end

On line 1, we pattern match the pool’s name, which we then use in the atom for the ETS table name on line 4. Remember that prepending a string with a colon (:) will in effect create an atom (which is what :ets.new/2 expects).

Our changes so far

Stopping pools

Stopping a pool is easy: just call DynamicSupervisor.terminate_child/2:

iex(1)> {:ok, pid} = PoolToy.start_pool(name: :poolio, worker_spec: Doubler, size: 3)
{:ok, #PID<0.152.0>}
iex(2)> DynamicSupervisor.terminate_child(PoolToy.PoolsSup, pid)
:ok

We start a pool with the first statement, then use the returned pid (corresponding to the pool supervisor) when stopping the pool.

There’s a  problem, though: what if the pool needed to be restarted? It’s pid would have changed, and we would no longer be able to stop it!

Let’s try that out. Unfortunately, we can’t try this out by directly killing the pool supervisor in the Observer, because it causes a race condition that wouldn’t happen in reality. In summary, since we’re suddenly killing the pool supervisor it doesn’t have the chance to properly terminate its children, so when it starts up again it crashes because the pool manager process from before is still running (i.e. we get an already started). You can see that for yourself by starting the IEx session with iex --logger-sasl-reports true -S mix and killing the pool supervisor in the Observer. Having the old pool manager process still around wouldn’t happen in a normal situation, as in the process of crashing, the pool supervisor would first terminate all of its children.

So let’s instead test our case by triggering too many failures in the pool manager, which will cause the pool supervisor to restart itself.

iex(1)> {:ok, pid} = PoolToy.start_pool(name: :poolio, worker_spec: Doubler, size: 3)
{:ok, #PID<0.152.0>}
iex(2)> for _ <- 1..15 do
...(2)>   Process.sleep(100)
...(2)>   :poolio |> Process.whereis() |> Process.exit(:kill)
...(2)> end
[true, true, true, true, true, true, true, true, true, true, true, true, true,
 true, true]
iex(3)> DynamicSupervisor.terminate_child(PoolToy.PoolsSup, pid)
{:error, :not_found}

On lines 3-6, we’re simply continuously killing the :poolio process we originally started on line 1. This will trigger the pool supervisor to restart itself in an attempt to fix the error (feel free to open the Observer to see the changing pid).

After all this, when we try to stop the pool on line 10, we get an error because the pid we got on line 1 is no longer alive.

Clearly, we need a better method for indicating which pool we want to stop since we can’t rely on a pid that could change at any given moment.

Identifying pools to stop by name

Using a pid isn’t a good idea, but using the pool’s name would be a great idea to indicate which pool we want to stop. The client already provides it as an argument, and it’s guaranteed to be unique. All we have to do then, is make sure we have some way to match a given pool name to the pid of its pool supervisor process!

Since we don’t want clients to rely on the initial pool supervisor’s pid, let’s no longer return it (lib/pool_toy/pools_sup.ex):

def start_pool(args) do
  case DynamicSupervisor.start_child(@name, {PoolToy.PoolSup, args}) do
    {:ok, _} -> :ok
    {:error, _} = error -> error
  end
end

Now clients will just receive :ok when a pool is successfully started: there’ll be no pid they might try to use incorrectly.

So how are we going to find a pid based on a pool name? We’ll use the Registry introduced in Elixir 1.6 since

It allows developers to lookup one or more processes with a given key.

Perfect for our use, right? So let’s get started: in order to be able to use the registry, we’ll need to start in lib/pool_toy/application.ex:

def start(_type, _args) do
  children = [
    {Registry, [keys: :unique, name: PoolToy.Registry]},
    PoolToy.PoolsSup
  ]

  opts = [strategy: :rest_for_one]
  Supervisor.start_link(children, opts)
end

On line 3, we start an instance of Regsitry, configure it to use :unique keys (so every key will match 0 or 1 process), and give it the PoolToy.Registry name. Notice that because the rest of the application is going to depend on the registry, we have the application supervisor start it first. We’ve also changed the strategy on line 7 to :rest_for_one:

  • if the registry crashes, it will lose the information about which pids correspond to the pool names, so we’d need to restart the rest of the application to ensure a consistent state;
  • if the pools supervisor crashes, on the other, the registry can continue to do its job.

And since we’ve now got another statically named process, let’s register it (mix.exs):

def application do
  [
    mod: {PoolToy.Application, []},
    registered: [
      PoolToy.PoolsSup,
      PoolToy.Registry
    ],
    extra_applications: [:logger]
  ]
end

Now that we’ve got a registry available for our use, we need to register pool supervisor pids using the pool name as the key, and then look up the pid when we want to stop a given pool by name. So where should we actually register the pool supervisor pid in the registry, well according to the docs

Each entry in the registry is associated to the process that has registered the key. If the process crashes, the keys associated to that process are automatically removed.

So if we register the pool supervisor pid from that process itself, they keys will automatically get cleaned up if it crashes. Nice, right? Here we go (lib/pool_toy/pool_sup.ex):

def init(args) do
  pool_name = Keyword.fetch!(args, :name)
  {:ok, _} = Registry.register(PoolToy.Registry, pool_name, self())

  children = [
    {PoolToy.PoolMan, [{:pool_sup, self()} | args]}
  ]

  Supervisor.init(children, strategy: :one_for_all)
end

We simply grab the pool name from the arguments on line 2, then register it on line 3. Let’s now implement the other part of the equation: looking up the pool supervisor pid when stopping a pool (lib/pool_toy/pools_sup.ex):

def start_pool(args) do
  # edited for brevity
end

def stop_pool(pool_name) when is_atom(pool_name) do
  [{_, pool_sup}] = Registry.lookup(PoolToy.Registry, pool_name)
  DynamicSupervisor.terminate_child(@name, pool_sup)
end

Line 6 calls Registry.lookup/2 to fetch the value we previously stashed with the pool name key. Note that the return value is an array of matches, where each match is a tuple consisting of “pid that registered the value” and “value registered”. In our case, both will be the same (since the value we’re storing is the pid from which we’re registering it), but might as well use the actual value we’re storing for consistency.

Let’s give our code another spin:

iex(1)> PoolToy.start_pool(name: :poolio, worker_spec: Doubler, size: 3)
:ok
iex(2)> for _ <- 1..15 do
...(2)>       Process.sleep(100)
...(2)>       :poolio |> Process.whereis() |> Process.exit(:kill)
...(2)> end
[true, true, true, true, true, true, true, true, true, true, true, true, true,
 true, true]
iex(3)> PoolToy.PoolsSup.stop_pool(:poolio)
:ok

Awesome, it worked! Before we forget, let’s quickly add a method to our public API to stop pools, as well as check processes in/out (lib/pool_toy.ex):

defmodule PoolToy do
  defdelegate start_pool(args), to: PoolToy.PoolsSup
  defdelegate stop_pool(pool_name), to: PoolToy.PoolsSup
end

Our changes so far

Great, we now have a functional pool manager. I hope you’ve learned a lot about “thinking in supervised processes” and some Elixir 1.6 features. If you’d like to see more along the same vein, sign up to my mailing list so I can gauge how much interest there is in this type of content: there are still a few things we could explore with PoolToy to further our understanding of process management.

Posted in Elixir | Comments Off on PoolToy: A (toy) process pool manager in Elixir 1.6 (part 2.3)

PoolToy: A (toy) process pool manager in Elixir 1.6 (part 2.2)

Implementing multiple pools

(This post is part of a series on writing a process pool manager in Elixir.)

In the last post, we did  most of the work enabling us to have multiple pools within PoolToy. But before we jump into the implementation, let’s take a moment to think about fault tolerance: how should pool B be affected if pool A has a problem and crashes?

Well, the whole point of PoolToy is to keep those 2 pools independent: a crash in one of them shouldn’t affect the other in any way. At the same time, we’ve decreed that the pool manager and work supervisor are co-dependent and should therefore crash together: that’s why we’ve set the pool supervisor’s restart strategy to :one_for_all (if the pool manager dies, we need the pool supervisor to kill off the worker supervisor, so a :one_for_one strategy wouldn’t be a good fit).

Right, so within a pool, we need a :one_for_all strategy, but between pools we need :one_for_one. Therefore, we’ll need to introduce another level of supervision: a pools supervisor (note the plural: not to be confused with the singular pool supervisor) will supervise the various individual pool supervisors.

Adding PoolsSup

Here’s our new PoolsSup module (lib/pool_toy/pools_sup.ex):

defmodule PoolToy.PoolsSup do
  use DynamicSupervisor

  @name __MODULE__

  def start_link(opts) do
    DynamicSupervisor.start_link(__MODULE__, opts, name: @name)
  end

  def init(_opts) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

Since we’re now going to handle starting pools through the pools supervisor, that’s what our application module should start instead of a pool supervisor (lib/pool_toy/application.ex):

defmodule PoolToy.Application do
  use Application

  def start(_type, _args) do
    children = [
      PoolToy.PoolsSup
    ]

    opts = [strategy: :one_for_one]
    Supervisor.start_link(children, opts)
  end
end

On line 6, we’re no longer directly starting a pool supervisor, but start the pools supervisor. And since we’ve now got a statically named process, let’s go ahead and register that on lines 4-6 (mix.exs):

def application do
  [
    mod: {PoolToy.Application, []},
    registered: [
      PoolToy.PoolsSup
    ],
    extra_applications: [:logger]
  ]
end

Let’s check everything works properly: start an IEx session with iex -S mix. If you take a look at the Observer (launch it with :observer.start(), remember?) you’ll that no pools were started, because our application file no longer launches a pool on startup, it only starts the pools supervisor.

But we can take our code for a spin by starting a new pool ourselves: the pools supervisor is a DynamicSupervisor, so we can use DynamicSupervisor.start_child/2 to do so:

DynamicSupervisor.start_child(PoolToy.PoolsSup,
    {PoolToy.PoolSup, [name: :poolio, worker_spec: Doubler, size: 3]})

Calling that manually is a bit of a pain, especially since we’re expecting the client to magically know he’s dealing with a DynamicSupervisor. Let’s clean that up by adding PoolsSup.start_pool/1 (lib/pool_toy/pools_sup.ex):

def start_pool(args) do
  DynamicSupervisor.start_child(@name, {PoolToy.PoolSup, args})
end

def init(_opts) do
  # edited for brevity

If you now start a new IEx session, you can start a new pool with

iex(1)> PoolToy.PoolsSup.start_pool(name: :poolio, worker_spec: Doubler, size: 3)

Let’s make that even better by exposing our public API through the PoolToy module (lib/pool_toy.ex):

defmodule PoolToy do
  defdelegate start_pool(args), to: PoolToy.PoolsSup
end

Note the generated code previously in the PoolToy module has been completely removed. As you can tell from the docs (and probably its name), defdelegate/2 simply defines a function on the current module that will delegate the call to a function defined in another module.

Our changes so far

And now we can start pools in an even cleaner fashion:

iex(1)> PoolToy.start_pool(name: :poolio, worker_spec: Doubler, size: 3)

Great! Let’s start another pool, since it’s so easy!

iex(2)> PoolToy.start_pool(name: :pooly, worker_spec: Doubler, size: 3)
{:error,
 {:shutdown,
  {:failed_to_start_child, PoolToy.PoolMan,
   {:badarg,
    [
      {:ets, :new, [:monitors, [:protected, :named_table]], []},
      {PoolToy.PoolMan, :init, 2, [file: 'lib/pool_toy/pool_man.ex', line: 70]},
      {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 374]},
      {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 342]},
      {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}
    ]}}}}

Dang. It was too good to be true, wasn’t it? Try figuring out the problem on your own, and join me in next post to fix multiple pool creation.


Would you like to see more Elixir content like this? Sign up to my mailing list so I can gauge how much interest there is in this type of content.

Posted in Elixir | Comments Off on PoolToy: A (toy) process pool manager in Elixir 1.6 (part 2.2)

PoolToy: A (toy) process pool manager in Elixir 1.6 (part 2.1)

Preparing for multiple pools

(This post is part of a series on writing a process pool manager in Elixir.)

When we left each other after last post, we had a working single pool that looked something like this:

The thing is, all these processes with pretty names are displayed that way by the Observer because they’re named processes. The problem with named processes is that there can be only one process with a given name within the node: as soon as we try to create another process with the same name, an error will be raised. This means that we currently can’t start another pool, because in doing so we’d be trying to start duplicate instances of PoolSup, PoolMan, and WorkerSup.

“Unnaming” processes

We can’t have multiple pools with processes named the same way, so let’s start by removing the name from the PoolSup process:

lib/pool_toy/pool_sup.ex:

defmodule PoolToy.PoolSup do
  use Supervisor

  def start_link(args) when is_list(args) do
    Supervisor.start_link(__MODULE__, args)
  end

  # edited for brevity
end

For the pool manager, what we’re going to do instead of just removing the process’ name is to name it with a pool name provided upon creation. This will let clients consistently refer to the same pool across restarts (because the pool’s pid would change on each restart).

First things first, we need to pass in the pool name when we start it (lib/pool_toy/application.ex):

defmodule PoolToy.Application do
  use Application

  def start(_type, _args) do
    children = [
      {PoolToy.PoolSup, [name: :poolio, size: 3]}
    ]

    opts = [strategy: :one_for_one]
    Supervisor.start_link(children, opts)
  end
end

On line 6, we’re now providing the name key-value pair, which was previously absent. With that in place, we need to forward all arguments in the pool supervisor, so that they reach the pool manager (lib/pool_toy/pool_sup.ex, line 4):

def init(args) do
  children = [
    PoolToy.WorkerSup,
    {PoolToy.PoolMan, args}
  ]

  Supervisor.init(children, strategy: :one_for_all)
end

Let’s now modify the pool manager. We’ve got a handful of things to do: store its name within its state, properly initialize the state, and update the checkin/checkout functions to take a pool name (lib/pool_toy/pool_man.ex):

defmodule PoolToy.PoolMan do
  use GenServer

  defmodule State do
    defstruct [
      :name, :size, :monitors,
      worker_sup: PoolToy.WorkerSup, worker_spec: Doubler, workers: []
    ]
  end

  def start_link(args) do
    name = Keyword.fetch!(args, :name)
    GenServer.start_link(__MODULE__, args, name: name)
  end

  def checkout(pool) do
    GenServer.call(pool, :checkout)
  end

  def checkin(pool, worker) do
    GenServer.cast(pool, {:checkin, worker})
  end

  def init(args), do: init(args, %State{})

  defp init([{:name, name} | rest], %State{} = state) when is_atom(name) do
    init(rest, %{state | name: name})
  end

  defp init([{:name, _name} | _], _state) do
    {:stop, {:invalid_args, {:name, "must be an atom"}}}
  end

  defp init([{:size, size} | rest], %State{} = state) when is_integer(size) and size > 0 do
    init(rest, %{state | size: size})
  end

  defp init([{:size, _size} | _], _state) do
    {:stop, {:invalid_args, {:size, "must be a positive integer"}}}
  end

  defp init([], %State{name: nil}) do
    {:stop, {:missing_args, {:name, "atom `name` is required"}}}
  end

  defp init([], %State{size: nil}) do
    {:stop, {:missing_args, {:size, "positive integer `size` is required"}}}
  end

  defp init(_args, %State{name: _, size: _} = state) do
    Process.flag(:trap_exit, true)
    send(self(), :start_workers)
    monitors = :ets.new(:monitors, [:protected, :named_table])
    {:ok, %{state | monitors: monitors}}
  end

  defp init([_ | t], state), do: init(t, state)

  # edited for brevity
end

Here’s a break down of what’s going on:

  • line 6: add the name to the server’s state;
  • lines 12-13: use the provided value to name the process (you’ll note that we made name a mandatory value, since we’re using Keyword.fetch!/2 which will raise an error if the key isn’t found);
  • lines 16-22: update the checkin/checkout functions to use a pool name;
  • lines 24-57: initialize the server’s state by reduction (read more about this technique).

So far, so good: if you run our project with iex -S mix, it’ll run as expected. But we’re still left with a named WorkerSup. Let’s get to that next (lib/pool_toy/worker_sup.ex):

defmodule PoolToy.WorkerSup do
  use DynamicSupervisor

  def start_link(args) do
    DynamicSupervisor.start_link(__MODULE__, args)
  end

  # edited for brevity
end

Since the processes are no longer named, we should remove them from the list of registered processes (mix.exs):

def application do
    [
      mod: {PoolToy.Application, []},
      # the `registered` key has been removed
      extra_applications: [:logger]
    ]
  end

With that change in place, here’s what greets us in IEx:

08:54:07.526 [error] GenServer :poolio terminating
** (stop) exited in: GenServer.call(PoolToy.WorkerSup, {:start_child, {{Doubler, :start_link, [[]]}, :temporary, 5000, :worker, [Doubler]}}, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (elixir) lib/gen_server.ex:914: GenServer.call/3
    (pool_toy) lib/pool_toy/pool_man.ex:119: PoolToy.PoolMan.new_worker/2
    (pool_toy) lib/pool_toy/pool_man.ex:81: anonymous fn/4 in PoolToy.PoolMan.handle_info/2
    (elixir) lib/enum.ex:2967: Enum.reduce_range_inc/4
    (pool_toy) lib/pool_toy/pool_man.ex:80: PoolToy.PoolMan.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: :start_workers
State: %PoolToy.PoolMan.State{monitors: :monitors, name: :poolio, size: 3, worker_spec: Doubler, worker_sup: PoolToy.WorkerSup, workers: []}

The issue is that in lib/pool_toy/pool_man.ex:119 we’re trying to call the (previously) named PoolToy.WorkerSup process, but the process is not alive or there's no process currently associated with the given name.

If you look at our pool manager (lib/pool_toy/pool_man.ex), you can see what’s going on:

defmodule State do
    defstruct [
      :name, :size, :monitors,
      worker_sup: PoolToy.WorkerSup, worker_spec: Doubler, workers: []
    ]
  end

We use the worker_sup value in our state when communicating with the worker supervisor, but we’re using the name it used to have instead of a pid.

So how do we fix this? We need our pool manager to know which pid the worker supervisor has. Right now, the pool supervisor (lib/pool_toy/pool_sup.ex) starts the worker supervisor, then the pool manager. So we need some way to find the worker supervisor’s pid from within the pool manager so that we can store it within the pool manager’s state.

One possibility would be to use Supervisor.which_children/1 to get all of the pool supervisor’s children and then iterate through them until we locate the worker supervisor. And of course, we’d need  some mechanism to update that information if the worker supervisor crashes (and therefore has a different pid after restarting).

Instead, we’ll have the pool manager be responsible for (re)starting the worker supervisor, since our idea is to have the pool manager be the brains of our operation.

Managing the worker supervisor from within the pool manager

We can use Supervisor.start_child/2 to make a supervisor start a child process. Why bother, when we just said that the pool manager would be responsible for restarting the worker supervisor? Because having it under the pool supervisor gives us a way to cleanly shut everything down and prevent memory leaks.

In order for the pool manager to be able to start the worker supervisor, it needs to know the pid for the pool supervisor (so it can later use it as an argument to start_child/2). Let’s pass it in as an argument (lib/pool_toy/pool_sup.ex):

def init(args) do
    children = [
      {PoolToy.PoolMan, [{:pool_sup, self()} | args]}
    ]

    Supervisor.init(children, strategy: :one_for_all)
  end

Now that the pool supervisor’s pid is provided within the keyword list given to PoolMan.start_link/1, let’s modify our pool manager to add it to the state (lib/pool_toy/pool_man.ex):

defmodule State do
  defstruct [
    :name, :size, :pool_sup,
    :monitors,
    worker_sup: PoolToy.WorkerSup, worker_spec: Doubler, workers: []
  ]
end

# edited for brevity

defp init([{:size, _size} | _], _state) do
  {:stop, {:invalid_args, {:size, "must be a positive integer"}}}
end

defp init([{:pool_sup, pid} | rest], %State{} = state) when is_pid(pid) do
  init(rest, %{state | pool_sup: pid})
end

defp init([{:pool_sup, _} | _], _state) do
  {:stop, {:invalid_args, {:pool_sup, "must be provided"}}}
end

Again, pretty straightforward: we added :pool_sup to the state on line 3, then added new function heads on lines 15-21 to initialize the value within the state.

What now? Since the pool manager is going to start the worker supervisor, the pool supervisor should NOT start it. Let’s take care of that (lib/pool_toy/pool_sup.ex):

def init(args) do
  children = [
    {PoolToy.PoolMan, [{:pool_sup, self()} | args]}
  ]

  Supervisor.init(children, strategy: :one_for_all)
end

We’ve simply removed PoolToy.WorkerSup from line 3. Since the worker supervisor is no longer started by the pool supervisor, we need to start it within the pool manager now.

At this time, we already have code to start workers from within the pool manager, so we’ll just replace that to instead start the worker sup (and then the actual workers). Here we go (lib/pool_toy/pool_man.ex):

defp init([], %State{name: _, size: _} = state) do
  Process.flag(:trap_exit, true)
  send(self(), :start_worker_sup)
  monitors = :ets.new(:monitors, [:protected, :named_table])
  {:ok, %{state | monitors: monitors}}
end

# edited for brevity

def handle_info(:start_worker_sup, %State{pool_sup: sup} = state) do
  {:ok, worker_sup} = Supervisor.start_child(sup, PoolToy.WorkerSup)

  state =
    state
    |> Map.put(:worker_sup, worker_sup)
    |> start_workers()

  {:noreply, state}
end

# edited for brevity

def start_workers(%State{worker_sup: sup, worker_spec: spec, size: size} = state) do
  workers =
    for _ <- 1..size do
      new_worker(sup, spec)
    end

  %{state | workers: workers}
end

defp new_worker(sup, spec) do
  # edited for brevity

On line 3, we call :start_worker_sup instead of :start_workers. On lines 10-19 we’ve replaced the handler for :start_workers with one for :start_worker_sup, which in turn calls the start_workers/1 function defined on lines 23-30 (which is just our previous :start_workers handler in disguise).

Since we’re now setting the worker supervisor pid in the state, we don’t need a fixed value for it so let’s remove that from our state (lib/pool_toy/pool_man.ex):

defmodule State do
  defstruct [
    :name, :size, :pool_sup,
    :monitors, :worker_sup,
    worker_spec: Doubler, workers: []
  ]
end

And with that, our code is back to (kind of) working! We no longer have statically named processes, but we’ve still got some issues to iron out: try killing the worker supervisor in the Observer. If you just look at the Observer UI, you’ll be under the impression that everything is working fine: our pool processes get restarted as expected. But if you peek at the IEx console, you’ll see a bunch of errors like:

07:38:24.786 [error] GenServer :poolio terminating
** (MatchError) no match of right hand side value: {:error, {:already_started, #PID<0.181.0>}}
    (pool_toy) lib/pool_toy/pool_man.ex:90: PoolToy.PoolMan.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: :start_worker_sup
State: %PoolToy.PoolMan.State{monitors: :monitors, name: :poolio, pool_sup: #PID<0.149.0>, size: 3, worker_spec: Doubler, worker_sup: PoolToy.WorkerSup, workers: []}

In effect, the pool manager is complaining that when it attempts to start the worker supervisor, there’s already one up and running. What gives? Here’s what’s going on, step by step:

  1. We kill the worker supervisor;
  2. The supervisor kills the pool manager (because the supervisor is one_for_all);
  3. The supervisor restarts its child processes (pool manager and worker supervisor);
  4. The pool manager tries to start a worker supervisor, but a process with that name already exists, raising the error.

It’s important to note in the above that supervisors will take care or restarting all of their children, regardless of when they were started. So the worker supervisor will be restarted by the pool supervisor even though it isn’t among the children declared in the pool supervisor initialization callback.

Our changes so far link

Making the worker supervisor temporary

So how do we fix this? We need a couple of changes to pull this off:

  • make the supervisor stop worrying about restarting the worker supervisor;
  • get the pool manager to know when the worker supervisor dies, so it can do something about it.

Handling the first point is pretty easy: if a child specification specifies its own restart option as :temporary, the supervisor will essentially ignore the process terminating (whether successful or not). Other restart options are :temporary (which is the default behavior we’ve come to know), and :transient where the supervisor will restart the process only if it terminates unsuccessfully (e.g. due to an error).

So let’s specify that our worker supervisor should be a temporary process (lib/pool_toy/worker_sup.ex):

defmodule PoolToy.WorkerSup do
  use DynamicSupervisor, restart: :temporary

  # edited for brevity
end

Easy breezy, right? When we use the DynamicSupervisor, we can provide options that will be used when defining the child specification. Of course, there are other ways to ensure your child specification is setup as you want it to be, as we’ve covered previously.

With that bit out of the way, we still need our pool manager to e made aware of the worker supervisor going down, and enabling it to react to the situation. We can achieve this easily by linking the processes, just like we did before (lib/pool_toy/pool_man.ex):

def handle_info(:start_worker_sup, %State{pool_sup: sup} = state) do
  {:ok, worker_sup} = Supervisor.start_child(sup, PoolToy.WorkerSup)
  Process.link(worker_sup)

  # edited for brevity
end

We’ve already got our pool manager trapping exits, so that taken care of for us. (Otherwise, we’d need to add that process flag!) But what should we do when the worker supervisor goes down? Just restart it? We’ve decided earlier on that the pool manager and worker supervisor processes were closely related, and if one crashed the other should be killed also.

So in this case, what we’ll do is have the pool manager kill itself if the worker supervisor crashes, since it can’t imagine going in life without its buddy. The pool supervisor notice the pool manager’s death (it won’t react to the worker supervisor death, because we just said it was :temporary) and restart it, which in turn will start a new worker supervisor.

Making the pool manager stop itself is pretty straightforward: jut return {:stop, reason, new_state} from one of its callbacks (lib/pool_toy/pool_man.ex):

def handle_info({:EXIT, pid, reason}, %State{worker_sup: pid} = state) do
  {:stop, {:worker_sup_exit, reason}, state}
end

def handle_info({:EXIT, pid, _reason}, %State{workers: workers, monitors: monitors} = state) do
  # edited for brevity
end

We match the pid in the exit message to our worker supervisor’s pid, and forward the exit reason when stopping the pool manager.

Our changes so far

Providing the worker spec dynamically

Our project still relies on a static worker spec. Instead, clients should be able to provide the worker spec they want to use in the pool. Let’s change our code to provide that spec dynamically.

This change will require a 3-step process:

  1. remove the static worker_spec value in the pool manager’s state;
  2. add clauses to PoolMan.init/2 to add the worker spec value to the state (and fail if it’s missing);
  3. add the worker spec value to the arguments provided to the pool supervisor when our OTP app starts.

Here we go:

lib/pool_toy/pool_man.ex

defmodule State do
  defstruct [
    :name, :size, :worker_spec, :pool_sup,
    :monitors, :worker_sup,
    workers: []
  ]
end

# edited for brevity

defp init([{:worker_spec, spec} | rest], %State{} = state) do
  init(rest, %{state | worker_spec: spec})
end

defp init([{:pool_sup, _} | _], _state) do
  # edited for brevity
end

# edited for brevity

defp init([], %State{worker_spec: nil}) do
  {:stop, {:missing_args, {:worker_spec, "child spec `worker_spec` is required"}}}
end

defp init([], %State{name: _, size: _} = state) do
  # edited for brevity

lib/pool_toy/application.ex:

def start(_type, _args) do
  children = [
    {PoolToy.PoolSup, [name: :poolio, worker_spec: Doubler, size: 3]}
  ]

  opts = [strategy: :one_for_one]
  Supervisor.start_link(children, opts)
end

Our changes so far link

Now that our single pool no longer has statically defined names, we start working on enabling multiple pools to be managed concurrently. That’s next!


Would you like to see more Elixir content like this? Sign up to my mailing list so I can gauge how much interest there is in this type of content.

Posted in Elixir | Comments Off on PoolToy: A (toy) process pool manager in Elixir 1.6 (part 2.1)

PoolToy: A (toy) process pool manager in Elixir 1.6 (part 1.9)

Managing a single pool (continued)

(This post is part of a series on writing a process pool manager in Elixir.)

Figuring out the shenanigans

Let’s get on the same page (after last post) as to what is happening when we kill workers in our pool. After starting an IEx session with iex -S mix, starting the Observer with :observer.start(), and killing a worker, I’ve got the following state in my pool manager:

I started with 3 workers, but after killing one of them (0.138.0), I now have 4 workers (one too many). In addition, of the two new workers that appeared (0.204.0 and 0.205.0), only one of them was added to the pool manager’s list of available workers. You’ll notice that it was also the one to be linked. The other worker is just floating around, and will never be assignable to handle work.

Now, if I kill the process with pid 0.204.0, it gets replaced by a single new process. If on the other hand I kill 0.205.0, it will be replaced by two new processes. What gives?

Well, with the changes implemented in the last post, we’ve got the pool manager linked to all workers, and replacing them with new instances when they fail. But remember who else replaces worker processes as they die? That’s right: the worker supervisor. So when we killed a worker process that was spawned during initialization (and therefore linked to the pool manager), it triggered 2 responses: the worker supervisor started a new worker instances (because that’s just what it does), and the pool manager also started a new worker (which it then linked).

The new worker spawned by the worker supervisor isn’t linked by the pool manager (as you can see in the image above: no orange line), so we end up with different results when we kill these newly spawned workers:

  • kill the worker created by the worker supervisor, and it simply gets replaced by a new worker: we don’t end up with any additional workers;
  • kill the worker created by the pool manager, and we end up with an additional worker in our tree: one worker was created by the worker supervisor in response to a worker’s death, the other was created by the pool manager because it saw that a linked worker terminated.

This obviously isn’t the behavior we want in our application. What should we do? We need to tell the worker supervisor to not restart its children.

Supervisors start their children according to a child spec (that we met in back in part 1.2), which has a :restart optional value (see docs). This value dictates whether that particular child will get restarted when it terminates.

First, let’s check our current worker supervisor state by double-clicking on it within the Observer: in the “State” tab, you can see a list of children, each of which indicates permanent as the restart strategy.

Customizing the child spec

We’ve found the problem cause, let’s go for the solution. One possibility would be to ensure that our Doubler worker will always return a child spec with a :temporary restart strategy. We won’t go that route here, because down the road we want PoolToy users to be able to provide their own worker implementations to be used in the pool. So it’s safer if we ensure that workers have temporary restart strategies instead of relying on behavior implemented by others.

Here’s our current code starting workers (lib/pool_toy/pool_man.ex):

defp new_worker(sup, spec) do
  {:ok, pid} = PoolToy.WorkerSup.start_worker(sup, spec)
  true = Process.link(pid)
  pid
end

We tell the dynamic worker supervisor to start a new instance of the spec argument. If you check the code, you’ll see that the value of that argument is always Doubler. How does that become a child specification? Well, Doubler invokes use GenServer: one of its effects is adding a child_spec/1 function to the module. Then when start_child/2 is given a second argument that isn’t a child spec (a child spec being simply a map with certain mandatory key/value pairs), it will convert it into a child spec.

If given a tuple with module and args like {Doubler, [foo: :bar]} it will call Doubler.child_spec([foo: :bar]) to get the child spec instance it needs (the args will then be forwarded to start_link/1). If given simply a module name such as Doubler it will call Doubler.child_spec([]) (so in effect Doubler is equivalent to {Doubler, []}).

In fact, you can try that in an IEx session with iex -S mix:

iex(1)> Doubler.child_spec([])
%{id: Doubler, start: {Doubler, :start_link, [[]]}}

So that explains how our dynamic worker supervisor ends up with the child spec it requires in the code above. And we know that we can provide a valid child spec directly to start_child/2, so maybe we could try:

defp new_worker(sup, spec) do
  {:ok, pid} = PoolToy.WorkerSup.start_worker(sup,
       %{id: spec, start: {spec, :start_link, [[]]}, restart: :temporary})
  true = Process.link(pid)
  pid
end

The problem there is that although that will technically work, it’ll be very brittle. What happens when Doubler changes how it defines its child specification (e.g. by adding default arguments provided to start_link/1)? Our code will disregard the change and will break, that’s what. Event worse, our code won’t work with other ways of specifying spec (e.g. a tuple, or the full map).

The better way would be to generate a child spec from the spec argument, and provide an override value for the :restart attribute. Fortunately, there’s a function for that: Supervisor.child_spec/2. It takes a child spec, tuple of module and args, or module as the first arg, and a keyword list of overrides as the second arg. So we can change our code to (lib/pool_toy/pool_man.ex):

defp new_worker(sup, spec) do
  child_spec = Supervisor.child_spec(spec, restart: :temporary)
  {:ok, pid} = PoolToy.WorkerSup.start_worker(sup, child_spec)
  true = Process.link(pid)
  pid
end

But hold on a minute: if the worker supervisor is no longer restarting terminated processes (since its children are now :temporary and the pool manager handles the restarting), is it even useful anymore? Shouldn’t we refactor and remove the supervisor? Nope, because one aspect of a supervisor is indeed that it handles restarting terminated children, but the other is that it provides a central location to terminate all of its children and prevent memory leaks. In other words, our worker supervisor will allow us to terminate all worker processes by simply terminating the worker supervisor itself. This will be particularly handy in the future, when we want to enable clients to stop pools.

Our changes so far

And with that last change above, we’ve finally got a functional worker pool. Here it is in use:

iex(1)> w1 = PoolToy.PoolMan.checkout()
#PID<0.137.0>
iex(2)> w2 = PoolToy.PoolMan.checkout()
#PID<0.138.0>
iex(3)> w3 = PoolToy.PoolMan.checkout()
#PID<0.139.0>
iex(4)> w4 = PoolToy.PoolMan.checkout()
:full
iex(5)> PoolToy.PoolMan.checkin(w2)
:ok
iex(6)> w4 = PoolToy.PoolMan.checkout()
#PID<0.138.0>
iex(7)> Doubler.compute(w4, 3)
Doubling 3
6
iex(8)> 1..9 |> Enum.map(& Doubler.compute(w1, &1))
Doubling 1
Doubling 2
Doubling 3
Doubling 4
Doubling 5
Doubling 6
Doubling 7
Doubling 8
Doubling 9
[2, 4, 6, 8, 10, 12, 14, 16, 18]

Since we’ve limited our pool size to 3 doublers, we know that at any one time only 3 doubling processes may happen simultaneously: that’s what we wanted by design to prevent becoming overloaded.

We’ve got a single functional pool, but there’s more for us to do (and learn!): clients should be able to start/stop as many pools as they want, with variable sizes and worker modules. We could also enhance our pool functionality to handle bursts of activity by allowing the pool to start a certain number of “extra” overflow workers that will shut down after some cool down time when demand returns to normal levels. We can also allow our clients to decide what to do if no worker is available: return a response immediately saying the pool is at capacity, or queue the client’s demand and execute when a worker becomes available.

Along the way, we’ll find that some choices made so far weren’t such a great design. But worry not! We’ll learn why and how our design will need to evolve to accommodate the new desired requirements.


Would you like to see more Elixir content like this? Sign up to my mailing list so I can gauge how much interest there is in this type of content.

Posted in Elixir | Comments Off on PoolToy: A (toy) process pool manager in Elixir 1.6 (part 1.9)

PoolToy: A (toy) process pool manager in Elixir 1.6 (part 1.8)

Managing a single pool (continued)

(This post is part of a series on writing a process pool manager in Elixir.)

First, let’s point out the problem in our current pool design as we left after last post: our pool manager’s state gets out of whack when a checked out worker dies. Here’s how to witness that with your own eyes (after running iex -S mix from your project’s root folder, of course):

iex(1)> PoolToy.PoolMan.checkout()
#PID<0.137.0>
iex(2)> :observer.start()
:ok

I’ve checked out a process with pid 0.137.0 (note that your value could of course be different) and started the Observer. In there, if I look at the pool manager’s state (quick reminder: in the “Applications” tab, click on pool_toy in the list on the left, then double-click on PoolMan, then go to the “State” tab in the new window), I can see that I now have only 2 available workers, which is correct:

Additionally, if I go to the “Table Viewer” tab in the Observer and double click on the “monitors” table, I can see that we have an entry for a monitor on the process with pid 0.137.0:

Back in the “Applications” tab (where you might have to select pool_toy on the left again), let’s kill the checked out process by right-clicking on it and selecting the last entry in the menu: “kill process”. Click ok on the popup asking for the exit reason (thereby using the default :kill) and you’ll see our application quickly replaces the downed worker in the pool with a fresh new instance (it has a different pid) ready to accept a workload.

For completeness, let’s quickly point out that what we’ve just accomplished in the Observer (killing a specific process) can of course be achieved in code:

iex(1)> w = PoolToy.PoolMan.checkout()
#PID<0.137.0>
iex(2)> Process.exit(w, :kill)
true

And if you consult the docs, you’ll see that the default :kill reason proposed by the Observer is special: it forces the process to die unconditionally (more on that below).

But let’s get back to our earlier train of thought, and turn our attention back to the Observer: if we look at the pool manager’s state and the monitors table, what do we see? Only 2 workers are considered as available by the pool manager, and we will see an entry for the dead worker in the monitors table.

If we return to the “Applications” tab and kill one of the available workers, it gets even worse. Look at the pool manager’s state now, and you can see that it’s still the same. That means that we’ve got pids in there that are dead (and therefore shouldn’t be given to clients when they request a worker), and alive workers that aren’t in the list of available workers. Oh my!

So why is this happening? As you’ve probably guessed, it’s because we’re not handling worker death properly (or some might say we’re not handling it at all): we’ve got a supervisor watching over the workers and restarting them as they die, but the pool manager is never made aware of this, so it can’t react and update its state.

How can we fix it? I know: we can have the worker supervisor monitor its children, and notify the pool manager when one of them dies, along with the pid of the new worker it started to replace the dead one! NO. No no no. No. Supervisors only supervise.

Supervisors only supervise

Supervisors are the cornerstone of OTP reliability, so they must be rock solid. In turn, this means that any code we can put elsewhere doesn’t belong in the supervisor: less code means fewer bugs, which equals more robust supervisors.

Ok, so we can’t have the supervisor monitoring the processes, but someone’s got to do it, right? Let’s have the brains of our operation do it: the pool manager. But let’s think this through a bit before heading straight back to mashing our keyboards.

Our pool manager already makes the dynamic worker supervisor start workers. This way the pool manager knows what the worker pids are. So it’ll be pretty easy to monitor (or link!) them there. This way, when a worker dies, the pool manager gets a message and can react to it (start a new worker, monitor/link the new worker, and clean up the state so its consistent). Sounds like a plan, let’s get back to coding!

Handling worker death

Our pool manager needs to know when workers die. Should we use monitors or links for this? To determine which is more appropriate, we once again need to think about how the pool manager and worker deaths should affect one another.

If process A monitors process B and B dies, process A will receive a :DOWN message as we saw in part 1.6. But if A dies, nothing special happens.

If process A links process B and B dies, process A will be killed. Reciprocally, if A dies, B will be killed.

In our case, we’d like a mix of the two: we want the pool manager to know when a worker dies (so it can start a new worker and add it to the pool), but if the pool manager dies all workers should be killed (because the best way to ensure the pool manager’s state is correct is to start fresh with all new workers that will all be available).

As this is a common use case, OTP provides the ability to trap exits which in Elixir is achieved via Process.flag/2. To learn more about this, let’s see the Erlang docs we’re referred to:

When trap_exit is set to true, exit signals arriving to a process are converted to {‘EXIT’, From, Reason} messages, which can be received as ordinary messages.

Why is this useful? Let’s contrast it to the usual case for linked processes (again from the Erlang docs):

The default behaviour when a process receives an exit signal with an exit reason other than normal, is to terminate and in turn emit exit signals with the same exit reason to its linked processes. An exit signal with reason normal is ignored.

When a process is trapping exits, it does not terminate when an exit signal is received. Instead, the signal is transformed into a message {‘EXIT’,FromPid,Reason}, which is put into the mailbox of the process, just like a regular message.

An exception to the above is if the exit reason is kill, that is if exit(Pid,kill) has been called. This unconditionally terminates the process, regardless of if it is trapping exit signals.

So normally, if process A is linked to process B, when process B crashes it will send an exit signal to process A. Process A, upon receiving the exit signal will terminate for the same reason B did.

However, if we make process A trap exits by calling Process.flag(:trap_exit, true) from within the process, all exit signals it is sent will instead be converted into :EXIT messages that will sit in the process’ mailbox until they are treated. The exception in the last quoted paragraph documents the exception: if the exit reason is :kill, a process cannot trap the exit signal and will be terminated unconditionally. In other words, you can still force a process that trap exits to terminate: just call Process.exit(pid, :kill).

Back to our use case: by having the pool manager link to workers and trap exits, we’ll achieve exactly what we want. If the pool manager goes down, it will take all workers down with it, but if a worker dies, the pool manager won’t be killed: it’ll instead receive a :DOWN message that it can process as it pleases (in our case to replace the dead worker in the pool).

Linking worker processes

Let’s start by refactoring worker starting to extract it to it’s own function (lib/pool_toy/pool_man.ex):

def handle_info(:start_workers, %State{size: size} = state) do
  workers =
    for _ <- 1..size do
      new_worker(PoolToy.WorkerSup, Doubler)
    end

  {:noreply, %{state | workers: workers}}
end

We’ve simply shifted the worker creation to within a new_worker/2 function on line 4. Here’s that function definition (lib/pool_toy/pool_man.ex):

def handle_info(msg, state) do
  # editd for brevity
end

defp new_worker(sup, spec) do
  {:ok, pid} = PoolToy.WorkerSup.start_worker(sup, spec)
  true = Process.link(pid)
  pid
end

Most of that code is the same, we’ve only added line 7 where we call Process.link/1 which will link the calling process to the one given. That handles the linking part, but we still need to handle the exit trapping part. That’s pretty easy (lib/pool_toy/pool_man.ex):

def init(size) do
  Process.flag(:trap_exit, true)
  send(self(), :start_workers)
  monitors = :ets.new(:monitors, [:protected, :named_table])
  {:ok, %State{size: size, monitors: monitors}}
end

Line 2 is the only addition required to make the magic happen: when a worker dies, our pool manager will no longer die with it, since it’ll now receive a special message instead.

Let’s take a quick peek at the Observer again:

You can see that orange lines have appeared: they indicate link relationships. In the above image, we can see that PoolMan is linked to every worker process.

Handling a terminated worker

Let’s first write the code to handle a terminated worker. If a worker dies, we need to remove it from the list of available workers (if it even is in that list), and add a new worker to take its place. Of course, the new worker won’t have been checked out, so it’ll be added to the list of available workers (lib/pool_toy/pool_man.ex):

defp handle_idle_worker(%State{workers: workers} = state, idle_worker) when is_pid(idle_worker) do
  # edited for brevity
end

defp handle_worker_exit(%State{workers: workers} = state, pid) do
  w = workers |> Enum.reject(& &1 == pid)
  %{state | workers: [new_worker(PoolToy.WorkerSup, Doubler) | w]}
end

Right, so now let’s actually handle that mysterious :EXIT message that we’ll receive when a worker dies (because the pool manager is trapping exits) in lib/pool_toy/pool_man.ex:

def handle_info({:DOWN, ref, :process, _, _}, %State{workers: workers, monitors: monitors} = state) do
  # edited for brevity
end

def handle_info({:EXIT, pid, _reason}, %State{workers: workers, monitors: monitors} = state) do
  case :ets.lookup(monitors, pid) do
    [{pid, ref}] ->
      true = Process.demonitor(ref)
      true = :ets.delete(monitors, pid)
      {:noreply, state |> handle_worker_exit(pid)}
    [] ->
      if workers |> Enum.member?(pid) do
        {:noreply, state |> handle_worker_exit(pid)}
      else
        {:noreply, state}
      end
  end
end

The :EXIT messages will contain the pid of the process that died (line 5). Since we’re only linking to worker processes, we know that this pid is going to be a worker. We can then check (on line 6) whether the worker was checked out, and act appropriately.

If the worker was checked out (line 7):

  • stop monitoring the associated client, as we no longer care if it dies (line 8);
  • remove the monitor reference from our ETS monitors table (line 9);
  • handle the terminated worker and return the updated state (line 10).

If the worker wasn’t checked out (line 11):

  • if the worker is in the list of available workers, we handle the terminated worker and return the updated state (lines 12-13);
  • otherwise, we do nothing and return the state as is.

A quick refactor

We’ve got some “magic” module values strewn about our code: PoolToy.WorkerSup as the name for the worker supervisor process, and Doubler as the spec used for worker instances. Ideally, the @name module property we use to name the pool manager process should also get refactored, but as it is handled differently (being passed to GenServer.start_link/1 as opposed to used within the GenServer callback implementations), we’ll leave it aside for simplicity. Don’t worry, we’ll get to it when implementing multiple pools.

Hard coding process names everywhere works for now, but our end goal is to allow users to have multiple pools running at the same time. Our current approach won’t work for that: names must be unique. So let’s clean up our act by moving these values that are specific to a pool instance into the pool manager’s state. We’ll still hard code them for now, but at least that way they’ll be easier to change when the time comes.

Let’s start by adding these values to the state (lib/pool_toy/pool_man.ex):

defmodule State do
  defstruct [
    :size, :monitors,
    worker_sup: PoolToy.WorkerSup, worker_spec: Doubler, workers: []
  ]
end

And then we still need to refactor the rest of our code to work with this change (lib/pool_toy/pool_man.ex):

def handle_info(:start_workers, %State{worker_sup: sup, worker_spec: spec, size: size} = state) do
  workers =
    for _ <- 1..size do
      new_worker(sup, spec)
    end

  {:noreply, %{state | workers: workers}}
end

# edited for brevity

defp handle_worker_exit(%State{workers: workers, worker_sup: sup, worker_spec: spec} = state, pid) do
  w = workers |> Enum.reject(& &1 == pid)
  %{state | workers: [new_worker(sup, spec) | w]}
end

Our changes so far

Awesome! Start an IEx session with iex -S mix, start the Observer, and kill some workers. Does everything work as expected? Experiment a little, and try to figure out what could be going on… And move on to the next post!


Would you like to see more Elixir content like this? Sign up to my mailing list so I can gauge how much interest there is in this type of content.

Posted in Elixir | Comments Off on PoolToy: A (toy) process pool manager in Elixir 1.6 (part 1.8)

PoolToy: A (toy) process pool manager in Elixir 1.6 (part 1.7)

Managing a single pool (continued)

(This post is part of a series on writing a process pool manager in Elixir.)

We’ve now realized (in last post) that storing the references to the client monitors in a map was a bit naive: although it works fine to locate the relevant information when a client crashes, it’s not so great to handle the “worker check in” case (where we have to locate the monitor reference by worker pid instead).

You know what’s good for finding data with varying conditions? Databases. Erlang does provide a full-blown database (called Mnesia) which would actually be counter-productive for our use case: since we’ll be storing monitors, if our entire application crashes we want the monitor information to be wiped also (since it will no longer be relevant). Since Mnesia persists data to disk, we’d have to erase the obsolete monitor data ourselves in case of crash.

Instead, we can use an in-memory “database” also provided by Erlang, called ETS (for Erlang Term Storage).

Erlang Term Storage basics

First, here’s a brief description of what an ETS table is:

Data is organized as a set of dynamic tables, which can store tuples. Each table is created by a process. When the process terminates, the table is automatically destroyed. Every table has access rights set at creation.

The first element in the stored tuple will be that tuple’s key (default behavior, can be modified). Retrieving data via this key is very efficient, whereas finding data by matching the rest of the tuple requires the entire table to be searched which can be slow if the table is very large.

Now that introductions have been made, let’s get our hands dirty. The first thing we’ll need is to create a new ETS table instance when our pool manager process initializes itself. Of course, we’ll also need to change the default value of the :monitors attribute in our state, since we won’t be using maps anymore (lib/pool_toy/pool_man.ex):

defmodule State do
  defstruct [:size, :monitors, workers: []]
end

# edited for brevity

def init(size) do
  send(self(), :start_workers)
  monitors = :ets.new(:monitors, [:protected, :named_table])
  {:ok, %State{size: size, monitors: monitors}}
end

(Your keen eyes will probably have noticed that we switched the positions of the :monitors and :workers attributes in the struct on line 2. This is because values that are initialized must come last.)

The magic mainly happens on line 9, where we use new/2 to create the ETS table we’ll be using to track client monitors. The first argument is the (unique) name we give to the table (which must be an atom), followed by the options. Here, the options we give are for our convenience when inspecting the application state with Observer: they are not strictly necessary in our case, just nice to have. :protected means that only the owner process can write to the table, but other processes can read from it: this means that later on we’ll be able to look at the table contents from within Observer. :protected is a default setting, and was only included here for explicitness.

:named_table works much like named processes: if the option is present, the table will be registered under the given name (first argument mentioned above, which in this case is :monitors). This will enable us to work more conveniently with the table from IEx if we wanted to as we can use the table name instead of the reference that was returned upon table creation. Having a named table also means that it will be displayed under that name in the Observer, which will be convenient in itself.

Finally, note that when using the :named_table option, the return value of new/2 will simply be the table name, instead of the table reference (the reference can be obtained from the name with whereis/1). As we’re using a named table in our application, we technically wouldn’t need to store the value returned by new/2. However, since we’re using a named table for convenience instead of design imperatives, our code will consistently use the value returned by new/2 to interact with the ETS table. This way, our code will work even if/when the table is no longer named (feel free to try that later!).

With our monitors table available for use, let’s update the check out code (lib/pool_toy/pool_man.ex):

def handle_call(:checkout, {from, _}, %State{workers: [worker | rest]} = state) do
  %State{monitors: monitors} = state
  monitor(monitors, {worker, from})
  {:reply, worker, %{state | workers: rest}}
end

# edited for brevity

def handle_info(msg, state) do
  # edited for brevity
end

defp monitor(monitors, {worker, client}) do
  ref = Process.monitor(client)
  :ets.insert(monitors, {worker, ref})
  ref
end

Easy, peasy: we just insert the {worker, ref} into our ETS table.

We also have to update the code handling dying client processes. So what happens when a monitored process dies (docs)?

Once the monitored process dies, a message is delivered to the monitoring process in the shape of:

{:DOWN, ref, :process, object, reason}

We’ll get a special message with the reference to the monitor, an object value (which will be the client’s pid in our case), and the reason the client exited (which we’re not interested in, because it makes no difference to us: regardless of why the client exited, we need to check its worker back in).

So let’s handle this new message in our code (lib/pool_toy/pool_man.ex):

def handle_info({:DOWN, ref, :process, _, _}, %State{workers: workers, monitors: monitors} = state) do
  case :ets.match(monitors, {:"$0", ref}) do
    [[pid]] ->
      true = :ets.delete(monitors, pid)
      {:noreply, state |> handle_idle_worker(pid)}
    [] ->
      {:noreply, state}
  end
end

So what does match/2 on line 2 do? It looks through the table, trying to find matches to the provided pattern. What’s a pattern?

A pattern is a term that can contain:

  • Bound parts (Erlang terms)
  • ‘_’ that matches any Erlang term
  • Pattern variables ‘$N’, where N=0,1,…

So in the case above, line 2 says: in the monitors table, return the array of results that have ref as the second element in the tuple, and put the first element in the tuple as the first element in the results array. Note that :"$0" is shorthand for “turn the "$0" string into an atom”: Erlang/ETS expects pattern variables to be atoms formed of a dollar sign followed by an integer.

Line 3 matches on a [[pid]] result. Why the nested array? Well, match/2 will return an array of results. That’s the first level array. Within that results array, each single result is an array of the values corresponding to the pattern variables we provided: we provided only a single pattern, so the (unique) result array contains only a single element. For completeness, let’s also point out that since we have at most one monitor per client process, we don’t expect to ever have more than 1 result.

For a little more context on ETS match results, allow me to provide another example: let’s say we were storing a list of cars and their license plates in a cars ETS table, using the following tuple convention: {license_plate, make, year, color}. If we wanted to see the year and make of red cars, we could call :ets.match(cars, {:"_", :"$1", :"$0", "red"}). Notice that since the pattern order indicates we want the year before the make, an example result would be ["2018", "Tesla"]. As you’ll have noticed, we disregard the “license_plate” field in our query by using the :"_" pattern, so that information is absent from our result.

Back to our actual code: if we have a matching record in our ETS table (line 3), we delete it from the table on line 4 since it’s no longer relevant (the process just crashed). Note that we match the delete result to true, so that we crash in case the returned result is different (although in this case it isn’t so useful as delete/2 will always return true). Finally, we handle our idle worker online 5 (we’ll implement handle_idle_worker/2 soon) and return the updated state.

If there’s no monitoring info in the database (lines 6 and 7), we don’t do anything (this could happen e.g. if the client checked in its worker process before crashing).

But let’s take a brief moment to talk about our lord and savior, the “let it crash” philosophy. It’s subtly present in how we handle the matches above: we expect to have exactly one match in the ETS table, or none at all. Therefore, that’s exactly what our case statements match for! In any other case, we clearly don’t know what’s going on, so we *want* to crash. In particular, on line 6 we match [] an explicit empty result, and not just _ which you might be tempted to have as a programmer with defensive reflexes.

Matching overly lax patterns like _ in this case is counter-productive in OTP land, as it means that our software is potentially running with a corrupted state (e.g. there are 2 entries in the ETS table) which could easily start corrupting other parts of our program as the incorrect information spreads. And once that has happened, it will be near impossible to cleanly recover from. Instead, it is much safer, simpler, and better, to nip the corruption in the bud: as soon as something unexpected happens, terminate and start fresh from a known-good state.

Let’s add the handle_idle_worker/2 function we so conveniently glossed over in the above code (lib/pool_toy/pool_man.ex):

def handle_info(msg, state) do
  # edited for brevity
end

defp handle_idle_worker(%State{workers: workers} = state, idle_worker) when is_pid(idle_worker) do
  %{state | workers: [idle_worker | workers]}
end

We still have to get around to why we introduced an ETS table in the first place: demonitoring client processes when they check their workers back in to the pool. First, let’s update the check in code (lib/pool_toy/pool_man.ex):

def handle_cast({:checkin, worker}, %State{monitors: monitors} = state) do
  case :ets.lookup(monitors, worker) do
    [{pid, ref}] ->
      Process.demonitor(ref)
      true = :ets.delete(monitors, pid)
      {:noreply, state |> handle_idle_worker(pid)}
    [] ->
      {:noreply, state}
  end
end

This time around, we conveniently have the worker pid provided, which is used as the key in our ETS table (being the first value in the tuples we store). We can therefore use lookup/2 to get all information related to the monitor. Notice that lookup/2 will return an array of tuples matching the key (i.e. we don’t have a nested array).

We’ve got a bunch of things going here, but nothing’s too complicated:

  • stop monitoring the client: the worker has been returned to the pool, so we no longer care to know if the client has crashed (line 4);
  • delete the monitor entry in the ETS table, since the monitor has been demonitored (line 5);
  • return the updated state, after handling the newly idle worker pid (line 6).

If there’s no entry in the ETS table for the given pid, we don’t do anything: the pid for alive workers would always be in the ETS table. If the pid that’s being checked in isn’t there, it means it’s not valid: it could belong to a dead worker, or any random process. In any case, we don’t want invalid pids to be added to the list of available workers in our state!

Our changes so far

Let’s take our code for a spin:

iex(1)> me = self()
#PID<0.140.0>
iex(2)> client = spawn(fn ->
...(2)> Process.send(me, {:worker, PoolToy.PoolMan.checkout()}, [])
...(2)> receive do
...(2)> :die -> :ok
...(2)> end
...(2)> end)
#PID<0.148.0>
iex(3)> worker = receive do
...(3)> {:worker, w} -> w
...(3)> end
#PID<0.137.0>
iex(4)> :observer.start()
:ok

This is the first part of the code that proved problematic in the last post. In the Observer, go to the “Table Viewer” tab. What can we see there? A “monitors” entry, which is the name of the ETS table we’re using. Double-click it to see its contents. As you can see, we’ve got an entry in there for our checked out worker. If take a look at the state for PoolMan in the “Applications” tab, you’ll see we’ve got only 2 remaining workers as expected. Let’s continue:

iex(5)> PoolToy.PoolMan.checkin(worker)
:ok

Look in the Observer again: the monitors table is now empty, and the PoolMan state has 3 workers available. Continuing:

iex(6)> worker = PoolToy.PoolMan.checkout()                                   
#PID<0.137.0>
iex(7)> Process.send(client, :die, []) :ok

And our application state is still correct: we’ve got a single entry in the monitors table, and 2 available workers. In other words, killing the client process didn’t affect anything because the worker it was using had been checked in before it died.

So we’re done with our single pool implementation, right? Nope. We’ve still go a small issue to iron out. Try poking around in Observer killing processes in various application states (you can kill a process in the “applications” tab by right-clicking on the process and selecting the last “kill process” option). Does the pool manager state remain correct in all cases? Can you identify the problem? Check your answer in the next post.


Would you like to see more Elixir content like this? Sign up to my mailing list so I can gauge how much interest there is in this type of content.

Posted in Elixir | Comments Off on PoolToy: A (toy) process pool manager in Elixir 1.6 (part 1.7)

PoolToy: A (toy) process pool manager in Elixir 1.6 (part 1.6)

Managing a single pool (continued)

(This post is part of a series on writing a process pool manager in Elixir.)

So we’ve got a working pool we can check worker processes in/out of (previous post). However, we’re not handling dying client processes, resulting in not properly freeing up worker processes when clients die.

Babysitting processes with monitors

The first thing we need to do is think about how client and worker processes should affect one another upon death:

  • if a client dies, we want to the pool manager to know so it can put the associated worker process back into the pool of available workers;
  • if a worker dies, we don’t want the client to be affected (in particular, we don’t want to kill the client).

This guides us to using a monitor for the pool manager to keep an eye on the client process’ health (as opposed to a link, which we’ll cover in part 1.8). Before returning an available worker to a client process, we will monitor the client: if it dies, we’ll return the assigned worker pid back into the pool.

Up until now, if a client requested a worker and we had one available, we handled the case like so:

def handle_call(:checkout, _from, %State{workers: [worker | rest]} = state) do
  {:reply, worker, %{state | workers: rest}}
end

But now that we’re concerning ourselves with the well-being of client processes, we need to monitor them. Monitoring a process is straightforward: Process.monitor(pid) this will create a unique reference and monitor the process: when the process dies, we’ll get a message containing the same unique reference (so we can easily determine which process died).

So let’s monitor client processes, and store the references in our state (lib/pool_toy/pool_man.ex):

def handle_call(:checkout, {from, _}, %State{workers: [worker | rest]} = state) do
  %State{monitors: monitors} = state
  ref = Process.monitor(from)
  monitors = monitors |> Map.put(ref, worker)
  {:reply, worker, %{state | workers: rest, monitors: monitors}}
end

Line 3 is where the magic happens: we create a new monitor to track the client process. You’ll note that we’re now actually interested in the mysterious “from” value provided as the second argument to handle_call/3 which the documentation tells us is

a 2-tuple containing the caller’s PID and a term that uniquely identifies the call

At this time we’re not interested in the unique identifier, only the caller’s pid: it’s the calling client process we want to monitor! Right, so we’re monitoring the client with the call on line 3, and we need to store this monitor somewhere in the state. To achieve that, we get the current collection of monitors from the state (line 2), add the new monitor to it on line 4, and finally return the updated state.

You may be wondering why I’m again matching on state values on line 2 when I already have some matching going on in the function head. For more on that, see here.

Naturally, in order to be able to keep track of our monitors in the state, we need to add a monitors attribute to it:

defmodule State do
  defstruct [:size, workers: [], monitors: %{}]
end

At this point, we’re monitoring the client, but we’re still doing absolutely nothing if the client dies… Now that the client is monitored, what happens to it when it dies? (Documentation)

Once the monitored process dies, a message is delivered to the monitoring process in the shape of:

{:DOWN, ref, :process, object, reason}

Ok, so we need to handle that incoming message (lib/pool_toy/pool_man.ex):

def handle_info({:DOWN, ref, :process, _, _}, %State{workers: workers, monitors: monitors} = state) do
  workers =
    case Map.get(monitors, ref) do
      nil -> workers
      worker -> [worker | workers]
    end

  monitors = Map.delete(monitors, ref)

  {:noreply, %{state | workers: workers, monitors: monitors}}
end

def handle_info(msg, state) do
  # edited for brevity
end

When we receive a message notifying us that a monitored process died (line 1), we look up the corresponding monitor reference (lines 3-6). If we find one, we return the associated worker pid to the pool. But hold on: if we always add a monitor when a worker gets checked out (and only remove it when it’s checked in again, although we haven’t implemented that part yet), how could we end up with a client crashing without an associated monitor reference?

Because processes can crash anytime, including when it’s inconvenient ;). Consider the following:

  1. Client checks out worker, and the pool manager starts monitoring the client;
  2. Client checks in worker, the pool manager stops monitoring the client, and then worker is returned to the pool;
  3. Client crashes.

Wait a minute. This looks fine! On step 2 we stop monitoring the client, so when it crashes in step 3, we shouldn’t get a message about the crash, right?

Except we’re dealing with a gen server, where all communication happens via messages and isn’t instantaneous: messages pile up in the server mailbox, and get processed in order. Here’s what could be happening in reality, as opposed to the “mental shortcut” we have above:

  1. Client requests a worker;
  2. A checkout message is sent to the server;
  3. The server processes the checkout request: it starts monitoring the client, and returns a worker pid to the client;
  4. Client wants to check a process back in;
  5. A checkin message is sent to the server (with the worker pid);
  6. The server processes the checkin request: it stops monitoring the client, and returns the worker to the pool;
  7. Client crashes;
  8. A :DOWN message is sent to the server.

Hmm. Still don’t see a problem here… Ah yes: in the real world, several client processes will be interacting with the server. Therefore, the server’s mailbox will contain a bunch of messages from other clients that it will have to process in order. Let’s try this again:

  1. Client requests a worker;
  2. A checkout message is sent to the server;
  3. The server processes the checkout request: it starts monitoring the client, and returns a worker pid to the client;
  4. Client wants to check a process back in;
  5. A checkin message is sent to the server (with the worker pid);
  6. Server is busy processing messages from other clients, doesn’t get around to processing the checkin message for the time being;
  7. Client crashes;
  8. A :DOWN message is sent to the server;
  9. The server has worked through the backlog of messages in its mailbox and finally processes the checkin request: it stops monitoring the client, and returns the worker to the pool;
  10. The server processes more messages in the mailbox (if they came before the :DOWN message);
  11. The server processes the :DOWN message: at this point, there is no active monitor on the (dead) client process, because it was removed when the server processed the checkin message.

It’s important to remember that when a process crashes, any messages it sent out will stay around: they don’t get magically deleted from the mailboxes in other processes. In other words, any time you’re processing a message it could be from a process that is no longer alive by the time you read it.

Our changes so far

Ok, let’s check whether our code now solves our immediate problem:

iex(1)> client = spawn(fn ->
...(1)>   PoolToy.PoolMan.checkout()
...(1)>   receive do
...(1)>     :die -> :ok
...(1)>   end
...(1)> end)
#PID<0.126.0>
iex(2)> Process.alive? client
true
iex(3)> :observer.start()
:ok

We’re going to use a fancier spawned process this time around: instead of dying immediately, it’s going to wait for a :die message before terminating.

Navigate to the “State” tab for the PoolMan process, and you’ll see that we’ve got a monitor reference in the state, and only 2 workers are available. Let’s kill the client and see what happens:

iex(4)> Process.send(client, :die, [])
:ok

We’ve once again got 3 available workers, and no monitor references. Hooray! (Note that you’ll need to close the window for the PoolMan process and reopen it to see the changes: it doesn’t refresh itself.)

There’s still a problem, though. See if you can tell what it is in the Observer (don’t read further than this code sample before thinking about the problem and exploring it for yourself). Kill your previous IEx session if you still have it open, and start a new one:

iex(1)> me = self()
#PID<0.119.0>
iex(2)> client = spawn(fn ->
...(2)>   Process.send(me, {:worker, PoolToy.PoolMan.checkout()}, [])
...(2)>   receive do
...(2)>     :die -> :ok
...(2)>   end
...(2)> end)
#PID<0.127.0>
iex(3)> worker = receive do
...(3)>   {:worker, w} -> w
...(3)> end
#PID<0.116.0>
iex(4)> PoolToy.PoolMan.checkin(worker)
:ok
iex(5)> worker = PoolToy.PoolMan.checkout()
#PID<0.116.0>
iex(6)> :observer.start()
:ok
iex(7)> Process.send(client, :die, [])
:ok

What happens here is that we spawn a process that checks a worker out, and sends it to the IEx process (expression 2). We then retrieve the worker pid (expr 3), and check the worker back in (expr 4). Then we check a worker out again (expr 5), which is the same worker as we had earlier and checked back in. In expression 7, we kill the process we had originally spawned.

Although this spawned process no longer has a worker process (it was returned to the pool in expression 4), killing the spawned client process results in returning the worker from expression 5 back to the pool of available workers, even though we weren’t done with it (since we hadn’t checked it back in yet). Another related issue is that at expression 6 within the Observer you can see that the pool manager’s state has 2 different monitor references active for the same pid. Oops!

The fix, of course, is to stop monitoring client processes when the worker process they borrowed is checked back in to the pool. We’ll do that in the next post.


Would you like to see more Elixir content like this? Sign up to my mailing list so I can gauge how much interest there is in this type of content.

Posted in Elixir | Comments Off on PoolToy: A (toy) process pool manager in Elixir 1.6 (part 1.6)

PoolToy: A (toy) process pool manager in Elixir 1.6 (part 1.5)

Managing a single pool (continued)

(This post is part of a series on writing a process pool manager in Elixir.)

After last post, we’ve got a pretty fancy-looking pool:

Unfortunately, we don’t have any sort of mechanism to make use of the worker processes. We need to let client processes check out a worker, do some work with it, and check the worker back in once the client is done with it.

Implementing worker checkin and checkout

In lib/pool_toy/pool_man.ex:

def checkout() do
  GenServer.call(@name, :checkout)
end

def checkin(worker) do
  GenServer.cast(@name, {:checkin, worker})
end

def init(size) do
  # truncated for brevity
end

def handle_call(:checkout, _from, %State{workers: []} = state) do
  {:reply, :full, state}
end

def handle_call(:checkout, _from, %State{workers: [worker | rest]} = state) do
  {:reply, worker, %{state | workers: rest}}
end

def handle_cast({:checkin, worker}, %State{workers: workers} = state) do
  {:noreply, %{state | workers: [worker | workers]}}
end

Pretty easy, right? Add checkout/0 and checkin/1 to the API (lines 1-7), and implement the matching server-side functions (lines13-23). When a client wants to check out a worker, we reply :full if none are available (line 14), and otherwise we provide the pid of the first available worker (line 18). When checking in a worker, we simply add that pid to the list of available worker pids (line 22). Note that since all workers are equal, there is no need to differentiate them and we can therefore always take/return pids from the head of the workers list (i.e. workers at the head of the list will be used more often, but we don’t care in this case).

But wait, how come this time around we didn’t implement catch-all clauses for handle_call/3 and handle_cast/2? After all, we had to add one for handle_info/2 back in part 1.4, why would these be different? Here’s what the getting started guide has to say about it:

Since any message, including the ones sent via send/2, go to handle_info/2, there is a chance unexpected messages will arrive to the server. Therefore, if we don’t define the catch-all clause, those messages could cause our [pool manager] to crash, because no clause would match. We don’t need to worry about such cases for handle_call/3 and handle_cast/2though. Calls and casts are only done via the GenServer API, so an unknown message is quite likely a developer mistake.

In other words, this is the “let it crash” philosophy in action: we shouldn’t receive any unexpected messages in calls or casts. But if we do, it means something went wrong and we should simply crash.

Our changes so far

We can now fire up IEx with iex -S mix and try out our worker pool:

iex(1)> w1 = PoolToy.PoolMan.checkout()
#PID<0.116.0>
iex(2)> w2 = PoolToy.PoolMan.checkout()
#PID<0.117.0>
iex(3)> w3 = PoolToy.PoolMan.checkout()
#PID<0.118.0>
iex(4)> w4 = PoolToy.PoolMan.checkout()
:full
iex(5)> PoolToy.PoolMan.checkin(w1)
:ok
iex(6)> w4 = PoolToy.PoolMan.checkout()
#PID<0.116.0>
iex(7)> Doubler.compute(w4, 21)
Doubling 21
42

Great success! We were able to check out workers until the pool told us it was :full, but then as soon as we checked in a worker (and the pool was therefore no longer full), we were once again able to check out a worker.

So we’re done with the basic pool implementation, right? No. When working in the OTP world, we need to constantly think about how processes failing will impact our software, and how it should react.

In this case for example, what happens if a client checks out a worker, but dies before it can check the worker back in? We’ve got a worker that isn’t doing any work (because the client died), but still isn’t available for other clients to check out because it was never returned to the pool.

But don’t take my word for it, let’s verify the problem in IEx (after either starting a new session, or checking all the above workers back into the pool):

iex(1)> client = spawn(fn -> PoolToy.PoolMan.checkout() end)
#PID<0.121.0>
iex(2)> Process.alive? client
false
iex(3)> :observer.start()

Within the Observer, if you go to the applications tab (selecting pool_toy on the left if it isn’t already displayed) and double-click PoolMan, then navigate to the “state” tab in the newly opened window, you can see that only 2 workers are available. In other words, even though the worker we checked out is no longer is use by the client (since that process died), the worker process will never be assigned any more work because it was never checked back into the pool.

How can we handle this problem? How about babysitting? Coming right up next!


Would you like to see more Elixir content like this? Sign up to my mailing list so I can gauge how much interest there is in this type of content.

Posted in Elixir | Comments Off on PoolToy: A (toy) process pool manager in Elixir 1.6 (part 1.5)

PoolToy: A (toy) process pool manager in Elixir 1.6 (part 1.4)

Managing a single pool (continued)

(This post is part of a series on writing a process pool manager in Elixir.)

After the last part, we’ve made some headway but still need some workers:

More useful state

Let’s start by enhancing the state in the pool manager to be a struct: we’re going to have to remember a bunch of things to manage the pool, so a struct will be handy. In lib/pool_toy/pool_man.ex:

defmodule PoolToy.PoolMan do
  use GenServer

  defmodule State do
    defstruct [:size]
  end

  @name __MODULE__

  def start_link(size) when is_integer(size) and size > 0 do
    # edited for brevity
  end

  def init(size) do
    {:ok, %State{size: size}}
  end
end

Starting workers

Our pool manager must now get some worker processes started. Let’s begin with attempting to start a single worker dynamically, from within init/1. Look at the docs for DynamicSupervisor and see if you can figure out how to start a child.

Here we go (still in lib/pool_toy/pool_man.ex):

def init(size) do
  DynamicSupervisor.start_child(PoolToy.WorkerSup, Doubler)
  {:ok, %State{size: size}}
end

start_child/2 requires the dynamic supervisor location and a child spec. Since we’ve named our worker supervisor, we reuse the name to locate it here (but giving its pid as the argument would have worked also). Child specs can be provided the same way as for a normal supervisor: complete child spec map, tuple with arguments, or just a module name (if we don’t need any start arguments). We’ve gone with the latter.

Let’s try it out in IEx. When running iex -S mix we get:

Erlang/OTP 20 [erts-9.3]  [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:10] [hipe] [kernel-poll:false]

Compiling 6 files (.ex)
Generated pool_toy app

08:52:07.275 [info]  Application pool_toy exited: PoolToy.Application.start(:normal, []) returned an error: shutdown: failed to start child: PoolToy.PoolSup
    ** (EXIT) shutdown: failed to start child: PoolToy.PoolMan
        ** (EXIT) exited in: GenServer.call(PoolToy.WorkerSup, {:start_child, {{Doubler, :start_link, [[]]}, :permanent, 5000, :worker, [Doubler]}}, :infinity)
            ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
** (Mix) Could not start application pool_toy: PoolToy.Application.start(:normal, []) returned an error: shutdown: failed to start child: PoolToy.PoolSup
    ** (EXIT) shutdown: failed to start child: PoolToy.PoolMan
        ** (EXIT) exited in: GenServer.call(PoolToy.WorkerSup, {:start_child, {{Doubler, :start_link, [[]]}, :permanent, 5000, :worker, [Doubler]}}, :infinity)
            ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started

Oops. What’s going on? Well, let’s read through the error messages:

  1. The pool toy application couldn’t start, because:
  2. The pool supervisor couldn’t start, because:
  3. The pool manager couldn’t start, because:
  4. It tries calling a GenServer process called PoolToy.WorkerSup, which fails because:
  5. the process is not alive or there’s no process currently associated with the given name, possibly because its application isn’t started.

Wait a minute: the worker supervisor should be up and running with that name, since it was started by the pool supervisor! Let’s take a look at that code and see how the pool supervisor goes about its job of starting its children (lib/pool_toy/pool_sup.ex):

def init(args) do
    pool_size = args |> Keyword.fetch!(:size)

    children = [
      {PoolToy.PoolMan, pool_size},
      PoolToy.WorkerSup
    ]

    Supervisor.init(children, strategy: :one_for_all)
  end

Right, so on line 9 the pool supervisor start its children. Which children? The ones defined on lines 4-7. And how are they started? The docs say

When the supervisor starts, it traverses all child specifications and then starts each child in the order they are defined.

So in our code, we start the pool manager and then move on to the worker supervisor. But since the pool manager expects the worker supervisor to already be up and running when the pool manager initializes itself (since it calls the worker sup), our code blows up.

Put another way, our software design has taken the position that the worker supervisor will ALWAYS be available if the pool manager is running. However, in our current implementation we’ve failed this guarantee quite spectacularly. (Take a look at what Fred Hébert has to say about supervisor start up and guarantees.)

The fix is easy enough: tell the pool supervisor to start the worker supervisor before the manager (lib/pool_toy/pool_sup.ex).

children = [
  PoolToy.WorkerSup,
  {PoolToy.PoolMan, pool_size}
]

Try running PoolToy in IEx again, and not only does it work, you can actually see a baby worker in the Observer! Huzzah!

As a quick side note, the current example is quite simple and straightforward but initialization order needs to be properly thought through in your OTP projects. Once again, take a look at Fred’s excellent writing on the subject.

With our first worker process created, let’s finish the job and actually start enough workers to fill the requested pool size. Let’s get started with the naive approach (lib/pool_toy/pool_man.ex):

defmodule State do
  defstruct [:size, workers: []]
end

def init(size) do
  start_worker = fn _ ->
    {:ok, pid} = DynamicSupervisor.start_child(PoolToy.WorkerSup, Doubler)
    pid
  end

  workers = 1..size |> Enum.map(start_worker)

  {:ok, %State{size: size, workers: workers}}
end

Pretty straightforward, right? Since we’re going to need to keep track of the worker pids (to know which ones are available), we add a :workers attribute (initialized to an empty list) to our state on line 2. On lines 6-9 we define a helper function that starts a worker and returns its pid. You’ll note that with the pattern match on line 7 we expect the worker creation to always succeed: if anything but an “ok tuple” is returned, we want to crash. Finally, on line 11 we start the requested number of workers, and add them to the state on line 13.

If you take this for a run in IEx and peek at the state for the PoolMan process, you’ll see we indeed have pids in the :workers state property, and that they match the ones visible in the application tab. Great!

Our changes so far

Deferring initialization work

Actually, it’s not so great: GenServer initialization is synchronous, yet we’re doing a bunch of work in there. This could tie up the client process and contribute to making our project unresponsive: the init/1 callback should always return as soon as possible.

So what are we to do, given we need to start the worker processes when the pool manager is starting up? Well, we have the server process send a message to itself to start the workers. That way, init/1 will return right away, and will then process the next message in its mailbox which should be the one telling it to start the worker processes. Here we go (lib/pool_toy/pool_man.ex):

def init(size) do
  send(self(), :start_workers)
  {:ok, %State{size: size}}
end

def handle_info(:start_workers, %State{size: size} = state) do
  start_worker = fn _ ->
    {:ok, pid} = DynamicSupervisor.start_child(PoolToy.WorkerSup, Doubler)
    pid
  end

  workers = 1..size |> Enum.map(start_worker)

  {:noreply, %{state | workers: workers}}
end

def handle_info(msg, state) do
  IO.puts("Received unexpected message: #{inspect(msg)}")
  {:noreply, state}
end

Take a good long look at this code, and try to digest it. Think about why each line/function is there.

First things first: in init/1, we have the process send a message to itself. Remember that init/1 is executed within the server process of a gen server, so send(self(), ...) will send the given message to the server process’ inbox.

Although we’re sending the message immediately, it’s important that nothing (e.g. starting new workers) will actually get done: the only thing happening is a message being sent. After sending the message, init/1 is free to do whatever it wants, before returning {:ok, state}. Only after init/1 has returned will the server process look in its mailbox and see the :start_workers message we sent (assuming no other message came in before).

Side note: if there’s a possibility in your project that a message could be sent to the gen server before it is fully initialized, you can have it respond with e.g. {:error, :not_ready} (in handle_call, etc.) when the state indicates that prerequisites are missing. It is then up to the client to handle this case when it presents itself.

Ok, onwards: we’ve sent a message to the server process so we need code to handle it. If a message is simply sent to a gen server process (as opposed to using something like GenServer.call/3), it will be treated as an “info” message. So on lines 6-15 we handle that info message, start the worker processes, and store their pids in the server state.

But why did we define another handle_info function on lines 17-20?

A short primer on process mailboxes

Each process in the BEAM has its mailbox where it receives messages sent to it, and from which it can read using receive/1. But as you may recall, receive/1 will pattern match on the message, and process the first message it finds that matches a pattern. What about the messages that don’t match any pattern? They stay in the mailbox.

Every time receive/1 is invoked, the mailbox is consulted, starting with the oldest message and moving chronologically. So if messages don’t match clauses in receive/1 they will just linger in the mailbox and be checked again and again for matching clauses. This is bad: not only does it slow your process down (it’s spending a lot of time needlessly checking messages that will never match), but you also run the risk of running out of memory for the process’ mailbox as the list of unprocessed messages grows until finally the process is killed (and all unread messages are lost).

To prevent this undesirable state of affairs, it is a good practice to have a “catch all” clause in handle_info/2 that will log the fact that an unprocessed message was received while also removing it from the process’ mailbox. In other words, it’s a good practice to have a catch all version of handle_info/2 to prevent unprocessed messages from piling up in the mailbox.

When we use GenServer, default catch all implementations of handle_info/2 (as well as handle_call/3 and handle_cast/2, which we’ll meet later) are created for us and added to the module. But as soon as we define a function with the same name (e.g. we write a handle_info/2 function as above) it will override the one created by the use statement. So when we write a handle_info/2 message-handling function in a gen server, we need to follow through and write a catch all version also.

Since these unexpected messages shouldn’t have been sent to this process, in production code we would most likely log them as errors using the Logger instead of just printing a message.

Improving the internal API

Right now we’re starting worker like so (lib/pool_toy/pool_man.ex):

{:ok, pid} = DynamicSupervisor.start_child(PoolToy.WorkerSup, Doubler)

This isn’t great, because it means that our pool manager needs relatively intimate knowledge about the internals of the worker supervisor (namely, that it is a dynamic supervisor, and not a plain vanilla one). It also means that if we change how children get started (e.g. by setting a default config), we’d have to make that change everywhere that requests new workers to be started.

Let’s move the knowledge of how to start workers where it belongs: within the WorkerSup module (lib/pool_toy/worker_sup.ex):

defmodule PoolToy.WorkerSup do
  use DynamicSupervisor

  @name __MODULE__

  def start_link(args) do
    DynamicSupervisor.start_link(__MODULE__, args, name: @name)
  end

  defdelegate start_worker(sup, spec), to: DynamicSupervisor, as: :start_child

  def init(_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

I’ve reproduced the whole module here, but the only change is on line 10: we define a start_worker/2 function, which in effect proxies the call to DynamicSupervisor.start_child/2. If you’re not familiar with the defdelegate macro, you can learn more about it here, but in essence the line we’ve added is equivalent to:

def start_worker(sup, spec) do
  DynamicSupervisor.start_child(sup, spec)
end

With this addition in place, we can update our pool manager code to make use of it (lib/pool_toy/pool_man.ex):

def handle_info(:start_workers, %State{size: size} = state) do
  workers =
    for _ <- 1..size do
      {:ok, pid} = PoolToy.WorkerSup.start_worker(PoolToy.WorkerSup, Doubler)
      pid
    end

  {:noreply, %{state | workers: workers}}
end

Our changes so far

At this stage in our journey, we’ve got a pool with worker processes. However, right now clients can’t check out worker processes to do work with them. Let’s get to that next!


Would you like to see more Elixir content like this? Sign up to my mailing list so I can gauge how much interest there is in this type of content.

Posted in Elixir | Comments Off on PoolToy: A (toy) process pool manager in Elixir 1.6 (part 1.4)