Asynchronous Notifications in Postgres

- postgres ruby sequel websockets

I’m fascinated by Postgres: the more I learn about it, the more I realize how much I still don’t know. Recently I discovered its asynchronous communication capabilities, which apparently have been around for a long time ¯\(ツ)

Let’s look at the two most interesting commands related to this topic, NOTIFY and LISTEN. Here’s what the documentation has to say on them:

NOTIFY provides a simple interprocess communication mechanism for a collection of processes accessing the same PostgreSQL database. A payload string can be sent along with the notification, and higher-level mechanisms for passing structured data can be built by using tables in the database to pass additional data from notifier to listener(s).

Whenever the command NOTIFY channel is invoked, either by this session or another one connected to the same database, all the sessions currently listening on that notification channel are notified, and each will in turn notify its connected client application.

LISTEN registers the current session as a listener on the notification channel named channel. If the current session is already registered as a listener for this notification channel, nothing is done.

Sounds like publish-subscribe on the database level, interesting! I learn best by trying things out and writing some code, so let’s dive in.

Setting up Postgres for notifications

For testing purposes, let’s create an overly simplified orders table, that except for the primary key also contains an email address to identify the person who placed the order and a bigint field to store the total order amount in cents:

CREATE TABLE orders (
  id SERIAL PRIMARY KEY,
  email TEXT NOT NULL,
  total BIGINT NOT NULL
);

Next we need to define a function which returns a trigger:

CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$
  DECLARE
    record RECORD;
    payload JSON;
  BEGIN
    IF (TG_OP = 'DELETE') THEN
      record = OLD;
    ELSE
      record = NEW;
    END IF;

    payload = json_build_object('table', TG_TABLE_NAME,
                                'action', TG_OP,
                                'data', row_to_json(record));

    PERFORM pg_notify('events', payload::text);

    RETURN NULL;
  END;
$$ LANGUAGE plpgsql;

The above is pretty straightforward:

  1. Declare some variables for later use.
  2. Switch on the TG_OP special variable to decide which version of the row we want to serialize.
  3. Use json_build_object and row_to_json to generate the notification payload.
  4. Use pg_notify to broadcast a message on the events channel.
  5. Return NULL since this is an AFTER trigger.

Now we can create a notify_order_event trigger, which will call this function after we perform a CRUD operation on the orders table:

CREATE TRIGGER notify_order_event
AFTER INSERT OR UPDATE OR DELETE ON orders
  FOR EACH ROW EXECUTE PROCEDURE notify_event();

With this in place we should now be able to receive events. Let’s inform Postgres that we’re interested in notifications on the events channel:

LISTEN events;

Now whenever we insert, update or delete a record we will receive a notification:

INSERT into orders (email, total) VALUES ('test@example.com', 10000);
INSERT 0 1
Asynchronous notification "events" with payload "{"table" : "orders", "action" : "INSERT", "data" : {"id":1,"email":"test@example.com","total":10000}}" received from server process with PID 5315.

Great, we just received our first asynchronous notification, though admittedly that’s not particularly useful within the same psql session, so let’s add another listener.

Listening from another process

For the following example we’ll once again use Jeremy Evan’s excellent Sequel gem:

require 'sequel'

DB = Sequel.connect('postgres://user@localhost/notify-test')

puts 'Listening for DB events...'
DB.listen(:events, loop: true) do |_channel, _pid, payload|
  puts payload
end

The above code first connects to the database and then uses Sequel::Postgres::Database#listen to listen for events in a loop.

If we start this script and insert a record in our database the JSON payload will get output to the console:

→ ruby test.rb
Listening for DB events...
{"table" : "orders", "action" : "INSERT", "data" : {"id":2,"email":"test@example.com","total":10000}}

Nice, but still not terribly useful. Wouldn’t it be nice if we could forward this notification to our web browser to build a dashboard or something? Enter WebSockets.

Connecting a frontend

Since this is just a simple demo and not a full-fledged app I felt like Rails + ActionCable would be overkill, so I decided to just write a very simple Rack app with Faye instead:

require 'faye/websocket'
require 'sequel'

DB = Sequel.connect('postgres://user@localhost/notify-test')

App = lambda do |env|
  ws = Faye::WebSocket.new(env)

  DB.listen(:events, loop: true) do |_channel, _pid, payload|
    ws.send(payload)
    ws.rack_response
  end
end

Like every Rack “app” this is just an object supporting call, in our case a lambda. In there we create a websocket which we use to forward the messages we receive from Postgres to the browser. To receive them there, all we need to do is instantiate a Websocket with an onmessage handler that logs the payload to the console:

socket = new WebSocket('ws://localhost:9292');
socket.onmessage = event => console.log(event.data);

Whenever we create, modify or delete a new record in our orders table now, we’ll be notified in the browser:

Chrome console

Summary

Postgres’ LISTEN and NOTIFY offer easy interprocess communication via a publish-subscribe pattern. There are many potential use cases for this mechanism, from logging over ETL to realtime dashboards.