PoolToy: A (toy) process pool manager in Elixir 1.6 (part 1.7)

Managing a single pool (continued)

(This post is part of a series on writing a process pool manager in Elixir.)

We’ve now realized (in the last post) that storing the references to the client monitors in a map was a bit naive: although it works fine for locating the relevant information when a client crashes, it’s not so great for handling the “worker check in” case (where we have to locate the monitor reference by worker pid instead).

You know what’s good for finding data with varying conditions? Databases. Erlang does provide a full-blown database (called Mnesia), but it would actually be counter-productive for our use case: since we’ll be storing monitors, if our entire application crashes we want the monitor information to be wiped as well (since it will no longer be relevant). Since Mnesia is typically set up to persist data to disk, we’d have to erase the obsolete monitor data ourselves in case of a crash.

Instead, we can use an in-memory “database” also provided by Erlang, called ETS (for Erlang Term Storage).

Erlang Term Storage basics

First, here’s a brief description of what an ETS table is:

Data is organized as a set of dynamic tables, which can store tuples. Each table is created by a process. When the process terminates, the table is automatically destroyed. Every table has access rights set at creation.

The first element in the stored tuple will be that tuple’s key (default behavior, can be modified). Retrieving data via this key is very efficient, whereas finding data by matching the rest of the tuple requires the entire table to be searched which can be slow if the table is very large.
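To make the “first element is the key” behavior concrete, here is a minimal sketch you can paste into IEx. The table name and the data are made up for illustration; they are not part of PoolToy.

```elixir
# Hypothetical table and data, for illustration only.
table = :ets.new(:demo_basics, [:set, :protected])

# The first element of each tuple (:alice, :bob) acts as the key.
true = :ets.insert(table, {:alice, 30, "Paris"})
true = :ets.insert(table, {:bob, 25, "Oslo"})

# Looking up by key returns a list containing the whole tuple.
by_key = :ets.lookup(table, :alice)
IO.inspect(by_key)

# In a :set table, inserting a tuple with an existing key replaces the old one.
true = :ets.insert(table, {:alice, 31, "Paris"})
after_overwrite = :ets.lookup(table, :alice)
IO.inspect(after_overwrite)

# The overwrite did not add a row: we still have two entries.
size = :ets.info(table, :size)
IO.inspect(size)
```

The table type (:set) is the default, and is spelled out here only to make the overwrite behavior explicit.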

Now that introductions have been made, let’s get our hands dirty. The first thing we’ll need is to create a new ETS table instance when our pool manager process initializes itself. Of course, we’ll also need to change the default value of the :monitors attribute in our state, since we won’t be using maps anymore (lib/pool_toy/pool_man.ex):

defmodule State do
  defstruct [:size, :monitors, workers: []]
end

# edited for brevity

def init(size) do
  send(self(), :start_workers)
  monitors = :ets.new(:monitors, [:protected, :named_table])
  {:ok, %State{size: size, monitors: monitors}}
end

(Your keen eyes will probably have noticed that we switched the positions of the :monitors and :workers attributes in the struct on line 2. This is because values that are initialized must come last.)

The magic mainly happens on line 9, where we use new/2 to create the ETS table we’ll be using to track client monitors. The first argument is the (unique) name we give to the table (which must be an atom), followed by the options. Here, the options we give are for our convenience when inspecting the application state with Observer: they are not strictly necessary in our case, just nice to have. :protected means that only the owner process can write to the table, but other processes can read from it: this means that later on we’ll be able to look at the table contents from within Observer. :protected is a default setting, and was only included here for explicitness.

:named_table works much like named processes: if the option is present, the table will be registered under the given name (first argument mentioned above, which in this case is :monitors). This will enable us to work more conveniently with the table from IEx if we wanted to, as we can use the table name instead of the reference that was returned upon table creation. Having a named table also means that it will be displayed under that name in the Observer, which will be convenient in itself.

Finally, note that when using the :named_table option, the return value of new/2 will simply be the table name, instead of the table reference (the reference can be obtained from the name with whereis/1). As we’re using a named table in our application, we technically wouldn’t need to store the value returned by new/2. However, since we’re using a named table for convenience instead of design imperatives, our code will consistently use the value returned by new/2 to interact with the ETS table. This way, our code will work even if/when the table is no longer named (feel free to try that later!).
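If you’d like to see the difference in return values for yourself, here is a small sketch. The table names are invented for the example, and the whereis/1 call assumes you’re running OTP 21 or later.

```elixir
# Hypothetical table names, for illustration only.

# With :named_table, new/2 returns the name itself.
named = :ets.new(:demo_named, [:named_table])
IO.inspect(named)

# Without it, new/2 returns an opaque table identifier instead of the atom.
unnamed = :ets.new(:demo_unnamed, [])
IO.inspect(is_atom(unnamed))

# Either return value works as the first argument to the :ets functions,
# which is why code that sticks to the returned value keeps working
# whether or not the table is named.
true = :ets.insert(named, {:key, 1})
true = :ets.insert(unnamed, {:key, 2})

# Assumption: OTP 21 or later, where whereis/1 is available.
tid = :ets.whereis(:demo_named)
IO.inspect(:ets.lookup(tid, :key))
```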

With our monitors table available for use, let’s update the check out code (lib/pool_toy/pool_man.ex):

def handle_call(:checkout, {from, _}, %State{workers: [worker | rest]} = state) do
  %State{monitors: monitors} = state
  monitor(monitors, {worker, from})
  {:reply, worker, %{state | workers: rest}}
end

# edited for brevity

def handle_info(msg, state) do
  # edited for brevity
end

defp monitor(monitors, {worker, client}) do
  ref = Process.monitor(client)
  :ets.insert(monitors, {worker, ref})
  ref
end

Easy peasy: we just insert the {worker, ref} tuple into our ETS table.

We also have to update the code handling dying client processes. So what happens when a monitored process dies (docs)?

Once the monitored process dies, a message is delivered to the monitoring process in the shape of:

{:DOWN, ref, :process, object, reason}

We’ll get a special message with the reference to the monitor, an object value (which will be the client’s pid in our case), and the reason the client exited (which we’re not interested in, because it makes no difference to us: regardless of why the client exited, we need to check its worker back in).
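Here is a quick way to watch that message arrive, outside of our pool manager. It’s only a sketch: the “client” below is a throwaway process standing in for a real one.

```elixir
# A stand-in "client" that waits until it is told to stop.
client =
  spawn(fn ->
    receive do
      :die -> :ok
    end
  end)

# Start monitoring it, then ask it to exit.
ref = Process.monitor(client)
send(client, :die)

# The :DOWN message carries our monitor reference, the pid, and the exit reason.
down =
  receive do
    {:DOWN, ^ref, :process, object, reason} -> {object, reason}
  after
    1_000 -> :timeout
  end

IO.inspect(down)
```

Since the function simply returns, the exit reason here is :normal; a crash would give a different reason, but the message shape stays the same.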

So let’s handle this new message in our code (lib/pool_toy/pool_man.ex):

def handle_info({:DOWN, ref, :process, _, _}, %State{monitors: monitors} = state) do
  case :ets.match(monitors, {:"$0", ref}) do
    [[pid]] ->
      true = :ets.delete(monitors, pid)
      {:noreply, state |> handle_idle_worker(pid)}
    [] ->
      {:noreply, state}
  end
end

So what does match/2 on line 2 do? It looks through the table, trying to find matches to the provided pattern. What’s a pattern?

A pattern is a term that can contain:

  • Bound parts (Erlang terms)
  • ‘_’ that matches any Erlang term
  • Pattern variables ‘$N’, where N=0,1,…

So in the case above, line 2 says: in the monitors table, return the list of results for tuples that have ref as their second element, and put the first element of each matching tuple as the first element of its result. Note that :"$0" is Elixir’s quoted-atom syntax, giving us the atom $0: Erlang/ETS expects pattern variables to be atoms formed of a dollar sign followed by an integer.

Line 3 matches on a [[pid]] result. Why the nested list? Well, match/2 returns a list of results: that’s the outer list. Within it, each single result is itself a list of the values bound to the pattern variables we provided: we provided only a single pattern variable, so the (unique) result list contains only a single element. For completeness, let’s also point out that since every monitor reference is unique, we don’t expect to ever have more than 1 result.

For a little more context on ETS match results, allow me to provide another example: let’s say we were storing a list of cars and their license plates in a cars ETS table, using the following tuple convention: {license_plate, make, year, color}. If we wanted to see the year and make of red cars, we could call :ets.match(cars, {:"_", :"$1", :"$0", "red"}). Notice that since the pattern order indicates we want the year before the make, an example result would be ["2018", "Tesla"]. As you’ll have noticed, we disregard the “license_plate” field in our query by using the :"_" pattern, so that information is absent from our result.
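The cars example can be tried out as follows. The license plates and the second car are invented so that there’s something for the pattern to skip over.

```elixir
# Hypothetical data following the {license_plate, make, year, color} convention.
cars = :ets.new(:cars, [:set])

true = :ets.insert(cars, {"ABC-123", "Tesla", "2018", "red"})
true = :ets.insert(cars, {"XYZ-789", "Volvo", "2015", "blue"})

# :"$0" is bound to the year and :"$1" to the make; each result lists the
# bound values in variable order ($0 first, then $1).
red_cars = :ets.match(cars, {:_, :"$1", :"$0", "red"})
IO.inspect(red_cars)
```

With a single red car in the table, the return value is a one-element list of results: [["2018", "Tesla"]].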

Back to our actual code: if we have a matching record in our ETS table (line 3), we delete it from the table on line 4 since it’s no longer relevant (the process just crashed). Note that we match the delete result to true, so that we crash in case the returned result is different (although in this case it isn’t so useful as delete/2 will always return true). Finally, we handle our idle worker on line 5 (we’ll implement handle_idle_worker/2 soon) and return the updated state.

If there’s no monitoring info in the database (lines 6 and 7), we don’t do anything (this could happen e.g. if the client checked in its worker process before crashing).

But let’s take a brief moment to talk about our lord and savior, the “let it crash” philosophy. It’s subtly present in how we handle the matches above: we expect to have exactly one match in the ETS table, or none at all. Therefore, that’s exactly what our case statement matches on! In any other case, we clearly don’t know what’s going on, so we *want* to crash. In particular, on line 6 we match [] (an explicit empty result), and not just _, which you might be tempted to use as a programmer with defensive reflexes.

Matching overly lax patterns like _ in this case is counter-productive in OTP land, as it means that our software is potentially running with a corrupted state (e.g. there are 2 entries in the ETS table) which could easily start corrupting other parts of our program as the incorrect information spreads. And once that has happened, it will be near impossible to cleanly recover from. Instead, it is much safer, simpler, and better, to nip the corruption in the bud: as soon as something unexpected happens, terminate and start fresh from a known-good state.
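To see what “crashing on the unexpected” looks like in practice, here is a sketch that reuses the shape of our case statement in a standalone function. The classify function and the atoms standing in for pids are hypothetical; inside the real GenServer we wouldn’t rescue anything, and the supervisor would restart the process instead.

```elixir
# Hypothetical helper mirroring the two clauses of our case statement.
classify = fn results ->
  case results do
    [[pid]] -> {:found, pid}
    [] -> :none
  end
end

IO.inspect(classify.([[:worker_a]]))
IO.inspect(classify.([]))

# Two results are unexpected: no clause matches, so a CaseClauseError is raised.
# We rescue it here only to display what happened.
outcome =
  try do
    classify.([[:worker_a], [:worker_b]])
  rescue
    error in CaseClauseError -> {:crashed, error.term}
  end

IO.inspect(outcome)
```

Had we written _ instead of [], the third call would have quietly returned :none and the corrupted state would have gone unnoticed.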

Let’s add the handle_idle_worker/2 function we so conveniently glossed over in the above code (lib/pool_toy/pool_man.ex):

def handle_info(msg, state) do
  # edited for brevity
end

defp handle_idle_worker(%State{workers: workers} = state, idle_worker) when is_pid(idle_worker) do
  %{state | workers: [idle_worker | workers]}
end

We still have to get around to why we introduced an ETS table in the first place: demonitoring client processes when they check their workers back in to the pool. First, let’s update the check in code (lib/pool_toy/pool_man.ex):

def handle_cast({:checkin, worker}, %State{monitors: monitors} = state) do
  case :ets.lookup(monitors, worker) do
    [{pid, ref}] ->
      Process.demonitor(ref)
      true = :ets.delete(monitors, pid)
      {:noreply, state |> handle_idle_worker(pid)}
    [] ->
      {:noreply, state}
  end
end

This time around, we conveniently have the worker pid provided, which is used as the key in our ETS table (being the first value in the tuples we store). We can therefore use lookup/2 to get all information related to the monitor. Notice that lookup/2 will return a list of the tuples matching the key (i.e. we don’t have a nested list).
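The difference in result shapes between lookup/2 and match/2 is easy to mix up, so here is a side-by-side sketch. It uses the current process and a fresh reference as stand-ins for a real worker pid and monitor reference.

```elixir
# Stand-ins for illustration: not a real worker or a real monitor.
monitors = :ets.new(:demo_monitors, [:protected])
worker = self()
ref = make_ref()

true = :ets.insert(monitors, {worker, ref})

# lookup/2 works on the key and returns a flat list of whole tuples.
looked_up = :ets.lookup(monitors, worker)
IO.inspect(looked_up)

# match/2 works on a pattern and returns a list of lists of bound variables.
matched = :ets.match(monitors, {:"$0", ref})
IO.inspect(matched)

# A key that isn't in the table simply yields an empty list.
missing = :ets.lookup(monitors, :not_a_worker)
IO.inspect(missing)
```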

We’ve got a bunch of things going here, but nothing’s too complicated:

  • stop monitoring the client: the worker has been returned to the pool, so we no longer care to know if the client has crashed (line 4);
  • delete the monitor entry in the ETS table, since the monitor has been demonitored (line 5);
  • return the updated state, after handling the newly idle worker pid (line 6).

If there’s no entry in the ETS table for the given pid, we don’t do anything: the pid of every checked-out worker is always in the ETS table. If the pid that’s being checked in isn’t there, it means it’s not valid: it could belong to a dead worker, a worker that was already checked in, or any random process. In any case, we don’t want invalid pids to be added to the list of available workers in our state!

Our changes so far

Let’s take our code for a spin:

iex(1)> me = self()
#PID<0.140.0>
iex(2)> client = spawn(fn ->
...(2)> Process.send(me, {:worker, PoolToy.PoolMan.checkout()}, [])
...(2)> receive do
...(2)> :die -> :ok
...(2)> end
...(2)> end)
#PID<0.148.0>
iex(3)> worker = receive do
...(3)> {:worker, w} -> w
...(3)> end
#PID<0.137.0>
iex(4)> :observer.start()
:ok

This is the first part of the code that proved problematic in the last post. In the Observer, go to the “Table Viewer” tab. What can we see there? A “monitors” entry, which is the name of the ETS table we’re using. Double-click it to see its contents. As you can see, we’ve got an entry in there for our checked out worker. If you take a look at the state for PoolMan in the “Applications” tab, you’ll see we’ve got only 2 remaining workers as expected. Let’s continue:

iex(5)> PoolToy.PoolMan.checkin(worker)
:ok

Look in the Observer again: the monitors table is now empty, and the PoolMan state has 3 workers available. Continuing:

iex(6)> worker = PoolToy.PoolMan.checkout()
#PID<0.137.0>
iex(7)> Process.send(client, :die, [])
:ok

And our application state is still correct: we’ve got a single entry in the monitors table, and 2 available workers. In other words, killing the client process didn’t affect anything because the worker it was using had been checked in before it died.

So we’re done with our single pool implementation, right? Nope. We’ve still got a small issue to iron out. Try poking around in Observer, killing processes in various application states (you can kill a process in the “Applications” tab by right-clicking on the process and selecting the last “kill process” option). Does the pool manager state remain correct in all cases? Can you identify the problem? Check your answer in the next post.

