Asynchronous Notifications in Postgres
- postgres ruby sequel websockets
I’m fascinated by Postgres: the more I learn about it, the more I realize how much I still don’t know. Recently I discovered its asynchronous communication capabilities, which apparently have been around for a long time ¯\_(ツ)_/¯
Let’s look at the two most interesting commands related to this topic, NOTIFY and LISTEN. Here’s what the documentation has to say about them:
NOTIFY provides a simple interprocess communication mechanism for a collection of processes accessing the same PostgreSQL database. A payload string can be sent along with the notification, and higher-level mechanisms for passing structured data can be built by using tables in the database to pass additional data from notifier to listener(s).
Whenever the command NOTIFY channel is invoked, either by this session or another one connected to the same database, all the sessions currently listening on that notification channel are notified, and each will in turn notify its connected client application.
LISTEN registers the current session as a listener on the notification channel named channel. If the current session is already registered as a listener for this notification channel, nothing is done.
Sounds like publish-subscribe on the database level, interesting! I learn best by trying things out and writing some code, so let’s dive in.
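Before touching the database, here is a toy in-memory model of the semantics quoted above, written in plain Ruby. It is only a sketch of the behaviour (every listener on a channel receives the payload, and a repeated LISTEN changes nothing), not a description of how Postgres implements it. The names ToyNotifier and Session are made up for this illustration.

```ruby
require 'set'

# A stand-in for a database session; it just collects what it receives.
class Session
  attr_reader :name, :inbox

  def initialize(name)
    @name = name
    @inbox = []
  end
end

# Toy model of LISTEN/NOTIFY semantics (hypothetical, not Postgres internals).
class ToyNotifier
  def initialize
    # channel name => set of listening sessions
    @listeners = Hash.new { |hash, channel| hash[channel] = Set.new }
  end

  # Like LISTEN: registering the same session twice changes nothing.
  def listen(channel, session)
    @listeners[channel].add(session)
  end

  # Like NOTIFY: every session listening on the channel gets the payload.
  def notify(channel, payload)
    @listeners[channel].each { |session| session.inbox << payload }
  end
end

notifier = ToyNotifier.new
alice = Session.new('alice')
bob = Session.new('bob')

notifier.listen('events', alice)
notifier.listen('events', alice) # duplicate registration is a no-op
notifier.listen('events', bob)
notifier.notify('events', 'order created')

p alice.inbox
p bob.inbox
```

Both sessions end up with exactly one copy of the message, even though alice registered twice.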
Setting up Postgres for notifications
For testing purposes, let’s create an overly simplified orders table. Besides the primary key, it contains an email address to identify the person who placed the order and a BIGINT field to store the order total in cents:
CREATE TABLE orders (
  id SERIAL PRIMARY KEY,
  email TEXT NOT NULL,
  total BIGINT NOT NULL
);
Next we need to define a function which returns a trigger:
CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$
  DECLARE
    record RECORD;
    payload JSON;
  BEGIN
    IF (TG_OP = 'DELETE') THEN
      record = OLD;
    ELSE
      record = NEW;
    END IF;

    payload = json_build_object('table', TG_TABLE_NAME,
                                'action', TG_OP,
                                'data', row_to_json(record));

    PERFORM pg_notify('events', payload::text);

    RETURN NULL;
  END;
$$ LANGUAGE plpgsql;
The above is pretty straightforward:
- Declare some variables for later use.
- Switch on the TG_OP special variable to decide which version of the row we want to serialize.
- Use json_build_object and row_to_json to generate the notification payload.
- Use pg_notify to broadcast a message on the events channel.
- Return NULL since this is an AFTER trigger.
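To make the branching in those steps concrete, here is the same logic mirrored in plain Ruby. This is purely an illustration: build_payload is a hypothetical helper that does not exist in Postgres or Sequel, and the real work still happens inside the PL/pgSQL function.

```ruby
require 'json'

# Hypothetical Ruby mirror of notify_event(), for illustration only.
# table   - the table the trigger fired on (TG_TABLE_NAME)
# op      - 'INSERT', 'UPDATE' or 'DELETE' (TG_OP)
# old_row - the row before the change (OLD), nil for INSERT
# new_row - the row after the change (NEW), nil for DELETE
def build_payload(table, op, old_row, new_row)
  # A deleted row only exists as OLD; in every other case NEW is serialized.
  record = op == 'DELETE' ? old_row : new_row
  JSON.generate('table' => table, 'action' => op, 'data' => record)
end

row = { 'id' => 1, 'email' => 'test@example.com', 'total' => 10000 }
puts build_payload('orders', 'INSERT', nil, row)
puts build_payload('orders', 'DELETE', row, nil)
```

Both calls serialize the same row, once taken from NEW and once from OLD, which is exactly the decision the IF block makes.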
Now we can create a notify_order_event trigger, which will call this function after every insert, update or delete on the orders table:
CREATE TRIGGER notify_order_event
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE PROCEDURE notify_event();
With this in place we should now be able to receive events. Let’s inform Postgres that we’re interested in notifications on the events channel:
LISTEN events;
Now whenever we insert, update or delete a record we will receive a notification:
INSERT into orders (email, total) VALUES ('test@example.com', 10000);
INSERT 0 1
Asynchronous notification "events" with payload "{"table" : "orders", "action" : "INSERT", "data" : {"id":1,"email":"test@example.com","total":10000}}" received from server process with PID 5315.
Great, we just received our first asynchronous notification, though admittedly that’s not particularly useful within the same psql session, so let’s add another listener.
Listening from another process
For the following example we’ll once again use Jeremy Evans’s excellent Sequel gem:
require 'sequel'

DB = Sequel.connect('postgres://user@localhost/notify-test')

puts 'Listening for DB events...'
DB.listen(:events, loop: true) do |_channel, _pid, payload|
  puts payload
end
The above code first connects to the database and then uses Sequel::Postgres::Database#listen to listen for events in a loop.
If we start this script and insert a record into our database, the JSON payload is printed to the console:
→ ruby test.rb
Listening for DB events...
{"table" : "orders", "action" : "INSERT", "data" : {"id":2,"email":"test@example.com","total":10000}}
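Since the payload is plain JSON, a listener can parse it and react differently per action instead of just printing it. The following is a minimal sketch using only Ruby’s standard library. The method name summarize_event and the message wording are my own invention. Inside the listen block you would call it as puts summarize_event(payload).

```ruby
require 'json'

# Turn a notification payload into a short human-readable summary.
# summarize_event is a hypothetical helper, not part of Sequel.
def summarize_event(payload)
  event = JSON.parse(payload)
  data = event.fetch('data')
  # total is stored in cents, so split it into whole units and cents.
  amount = format('%d.%02d', *data.fetch('total').divmod(100))

  case event.fetch('action')
  when 'INSERT' then "New order ##{data['id']} from #{data['email']} (#{amount})"
  when 'UPDATE' then "Order ##{data['id']} changed (#{amount})"
  when 'DELETE' then "Order ##{data['id']} removed"
  else "Unhandled action #{event['action']}"
  end
end

payload = '{"table" : "orders", "action" : "INSERT", "data" : {"id":2,"email":"test@example.com","total":10000}}'
puts summarize_event(payload)
# prints: New order #2 from test@example.com (100.00)
```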
Nice, but still not terribly useful. Wouldn’t it be nice if we could forward this notification to our web browser to build a dashboard or something? Enter WebSockets.
Connecting a frontend
Since this is just a simple demo and not a full-fledged app I felt like Rails + ActionCable would be overkill, so I decided to just write a very simple Rack app with Faye instead:
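require 'faye/websocket'
require 'sequel'

DB = Sequel.connect('postgres://user@localhost/notify-test')

App = lambda do |env|
  ws = Faye::WebSocket.new(env)

  # DB.listen with loop: true never returns, so run it in a background
  # thread and hand the Rack response back right away.
  listener = Thread.new do
    DB.listen(:events, loop: true) do |_channel, _pid, payload|
      ws.send(payload)
    end
  end

  # Stop listening once the browser disconnects.
  ws.on(:close) { listener.kill }

  ws.rack_response
end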
Like every Rack “app” this is just an object that responds to call, in our case a lambda. Inside it we create a WebSocket, which we use to forward the messages we receive from Postgres to the browser. To receive them there, all we need to do is instantiate a WebSocket with an onmessage handler that logs the payload to the console:
const socket = new WebSocket('ws://localhost:9292');
socket.onmessage = event => console.log(event.data);
Now whenever we create, modify or delete a record in our orders table, the payload shows up in the browser console.
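The Rack app above starts one listen call per WebSocket, and each of those holds on to a database connection. If several browsers connect, one alternative is a single listener that fans each payload out to every open socket. Below is a rough sketch of that fan-out piece only. Broadcaster and FakeSocket are hypothetical names, and FakeSocket stands in for a real Faye::WebSocket so the example runs without a server.

```ruby
require 'set'

# Hypothetical helper: fans one stream of payloads out to many sockets.
class Broadcaster
  def initialize
    @clients = Set.new
    @lock = Mutex.new
  end

  def add(socket)
    @lock.synchronize { @clients.add(socket) }
  end

  def remove(socket)
    @lock.synchronize { @clients.delete(socket) }
  end

  def broadcast(payload)
    # Copy the set under the lock so slow sends don't block add/remove.
    targets = @lock.synchronize { @clients.to_a }
    targets.each { |socket| socket.send(payload) }
  end
end

# Stand-in for a WebSocket: records what it was asked to send.
class FakeSocket
  attr_reader :messages

  def initialize
    @messages = []
  end

  def send(payload)
    @messages << payload
  end
end

hub = Broadcaster.new
first = FakeSocket.new
second = FakeSocket.new
hub.add(first)
hub.add(second)

hub.broadcast('{"action":"INSERT"}')
hub.remove(second) # e.g. the second browser tab was closed
hub.broadcast('{"action":"DELETE"}')

p first.messages.size
p second.messages.size
```

In a real app, the assumption is that a single background thread would call DB.listen and pass every payload to broadcast, while each socket is added when it connects and removed in its close handler.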
Summary
Postgres’ LISTEN and NOTIFY offer easy interprocess communication via a publish-subscribe pattern. There are many potential use cases for this mechanism, from logging and ETL to realtime dashboards.