PGQ Tutorial

From PostgreSQL wiki

PGQ is the queueing solution from Skytools. The Londiste replication solution is a consumer daemon built on PGQ, and the PGQ API is available for you to build any other asynchronous processing facility based on queueing.

Hint: As of 2021 there is a proposal for an additional part of the SQL standard: "SQL/PGQ" (Property Graph Queries) [1]. Don't confuse the information given here with this proposal.

What problems is PGQ a solution for?

PGQ solves the problem of asynchronous batch processing of live transactions.

That means your live environment runs INSERT, UPDATE, or DELETE statements on some rows, and you want those changes to trigger some actions, not at COMMIT time but later on. Not in the distant future either, just asynchronously: without blocking the live transaction.

Every application of a certain size needs to postpone some processing to a later time, and PGQ is a generic, high-performance solution built for PostgreSQL that implements just that: batch processing. PGQ takes care of the asynchronous consumption of events, error management, queueing behaviour, and so on, and it comes with a plain SQL API to do this.

The API documentation is online; this document assumes you are familiar with it.

Installation and setup

You'll need a ticker instance running on the database where events are produced, which means you need to provide a ticker.ini configuration file and run a ticker daemon.

The ticker

See Londiste_Tutorial#The_ticker_daemon for an introduction on the topic, a configuration example and how to start the daemon.

The ticker produces ticks, which then serve as the boundaries between batches of events. Batches are produced on demand, each time a consumer asks for a new one, such that any batch contains either ticker_max_lag seconds' worth of events or ticker_max_count events, whichever comes first.
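
The batching rule can be modelled in a few lines of Python. This is only a simplified sketch of the decision, not the ticker daemon's actual code: the function name tick_due and its first two arguments are hypothetical, while ticker_max_count and ticker_max_lag are the settings named above.

```python
def tick_due(pending_events, seconds_since_last_tick,
             ticker_max_count, ticker_max_lag):
    """Return True when a new tick should close the current batch.

    Hypothetical helper: a simplified model of "ticker_max_lag seconds
    worth of events or ticker_max_count events, whichever comes first".
    """
    if pending_events == 0:
        # Nothing to batch. Idle ticks (ticker_idle_period) are left
        # out of this sketch.
        return False
    if pending_events >= ticker_max_count:
        return True
    return seconds_since_last_tick >= ticker_max_lag
```

With ticker_max_count = 500 and ticker_max_lag = 3, a burst of 500 events closes a batch immediately, while a trickle of 10 events waits until 3 seconds have passed since the last tick.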

The daemon is also responsible for the maintenance of the queues: event INSERTs are rotated across 3 tables, and a table is emptied with TRUNCATE as soon as it no longer contains unprocessed events.
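
To make the rotation idea concrete, here is a toy in-memory model. It is a sketch under loud assumptions: the class RotatingQueue, its methods, and the is_processed callback are all hypothetical, Python lists stand in for the 3 event tables, and the real maintenance code in PGQ is more involved than this.

```python
class RotatingQueue:
    """Toy model of event tables that are rotated and truncated."""

    def __init__(self, n_tables=3):
        self.tables = [[] for _ in range(n_tables)]
        self.current = 0  # index of the table receiving new events

    def insert(self, event):
        # New events always go to the current table.
        self.tables[self.current].append(event)

    def rotate(self, is_processed):
        """Try to switch to the next table.

        The next table is reused only when every event it still holds
        has been processed; clearing the list stands in for TRUNCATE.
        Returns True when the rotation happened.
        """
        nxt = (self.current + 1) % len(self.tables)
        if all(is_processed(ev) for ev in self.tables[nxt]):
            self.tables[nxt].clear()
            self.current = nxt
            return True
        return False
```

The point of the design is that old events are never deleted row by row: a whole table is dropped in one cheap operation once no consumer needs it anymore.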

Producing and Consuming Events

In order not to lose any events, make sure at least one consumer is registered before producing any event, as events that nobody is interested in are lost.

You can have as many consumers as you want on the same event queue, but they will all see the same events rather than share the workload. That makes sense if you consider that the main application is replication: consumers are replicas, and when you have more than one you want them all to process the same events so that all the replicas have the same data set.

There is work in progress to provide PGQ cooperative consumers; progress is currently stalled on this [2] mailing list thread.

More Info

See the following for more info:

Producing Events

The documentation is quite clear about this: you produce events using either a function-based API or a trigger-based one. The latter is preferred, as it provides easy SQL data type validation and a familiar interface for inserting events, the classic INSERT INTO.
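
Both styles can be sketched from Python. The helpers below are hypothetical and assume a DB-API driver that uses the %s parameter style (psycopg2, for instance), with the cursor supplied by the caller; the table and queue names are placeholders. The SQL entry points used are pgq.insert_event() for the function-based API and the pgq.logutriga() trigger function for the trigger-based one.

```python
def insert_event(cur, queue, ev_type, ev_data):
    """Function-based API: push one event and return its id."""
    cur.execute("SELECT pgq.insert_event(%s, %s, %s)",
                (queue, ev_type, ev_data))
    return cur.fetchone()[0]


def trigger_sql(table, queue):
    """Trigger-based API: build the statement that turns every row
    change on `table` into an event on `queue`.

    Identifiers cannot be bound as parameters, so both names must be
    trusted input in this sketch.
    """
    return ("CREATE TRIGGER {table}_to_queue "
            "AFTER INSERT OR UPDATE OR DELETE ON {table} "
            "FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('{queue}')"
            ).format(table=table, queue=queue)
```

Once the trigger is in place, producing an event is a plain INSERT INTO on the table, and PostgreSQL validates the column data types for you.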

Writing a PGQ consumer

Your best option is to write the consumer code in Python; the next best option is PHP. If you want to reuse existing code written in another language, you'll have to write the operating system daemon support code and the loop around the PGQ API entry points yourself.

All the PGQ logic is written as a PostgreSQL extension and server-side code, so all your language needs to provide is a way to connect to your PostgreSQL server and call functions.

How to use the PGQ SQL API

First, your consumer code needs to be able to register with and unregister from an existing queue.
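
A minimal sketch of these two steps, assuming a DB-API connection (psycopg2 style, %s parameters) handed in by the caller. The Python function names are hypothetical; pgq.register_consumer() and pgq.unregister_consumer() are the SQL functions being called.

```python
def register(conn, queue, consumer):
    """Subscribe `consumer` to `queue` and commit the registration."""
    cur = conn.cursor()
    cur.execute("SELECT pgq.register_consumer(%s, %s)", (queue, consumer))
    conn.commit()


def unregister(conn, queue, consumer):
    """Drop the subscription of `consumer` to `queue`."""
    cur = conn.cursor()
    cur.execute("SELECT pgq.unregister_consumer(%s, %s)", (queue, consumer))
    conn.commit()
```

Keep in mind that an unregistered consumer no longer has events kept for it, so a daemon that merely restarts should normally stay registered.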

Then the main idea is pretty simple: the consumer calls pgq.next_batch() in a loop as fast as possible until it receives NULL, which is when it should take a rest.

Please note that the ticker daemon will produce ticks every ticker_idle_period when your system is not producing events, so it's perfectly normal (and expected) to get empty batches in low activity periods, such as when debugging your first consumer. Empty batches need to be consumed as fast as possible; only have your custom consumer rest when pgq.next_batch() returns NULL.

When pgq.next_batch() returns an id (bigint), you get to process the batch events, which you obtain with pgq.get_batch_events(batch_id). What to do with each event should be specific to the consumer code rather than in a common PGQ consuming abstraction interface (or library, package, whatever your programming environment facility is named).

The event processing function should be able to tag events as processed (OK), failed, or to be retried later. In the retry case, the event is reintroduced in a later batch; which one depends on how much later you want it retried.
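
Here is a sketch of the retry case only, again assuming a DB-API cursor with %s parameters. The helper tag_retries, the RETRY constant, and the shape of outcomes are hypothetical; pgq.event_retry(batch_id, event_id, retry_seconds) is the SQL function doing the work. The sketch assumes that events processed OK need no call of their own because they are finished along with their batch, and it leaves the failed case aside.

```python
RETRY = "retry"


def tag_retries(cur, batch_id, outcomes, retry_seconds=60):
    """Schedule a retry for every event whose outcome is RETRY.

    `outcomes` is an iterable of (event_id, outcome) pairs produced by
    your own processing code. Returns the number of events rescheduled.
    """
    retried = 0
    for event_id, outcome in outcomes:
        if outcome == RETRY:
            # The event will show up again in a batch created at least
            # retry_seconds from now.
            cur.execute("SELECT pgq.event_retry(%s, %s, %s)",
                        (batch_id, event_id, retry_seconds))
            retried += 1
    return retried
```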

Once all events of a batch are processed, you have to call pgq.finish_batch(batch_id) then COMMIT your batch processing transaction.

In pseudo language this gives:

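do
  batch_id = pgq.next_batch(queue_name, consumer_name)
  if batch_id is not null
    events = pgq.get_batch_events(batch_id);
    // this could be a function pointer or a virtual method or a delegate, e.g.
    process_events(events);
    pgq.finish_batch(batch_id);
    commit;
  else
    sleep;  // no batch available yet, take a rest
  end if;
while true;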
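
The pseudo code above translates to Python roughly as follows. This is a sketch, not the Skytools consumer: it assumes a DB-API connection using %s parameters (psycopg2 style), and the function names, the handle_event callback, and the idle_sleep parameter are all hypothetical. Only pgq.next_batch(), pgq.get_batch_events(), and pgq.finish_batch() come from the PGQ SQL API.

```python
import time


def consume_one_batch(conn, queue, consumer, handle_event):
    """Process at most one batch; return False when none is available."""
    cur = conn.cursor()
    cur.execute("SELECT pgq.next_batch(%s, %s)", (queue, consumer))
    batch_id = cur.fetchone()[0]
    if batch_id is None:
        conn.commit()
        return False

    cur.execute("SELECT * FROM pgq.get_batch_events(%s)", (batch_id,))
    for event in cur.fetchall():
        # What to do with an event is up to the consumer code.
        handle_event(cur, batch_id, event)

    # Finish the batch, then COMMIT the whole processing transaction.
    cur.execute("SELECT pgq.finish_batch(%s)", (batch_id,))
    conn.commit()
    return True


def run(conn, queue, consumer, handle_event, idle_sleep=1.0):
    """Loop forever, resting only when next_batch() returned NULL."""
    while True:
        if not consume_one_batch(conn, queue, consumer, handle_event):
            time.sleep(idle_sleep)
```

Note that an empty batch still returns a batch id, so it goes through the fast path and is finished immediately; the sleep only happens on NULL.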

Remote consuming

This is the situation where the processing of the events happens on another database than the one where the events got produced. Londiste is a good example of remote consuming.

In this case, you need a way to avoid processing a batch again when the processing COMMIT succeeded but the pgq.finish_batch() call (which has to be done on the producer database rather than the consumer one) failed. Why the 3 other scenarios are not a problem is left to the reader.

Skytools provides pgq_ext (extension) as a way to ensure that you won't process the same batch more than once, and this is discussed here: [3].

The pgq_ext idea is to record the last processed batch_id on the consumer database, and to do that UPDATE within the processing transaction. This can be refined to the last processed event id, for cases where you don't have an easy way to roll back the whole batch processing in case of failure at the remote site.
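
The idea can be tried out without any PostgreSQL server. The sketch below is an illustration of the principle, not pgq_ext itself: SQLite stands in for the consumer database so the example is self-contained, and the function apply_batch_once as well as the completed_batch and applied tables are hypothetical names.

```python
import sqlite3


def apply_batch_once(dst, consumer, batch_id, events):
    """Apply `events` unless `batch_id` was already applied.

    The batch id is recorded in the same transaction as the data
    changes, so either both are committed or neither is.
    """
    with dst:  # commits on success, rolls back on exception
        row = dst.execute(
            "SELECT last_batch_id FROM completed_batch WHERE consumer = ?",
            (consumer,)).fetchone()
        if row is not None and row[0] == batch_id:
            return False  # same batch handed out again: skip it
        for value in events:
            dst.execute("INSERT INTO applied(value) VALUES (?)", (value,))
        dst.execute(
            "INSERT OR REPLACE INTO completed_batch(consumer, last_batch_id) "
            "VALUES (?, ?)", (consumer, batch_id))
    return True


# Stand-in for the consumer database.
dst = sqlite3.connect(":memory:")
dst.execute("CREATE TABLE completed_batch "
            "(consumer TEXT PRIMARY KEY, last_batch_id INTEGER)")
dst.execute("CREATE TABLE applied (value TEXT)")
```

If pgq.finish_batch() fails on the producer side after this transaction committed, the same batch is handed out again and the second call returns False without touching the data.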

As a side note, you might want to know that Londiste uses pgq_ext but moves the SQL objects (functions and tables) into the londiste schema.

Non transactional processing

Your consumer code could be sending mail rather than changing a database state. In this case the processing isn't transactional (you can't roll back the batch processing) and you'll have to solve the reliability issues yourself; PGQ won't provide the magic.

Using the python API

Skytools is written mainly in Python and provides everything you need to write your own consumers.

Use-case: row counter for count(*) speedup

Here's a Python code snippet of a PGQ consumer:

import sys
import pgq

class RowCounter(pgq.Consumer):
    def process_batch(self, db, batch_id, ev_list):
        tbl = 'table_name'
        delta = 0
        # count inserts and deletes on the table of interest
        for ev in ev_list:
            if   ev.type == 'I' and ev.extra1 == tbl: delta += 1
            elif ev.type == 'D' and ev.extra1 == tbl: delta -= 1
        # apply the net change in one call
        q = 'select update_stats(%s, %s)'
        db.cursor().execute(q, [tbl, delta])

RowCounter('row_counter', 'db', sys.argv[1:]).start()

Using the PHP API

A PHP API has been contributed too, making it easy to write a PGQ consumer daemon in PHP. See README for more detail.

Here's an example:

<?php
require( "pgq/PGQRemoteConsumer.php" );

define("CONFIGURATION", "conf/duration.php");

$con_src = "dbname=foo_db port=5432 host=localhost";
$con_dst = "dbname=bar_db port=5432 host=localhost";

class PGQDaemonExample extends PGQRemoteConsumer
{
	public function config( )
	{
		if( $this->log !== null )
			$this->log->notice("Reloading configuration (HUP) from '%s'", CONFIGURATION);

		global $Config;
		$this->loglevel = $Config["LOGLEVEL"];
		$this->logfile  = $Config["LOGFILE"];
		$this->delay    = $Config["DELAY"];
	}

	public function process_event( &$event )
	{
		$this->log->notice("Starting process event");
		$id = $event->data["id"];
		$code = $event->data["code"];
		$data = $event->data["data"];
		$this->log->notice("Processing event : %d ", $id);
		$query = sprintf( "UPDATE table SET ... WHERE ...", $code );
		$this->log->debug( $query );
		$result = pg_query( $this->pg_dst_con, $query );
		if( $result === False )
		{
			// ask PGQ to hand this event out again later
			$this->log->error( "Unable to update : %s ", $query );
			$event->retry_delay = 2 * $this->delay;
			return PGQ_EVENT_RETRY;
		}
		return PGQ_EVENT_OK;
	}
}

$daemon = new PGQDaemonExample( "mydaemon", "daemonq", "table", $argc, $argv, $con_src, $con_dst );

Java API

A Java consumer API is also available. Documentation is available in the file.

Troubleshooting

See Londiste's sections about it.