PoolToy: A (toy) process pool manager in Elixir 1.6 (part 1.6)

Managing a single pool (continued)

(This post is part of a series on writing a process pool manager in Elixir.)

So we’ve got a working pool we can check worker processes in/out of (previous post). However, we’re not handling dying client processes: when a client dies, the worker it had checked out is never returned to the pool.

Babysitting processes with monitors

The first thing we need to do is think about how client and worker processes should affect one another upon death:

  • if a client dies, we want the pool manager to know so it can put the associated worker process back into the pool of available workers;
  • if a worker dies, we don’t want the client to be affected (in particular, we don’t want to kill the client).

This guides us to using a monitor for the pool manager to keep an eye on the client process’ health (as opposed to a link, which we’ll cover in part 1.8). Before returning an available worker to a client process, we will monitor the client: if it dies, we’ll return the assigned worker pid back into the pool.

Up until now, if a client requested a worker and we had one available, we handled the case like so:

def handle_call(:checkout, _from, %State{workers: [worker | rest]} = state) do
  {:reply, worker, %{state | workers: rest}}
end

But now that we’re concerning ourselves with the well-being of client processes, we need to monitor them. Monitoring a process is straightforward: calling Process.monitor(pid) starts monitoring the process and returns a unique reference. When the process dies, we’ll get a message containing that same reference (so we can easily determine which process died).

So let’s monitor client processes, and store the references in our state (lib/pool_toy/pool_man.ex):

def handle_call(:checkout, {from, _}, %State{workers: [worker | rest]} = state) do
  %State{monitors: monitors} = state
  ref = Process.monitor(from)
  monitors = monitors |> Map.put(ref, worker)
  {:reply, worker, %{state | workers: rest, monitors: monitors}}
end

Line 3 is where the magic happens: we create a new monitor to track the client process. You’ll note that we’re now actually interested in the mysterious “from” value provided as the second argument to handle_call/3 which the documentation tells us is

a 2-tuple containing the caller’s PID and a term that uniquely identifies the call

At this time we’re not interested in the unique identifier, only the caller’s pid: it’s the calling client process we want to monitor! Right, so we’re monitoring the client with the call on line 3, and we need to store this monitor somewhere in the state. To achieve that, we get the current collection of monitors from the state (line 2), add the new monitor to it on line 4, and finally return the updated state.
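
To see the “from” tuple in isolation, here is a minimal sketch. FromDemo and the :who_called message are made up for this illustration and aren’t part of PoolToy; the server simply replies with the pid it finds in “from”, which should be the pid of whoever made the call.

```elixir
defmodule FromDemo do
  # Hypothetical module (not part of PoolToy): it only illustrates the `from` tuple.
  use GenServer

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_call(:who_called, {caller_pid, _tag}, state) do
    # Reply with the pid extracted from the `from` tuple.
    {:reply, caller_pid, state}
  end
end

{:ok, server} = GenServer.start_link(FromDemo, :ok)
reported_pid = GenServer.call(server, :who_called)

# The pid the server saw in `from` is the pid of the calling process.
IO.inspect(reported_pid == self(), label: "from contains the caller's pid")
```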

You may be wondering why I’m again matching on state values on line 2 when I already have some matching going on in the function head. For more on that, see here.

Naturally, in order to be able to keep track of our monitors in the state, we need to add a monitors attribute to it:

defmodule State do
  defstruct [:size, workers: [], monitors: %{}]
end

At this point, we’re monitoring the client, but we’re still doing absolutely nothing if the client dies… Now that the client is monitored, what happens when it dies? The documentation for Process.monitor/1 tells us:

Once the monitored process dies, a message is delivered to the monitoring process in the shape of:

{:DOWN, ref, :process, object, reason}
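
If you’d like to watch such a message arrive outside of a gen server, the following standalone sketch should do (it isn’t PoolToy code, and the :stop message is just something made up for the demo). It spawns a process, monitors it, asks it to exit, and then waits for the notification.

```elixir
# Spawn a process that waits for a message and then exits normally.
pid =
  spawn(fn ->
    receive do
      :stop -> :ok
    end
  end)

# Start monitoring: the returned reference identifies this particular monitor.
ref = Process.monitor(pid)
send(pid, :stop)

down =
  receive do
    # Pinning ref and pid makes sure we only match the notification for our monitor.
    {:DOWN, ^ref, :process, ^pid, reason} -> {:down, reason}
  after
    1_000 -> :timeout
  end

IO.inspect(down)
```

Since the spawned function simply returns, the reason reported in the message is :normal.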

Ok, so we need to handle that incoming message (lib/pool_toy/pool_man.ex):

def handle_info({:DOWN, ref, :process, _, _}, %State{workers: workers, monitors: monitors} = state) do
  workers =
    case Map.get(monitors, ref) do
      nil -> workers
      worker -> [worker | workers]
    end

  monitors = Map.delete(monitors, ref)

  {:noreply, %{state | workers: workers, monitors: monitors}}
end

def handle_info(msg, state) do
  # edited for brevity
end

When we receive a message notifying us that a monitored process died (line 1), we look up the corresponding monitor reference (lines 3-6). If we find one, we return the associated worker pid to the pool. But hold on: if we always add a monitor when a worker gets checked out (and only remove it when it’s checked in again, although we haven’t implemented that part yet), how could we end up with a client crashing without an associated monitor reference?

Because processes can crash anytime, including when it’s inconvenient ;). Consider the following:

  1. Client checks out worker, and the pool manager starts monitoring the client;
  2. Client checks in worker, the pool manager stops monitoring the client, and then the worker is returned to the pool;
  3. Client crashes.

Wait a minute. This looks fine! On step 2 we stop monitoring the client, so when it crashes in step 3, we shouldn’t get a message about the crash, right?

Except we’re dealing with a gen server, where all communication happens via messages and isn’t instantaneous: messages pile up in the server mailbox, and get processed in order. Here’s what could be happening in reality, as opposed to the “mental shortcut” we have above:

  1. Client requests a worker;
  2. A checkout message is sent to the server;
  3. The server processes the checkout request: it starts monitoring the client, and returns a worker pid to the client;
  4. Client wants to check the worker back in;
  5. A checkin message is sent to the server (with the worker pid);
  6. The server processes the checkin request: it stops monitoring the client, and returns the worker to the pool;
  7. Client crashes;
  8. A :DOWN message is sent to the server.

Hmm. Still don’t see a problem here… Ah yes: in the real world, several client processes will be interacting with the server. Therefore, the server’s mailbox will contain a bunch of messages from other clients that it will have to process in order. Let’s try this again:

  1. Client requests a worker;
  2. A checkout message is sent to the server;
  3. The server processes the checkout request: it starts monitoring the client, and returns a worker pid to the client;
  4. Client wants to check the worker back in;
  5. A checkin message is sent to the server (with the worker pid);
  6. Server is busy processing messages from other clients, doesn’t get around to processing the checkin message for the time being;
  7. Client crashes;
  8. A :DOWN message is sent to the server;
  9. The server has worked through the backlog of messages in its mailbox and finally processes the checkin request: it stops monitoring the client, and returns the worker to the pool;
  10. The server processes more messages in the mailbox (if they came before the :DOWN message);
  11. The server processes the :DOWN message: at this point, there is no active monitor on the (dead) client process, because it was removed when the server processed the checkin message.

It’s important to remember that when a process crashes, any messages it sent out will stay around: they don’t get magically deleted from the mailboxes in other processes. In other words, any time you’re processing a message it could be from a process that is no longer alive by the time you read it.
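
Here’s a small sketch of that last point, independent of PoolToy. The {:checkin, pid} message shape is only an assumption for the demo: a short-lived process sends us one message and exits, we wait until we know it’s dead, and only then look in our mailbox.

```elixir
parent = self()

# The sender delivers one message and then exits immediately.
{pid, ref} = spawn_monitor(fn -> send(parent, {:checkin, self()}) end)

# Wait until we know for sure the sender is dead.
receive do
  {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
after
  1_000 -> raise "sender did not exit in time"
end

# The message it sent before dying is still sitting in our mailbox.
leftover =
  receive do
    {:checkin, ^pid} -> :still_in_mailbox
  after
    0 -> :lost
  end

IO.inspect({leftover, Process.alive?(pid)})
```

The receive with a pattern on :DOWN skips over the earlier message without discarding it, which is why we can still read it afterwards even though its sender is gone.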

Our changes so far

Ok, let’s check whether our code now solves our immediate problem:

iex(1)> client = spawn(fn ->
...(1)>   PoolToy.PoolMan.checkout()
...(1)>   receive do
...(1)>     :die -> :ok
...(1)>   end
...(1)> end)
iex(2)> Process.alive? client
iex(3)> :observer.start()

We’re going to use a fancier spawned process this time around: instead of dying immediately, it’s going to wait for a :die message before terminating.

Navigate to the “State” tab for the PoolMan process, and you’ll see that we’ve got a monitor reference in the state, and only 2 workers are available. Let’s kill the client and see what happens:

iex(4)> Process.send(client, :die, [])

We’ve once again got 3 available workers, and no monitor references. Hooray! (Note that you’ll need to close the window for the PoolMan process and reopen it to see the changes: it doesn’t refresh itself.)

There’s still a problem, though. See if you can tell what it is in the Observer (don’t read further than this code sample before thinking about the problem and exploring it for yourself). Kill your previous IEx session if you still have it open, and start a new one:

iex(1)> me = self()
iex(2)> client = spawn(fn ->
...(2)>   Process.send(me, {:worker, PoolToy.PoolMan.checkout()}, [])
...(2)>   receive do
...(2)>     :die -> :ok
...(2)>   end
...(2)> end)
iex(3)> worker = receive do
...(3)>   {:worker, w} -> w
...(3)> end
iex(4)> PoolToy.PoolMan.checkin(worker)
iex(5)> worker = PoolToy.PoolMan.checkout()
iex(6)> :observer.start()
iex(7)> Process.send(client, :die, [])

What happens here is that we spawn a process that checks a worker out, and sends it to the IEx process (expression 2). We then retrieve the worker pid (expr 3), and check the worker back in (expr 4). Then we check a worker out again (expr 5), which is the same worker as we had earlier and checked back in. In expression 7, we kill the process we had originally spawned.

Although this spawned process no longer has a worker process (it was returned to the pool in expression 4), killing the spawned client process results in returning the worker from expression 5 back to the pool of available workers, even though we weren’t done with it (since we hadn’t checked it back in yet). A related issue is visible in the Observer at expression 6: the pool manager’s state holds 2 different monitor references pointing at the same worker pid, one for the spawned client and one for the IEx process. Oops!
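
To make the problem concrete without the Observer, here is a rough simulation of the state at expressions 6 and 7. Everything in it is an assumption made for illustration: StaleMonitorDemo is a hypothetical helper that mirrors our :DOWN handling as a pure function, the state is a plain map rather than the State struct, atoms stand in for worker pids, and make_ref() stands in for the references Process.monitor would return.

```elixir
defmodule StaleMonitorDemo do
  # Hypothetical helper: same logic as the :DOWN clause of handle_info, minus the gen server.
  def handle_down(ref, %{workers: workers, monitors: monitors} = state) do
    workers =
      case Map.get(monitors, ref) do
        nil -> workers
        worker -> [worker | workers]
      end

    %{state | workers: workers, monitors: Map.delete(monitors, ref)}
  end
end

# Atoms stand in for worker pids.
worker = :worker_a
# Stand-in for the monitor created by the spawned client's checkout.
spawned_client_ref = make_ref()
# Stand-in for the monitor created when IEx checked the same worker out again.
iex_ref = make_ref()

# State at expression 6: IEx holds the worker, yet two references point at it.
state = %{
  workers: [:worker_b, :worker_c],
  monitors: %{spawned_client_ref => worker, iex_ref => worker}
}

# Expression 7: the spawned client dies and its stale monitor fires.
state = StaleMonitorDemo.handle_down(spawned_client_ref, state)

# The worker is back in the pool although IEx never checked it in.
IO.inspect(state.workers)
```

After the simulated :DOWN, the worker is listed as available again while the monitor for the IEx process is still in the map, so the pool would happily hand the same worker to a second client.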

The fix, of course, is to stop monitoring client processes when the worker process they borrowed is checked back in to the pool. We’ll do that in the next post.

