From PostgreSQL wiki


DTM resources:


eXtensible Transaction Manager API

We are going to develop a generic, universal manager of distributed transactions for PostgreSQL which can be used in many different scenarios. For example, there are several tools for PostgreSQL that support sharding (horizontal partitioning of data): pg_shard, pgpool, ... They can scatter data between multiple nodes and execute distributed queries, but they are not able to provide global consistency. For instance, if you are transferring money from one bank account to another and the accounts are located on different nodes, then a distributed query calculating the total balance may show different values depending on when it runs. There is no such problem in Postgres-XC/XL, which uses a GTM (global transaction manager) to provide global snapshots. But Postgres-XC is a fork of Postgres which contains a huge number of changes to the Postgres code (more than 300 thousand lines). There is no chance of merging Postgres-XC/XL in its current state into the main Postgres development branch.
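The banking example can be made concrete with a small simulation. The sketch below is only an illustration: the `node` type and the account names are invented for this example, and each node is reduced to a map of committed balances that a reader queries independently, with no global snapshot.

```go
package main

import "fmt"

// node holds the committed balances of the accounts stored on one shard.
type node struct {
	committed map[string]int
}

// total sums what a reader sees when it queries every node independently,
// that is, with a separate local view per node and no global snapshot.
func total(nodes ...*node) int {
	sum := 0
	for _, n := range nodes {
		for _, balance := range n.committed {
			sum += balance
		}
	}
	return sum
}

func main() {
	n1 := &node{committed: map[string]int{"alice": 100}}
	n2 := &node{committed: map[string]int{"bob": 100}}
	before := total(n1, n2)

	// The transfer commits on node 1 first; node 2 has not committed yet.
	n1.committed["alice"] -= 10
	during := total(n1, n2)

	// Node 2 commits its half of the transfer.
	n2.committed["bob"] += 10
	after := total(n1, n2)

	fmt.Println(before, during, after) // prints: 200 190 200
}
```

A reader that runs between the two local commits sees a total of 190 although no money was created or destroyed. A global snapshot is what prevents this intermediate state from being observed.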

In contrast to Postgres-XC/XL, our distributed transaction manager (DTM) is intended to be implemented mostly as a PostgreSQL extension. We are going to implement most of the functionality as a standard Postgres extension and use the Postgres callback mechanism to alter default Postgres behavior.

The first question we had to answer was whether we need to support local transactions. A local transaction is executed at just one node and does not communicate with the DTM. The speed of local transaction execution should be the same as that of normal Postgres transactions. Postgres-XC/XL does not support local transactions: all transactions require interaction with the GTM. But it is a well-known fact that the performance of a distributed system is acceptable only if most transactions are local. In a well-designed system this is the most common case: entities which are usually accessed together should be located at the same node.

We consider three different approaches:

All approaches are implemented on top of eXtensible transaction manager API:

typedef struct
{
	/* Get current transaction status (encapsulation of TransactionIdGetStatus in clog.c) */
	XidStatus (*GetTransactionStatus)(TransactionId xid, XLogRecPtr *lsn);

	/* Set current transaction status (encapsulation of TransactionIdSetTreeStatus in clog.c) */
	void (*SetTransactionStatus)(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);

	/* Get current transaction snapshot (encapsulation of GetSnapshotData in procarray.c) */
	Snapshot (*GetSnapshot)(Snapshot snapshot);

	/* Assign new Xid to transaction (encapsulation of GetNewTransactionId in varsup.c) */
	TransactionId (*GetNewTransactionId)(bool isSubXact);

	/* Get oldest transaction Xid that was running when any current transaction was started (encapsulation of GetOldestXmin in procarray.c) */
	TransactionId (*GetOldestXmin)(Relation rel, bool ignoreVacuum);

	/* Check if current transaction is not yet completed (encapsulation of TransactionIdIsInProgress in procarray.c) */
	bool (*IsInProgress)(TransactionId xid);

	/* Get global transaction XID: returns XID of current transaction if it is global, InvalidTransactionId otherwise */
	TransactionId (*GetGlobalTransactionId)(void);

	/* Is the given XID still-in-progress according to the snapshot (encapsulation of XidInMVCCSnapshot in tqual.c) */
	bool (*IsInSnapshot)(TransactionId xid, Snapshot snapshot);

	/* Detect distributed deadlock */
	bool (*DetectGlobalDeadLock)(PGPROC* proc);
} TransactionManager;

This API does not introduce any new abstractions; it just encapsulates some (not all) PostgreSQL functions related to transaction management. We cannot prove that it is a necessary and sufficient set of functions. But we have implemented several different distributed transaction managers on top of this API, which shows that it is flexible enough in practice.
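The idea of a table of replaceable functions can be illustrated outside of C. The following Go sketch is an analogue only, not PostgreSQL code: the `TransactionManager` struct here models just two of the callbacks, and `newLocalTM` and `installDistributedTM` are hypothetical names. It shows how an extension could replace one entry while delegating to the previous implementation.

```go
package main

import "fmt"

// XID stands in for PostgreSQL's TransactionId.
type XID uint32

// TransactionManager is a reduced, hypothetical analogue of the XTM table:
// a struct whose fields are replaceable functions.
type TransactionManager struct {
	GetNewTransactionID func() XID
	IsInProgress        func(xid XID) bool
}

// newLocalTM builds a table with purely local behaviour.
func newLocalTM() *TransactionManager {
	next := XID(100)
	running := map[XID]bool{}
	return &TransactionManager{
		GetNewTransactionID: func() XID {
			next++
			running[next] = true
			return next
		},
		IsInProgress: func(xid XID) bool { return running[xid] },
	}
}

// installDistributedTM overrides a single entry. XIDs known to be running
// globally are reported as in progress; everything else falls through to
// the implementation that was installed before.
func installDistributedTM(tm *TransactionManager, globallyRunning map[XID]bool) {
	prev := tm.IsInProgress
	tm.IsInProgress = func(xid XID) bool {
		return globallyRunning[xid] || prev(xid)
	}
}

func main() {
	tm := newLocalTM()
	local := tm.GetNewTransactionID()
	fmt.Println(tm.IsInProgress(local), tm.IsInProgress(7)) // true false

	installDistributedTM(tm, map[XID]bool{7: true})
	fmt.Println(tm.IsInProgress(local), tm.IsInProgress(7)) // true true
}
```

Callers keep using the same table, so code that only calls `tm.IsInProgress` does not need to know whether a distributed manager is installed.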

Why did we choose exactly these functions? The PostgreSQL transaction manager has many different functions, and some of them do almost the same thing in different ways. Consider, for example, TransactionIdIsInProgress, TransactionIdIsKnownCompleted, TransactionIdDidCommit, TransactionIdDidAbort and TransactionIdGetStatus. Some of them access clog, some access procarray, and some just check a cached value. They are also scattered across different Postgres modules.

We have investigated the code and usage of all these functions. We found out that TransactionIdDidCommit is always called by the visibility check after TransactionIdIsInProgress, and that it in turn uses TransactionIdGetStatus to extract information about the transaction from clog. So we have included TransactionIdIsInProgress and TransactionIdGetStatus in XTM, but not TransactionIdDidCommit, TransactionIdDidAbort and TransactionIdIsKnownCompleted.

The story is similar for committing a transaction. There is once again a bundle of functions: CommitTransactionCommand, CommitTransaction, CommitSubTransaction, RecordTransactionCommit, TransactionIdSetTreeStatus. CommitTransactionCommand is a function from the public API. It initiates a state switch of the Postgres TM finite state machine (FSM). We do not want to affect the logic of this state machine: it is the same for the DTM and the local TM. So we look deeper. CommitTransaction and CommitSubTransaction are called by this FSM. We also do not want to change the logic of processing subtransactions, so we go one more step deeper and arrive at TransactionIdSetTreeStatus. This is why it is included in XTM.

It is not always obvious which functionality should be covered by the TM API. For example, is checking the visibility of tuples the responsibility of the transaction manager or not? What about deadlock detection? Our main argument for including such functions in XTM is that they are required to implement a distributed transaction manager and are tightly coupled with the transaction manager. It would be difficult, and would not make much sense, to extract them into separate modules. For example, to be able to detect a distributed deadlock we need a mapping between local and global transaction IDs, and such a map is available in the TM.
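To see why the local-to-global mapping matters for deadlock detection, consider the following sketch. It is an illustration under stated assumptions, not the algorithm used by any of the extensions: `waitEdge`, `nodeState`, `buildGlobalGraph` and `hasCycle` are hypothetical names, global IDs are plain strings, and transactions without a global ID are simply ignored.

```go
package main

import "fmt"

// waitEdge says that local transaction Waiter is blocked by Holder on one node.
type waitEdge struct{ Waiter, Holder uint32 }

// nodeState is what one node contributes: its local wait-for edges and the
// mapping from local XIDs to global transaction IDs kept by the TM.
type nodeState struct {
	edges  []waitEdge
	global map[uint32]string
}

// buildGlobalGraph merges the local edges of all nodes, rewriting local XIDs
// to global IDs. Edges that involve purely local transactions are skipped,
// which is a simplification made for this sketch.
func buildGlobalGraph(nodes []nodeState) map[string][]string {
	g := map[string][]string{}
	for _, n := range nodes {
		for _, e := range n.edges {
			w, okW := n.global[e.Waiter]
			h, okH := n.global[e.Holder]
			if okW && okH {
				g[w] = append(g[w], h)
			}
		}
	}
	return g
}

// hasCycle runs a depth-first search and reports whether the graph has a cycle.
func hasCycle(g map[string][]string) bool {
	const (
		unvisited = iota
		onStack
		done
	)
	state := map[string]int{}
	var visit func(v string) bool
	visit = func(v string) bool {
		state[v] = onStack
		for _, next := range g[v] {
			if state[next] == onStack {
				return true
			}
			if state[next] == unvisited && visit(next) {
				return true
			}
		}
		state[v] = done
		return false
	}
	for v := range g {
		if state[v] == unvisited && visit(v) {
			return true
		}
	}
	return false
}

func main() {
	// On node 1, g1 waits for g2; on node 2, g2 waits for g1.
	n1 := nodeState{edges: []waitEdge{{10, 11}}, global: map[uint32]string{10: "g1", 11: "g2"}}
	n2 := nodeState{edges: []waitEdge{{20, 21}}, global: map[uint32]string{20: "g2", 21: "g1"}}
	fmt.Println(hasCycle(buildGlobalGraph([]nodeState{n1})))     // false
	fmt.Println(hasCycle(buildGlobalGraph([]nodeState{n2})))     // false
	fmt.Println(hasCycle(buildGlobalGraph([]nodeState{n1, n2}))) // true
}
```

Neither node sees a cycle on its own, because the local XIDs 10, 11, 20 and 21 are unrelated numbers. The cycle becomes visible only after they are translated to global IDs.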

The XTM API contains just one function responsible for tuple visibility. There is a family of HeapTupleSatisfies* functions in utils/time/tqual.c, but they are built on a few other functions, such as TransactionIdIsInProgress and XidInMVCCSnapshot. Since we do not want to change the heap tuple format, we leave all manipulations of tuple status bits as they are and redefine only the XidInMVCCSnapshot() function.
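For readers unfamiliar with the check that XidInMVCCSnapshot performs, here is a simplified Go model of it. This is a sketch only: the `Snapshot` type and `xidInSnapshot` are illustrative names, and the model ignores subtransactions and XID wraparound, both of which the real function has to handle.

```go
package main

import "fmt"

// Snapshot is a simplified model of an MVCC snapshot.
type Snapshot struct {
	Xmin uint32   // every XID below Xmin had finished when the snapshot was taken
	Xmax uint32   // every XID at or above Xmax had not finished yet
	Xip  []uint32 // XIDs in [Xmin, Xmax) that were still running
}

// xidInSnapshot reports whether xid counts as still in progress according
// to the snapshot, in which case its changes must not be visible.
func xidInSnapshot(xid uint32, s Snapshot) bool {
	if xid < s.Xmin {
		return false // finished before the snapshot was taken
	}
	if xid >= s.Xmax {
		return true // started after the snapshot was taken
	}
	for _, running := range s.Xip {
		if running == xid {
			return true
		}
	}
	return false
}

func main() {
	s := Snapshot{Xmin: 100, Xmax: 110, Xip: []uint32{103, 107}}
	fmt.Println(xidInSnapshot(99, s))  // false
	fmt.Println(xidInSnapshot(103, s)) // true
	fmt.Println(xidInSnapshot(105, s)) // false
	fmt.Println(xidInSnapshot(110, s)) // true
}
```

A distributed transaction manager that overrides this one check can change which transactions a snapshot considers running without touching the tuple format.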

A custom TM does not necessarily have to use PostgreSQL snapshots. For example, our pg_tsdtm is based on CSNs (commit serial numbers). But if a TM wants to preserve or extend the PostgreSQL snapshot approach, then XTM needs a method for getting a snapshot. PostgreSQL uses different kinds of snapshots and has a set of functions for obtaining them: GetCatalogSnapshot, GetTransactionSnapshot, GetNonHistoricCatalogSnapshot, GetLatestSnapshot, plus functions maintaining a stack of snapshots. But there is one low-level function, GetSnapshotData (defined in procarray.c, not in snapmgr.c), which actually constructs the snapshot. We override exactly this function, because all the higher-level logic of processing snapshots seems to be common to all transaction managers.

A transaction manager may assign its own XIDs or reuse XIDs assigned by the PostgreSQL core. pg_dtm uses an arbiter which assigns global XIDs, while pg_tsdtm uses local XIDs but maps them to CSNs. This is why we encapsulate the GetNewTransactionId function from varsup.c. Redefining this function requires copying and pasting a substantial piece of code, but it is difficult to decompose it into lower-level calls.
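The second option, keeping local XIDs and mapping them to CSNs, can be pictured with a few lines of Go. This is a conceptual sketch, not pg_tsdtm code: `csnLog` and `visibleAt` are hypothetical names, and treating a commit whose CSN equals the snapshot CSN as visible is a convention chosen for this example.

```go
package main

import "fmt"

// csnLog maps a local XID to the commit serial number it committed with.
// XIDs that are absent have not committed (they are running or aborted).
type csnLog map[uint32]int64

// visibleAt reports whether the effects of xid are visible to a snapshot
// identified by snapshotCSN. Using <= rather than < is an assumption of
// this sketch.
func visibleAt(log csnLog, xid uint32, snapshotCSN int64) bool {
	csn, committed := log[xid]
	return committed && csn <= snapshotCSN
}

func main() {
	log := csnLog{501: 1000, 502: 1500}
	fmt.Println(visibleAt(log, 501, 1200)) // true: committed before the snapshot
	fmt.Println(visibleAt(log, 502, 1200)) // false: committed after the snapshot
	fmt.Println(visibleAt(log, 503, 1200)) // false: not committed
}
```

With such a mapping a snapshot is a single number rather than a list of running XIDs, which makes it cheap to pass between nodes.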

Below are subgraphs of PostgreSQL call graphs. Functions encapsulated by XTM are marked with (XTM):

Transaction commit:

  • CommitTransactionCommand
    • CommitTransaction
      • RecordTransactionCommit
        • TransactionIdCommitTree
          • TransactionIdSetTreeStatus (XTM)

Visibility check:

  • HeapTupleSatisfies*
    • TransactionIdIsCurrentTransactionId
    • TransactionIdIsInProgress (XTM)
    • XidInMVCCSnapshot (XTM)
    • TransactionIdDidCommit
      • TransactionLogFetch
        • TransactionIdGetStatus (XTM)

Obtain snapshot:

  • Get*Snapshot
    • GetSnapshotData (XTM)

DTM approach

Let's first explain the DTM approach. It requires a centralized service (let's call it the arbiter) which is responsible for:

  • Assigning transaction IDs
  • Providing consistent snapshots to all nodes participating in a transaction
  • Making the decision about transaction commit or rollback
  • Maintaining the state of distributed transactions (needed for recovery)
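The four responsibilities listed above can be sketched as a toy in-memory arbiter. This is a minimal illustration and not the pg_dtm arbiter: the `arbiter` type and its methods are hypothetical, a snapshot is reduced to the list of running XIDs, and there is no networking, locking or persistence.

```go
package main

import (
	"fmt"
	"sort"
)

type status int

const (
	inProgress status = iota
	committed
	aborted
)

// arbiter is a toy, single-threaded model of the central service.
type arbiter struct {
	nextXid uint32
	state   map[uint32]status // kept after completion, as recovery would need
}

func newArbiter() *arbiter {
	return &arbiter{nextXid: 1, state: map[uint32]status{}}
}

// begin assigns a new global transaction ID.
func (a *arbiter) begin() uint32 {
	xid := a.nextXid
	a.nextXid++
	a.state[xid] = inProgress
	return xid
}

// snapshot returns the XIDs that are still running, in ascending order.
// Every node asking at the same moment receives the same answer.
func (a *arbiter) snapshot() []uint32 {
	running := []uint32{}
	for xid, st := range a.state {
		if st == inProgress {
			running = append(running, xid)
		}
	}
	sort.Slice(running, func(i, j int) bool { return running[i] < running[j] })
	return running
}

// finish decides the outcome: commit only if every participant voted yes.
func (a *arbiter) finish(xid uint32, votes ...bool) status {
	outcome := committed
	for _, ok := range votes {
		if !ok {
			outcome = aborted
		}
	}
	a.state[xid] = outcome
	return outcome
}

func main() {
	a := newArbiter()
	t1, t2 := a.begin(), a.begin()
	fmt.Println(a.snapshot())                          // [1 2]
	fmt.Println(a.finish(t1, true, true) == committed) // true
	fmt.Println(a.snapshot())                          // [2]
	fmt.Println(a.finish(t2, true, false) == aborted)  // true
	fmt.Println(a.snapshot())                          // []
}
```

Because every XID, snapshot and commit decision goes through one place, consistency is easy to reason about, but the same property makes the arbiter a potential bottleneck.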

The proposed architecture of DTM is the following:

      .------- Backend ----.
     /                     \
    /                       \
Coordinator -- Backend -- Arbiter
    \                       /
     \                     /
      `------ Backend ----´
libpq+xtm procs  libdtm+libsockhub    

There is an abstract coordinator. The responsibility of the coordinator is to organize distributed execution of queries. It is connected to Postgres backends using some standard protocol (for example libpq). It can be implemented in any programming language. It can be a special server process (as in Postgres-XC) or just a smart client or proxy (like pg_shard). We want to leave as much flexibility as possible to the coordinator. So far we have integrated DTM with pg_shard and postgres_fdw.

The coordinator communicates with data nodes using standard SQL statements, plus some special SQL functions provided by our extension which are used to mark the start of a global transaction:

CREATE FUNCTION dtm_begin_transaction() RETURNS integer
AS 'MODULE_PATHNAME','dtm_begin_transaction'
LANGUAGE C;

CREATE FUNCTION dtm_join_transaction(xid integer) RETURNS void
AS 'MODULE_PATHNAME','dtm_join_transaction'
LANGUAGE C;

On the first node the coordinator should call dtm_begin_transaction(), which returns the assigned transaction ID (XID). The coordinator should pass this XID to the other nodes participating in the distributed transaction using dtm_join_transaction(). Note that dtm_begin_transaction() and dtm_join_transaction() are themselves executed within some transaction which has already been assigned an XID, so they set the XID for the next transaction. They should therefore be used in the following way:

Primary node:

xid = select dtm_begin_transaction() ;
begin transaction;
commit transaction;

Other nodes:

select dtm_join_transaction(xid) ;
begin transaction;
commit transaction;

Below is an example of coordinator code in Go:

xid := execQuery(con1, "select dtm_begin_transaction()")
exec(con2, "select dtm_join_transaction($1)", xid)
exec(con1, "begin transaction")
exec(con2, "begin transaction")
exec(con1, "update t set v = v + $1 where u=$2", amount, account1)
exec(con2, "update t set v = v - $1 where u=$2", amount, account2)
// Commit on both nodes concurrently and wait for both to finish.
var wg sync.WaitGroup
asyncExec(con1, "commit", &wg)
asyncExec(con2, "commit", &wg)
wg.Wait()

And this is an example of Go code working with pg_shard+DTM:

exec(con, "begin transaction")
exec(con, "update t set v = v + $1 where u=$2", amount, account1)
exec(con, "update t set v = v - $1 where u=$2", amount, account2)
exec(con, "commit")

Finally, an example of Go code working with postgres_fdw+DTM:

exec(con, "select dtm_begin_transaction()")
exec(con, "begin transaction")
exec(con, "update t set v = v + $1 where u=$2", amount, account1)
exec(con, "update t set v = v - $1 where u=$2", amount, account2)
exec(con, "commit")

Evaluating DTM approach

The tests above were performed on Amazon cluster instances, with three series of tests for each number of nodes. In this test we changed the number of connections from the benchmark program at each step in order to keep a constant number of backends on each host. The green line shows single-node performance on the same workload without DTM.


tsDTM approach

This approach is based on using local timestamps (assuming that system time at all nodes is more or less synchronized): http://research.microsoft.com/en-us/people/samehe/clocksi.srds2013.pdf

This approach does not require a centralized timestamp authority, which can be a bottleneck and a single point of failure. The precision of clock synchronization does not affect the correctness of this algorithm, only its performance. This is why it is desirable to use something like NTP to synchronize time across nodes.
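One way to see why clock skew costs performance rather than correctness is the read rule described in the Clock-SI paper linked above: a node whose clock is behind the snapshot timestamp delays the read until its clock catches up. The sketch below models only that rule. The function name `readDelay` is hypothetical, timestamps are plain microsecond counters, and whether pg_tsdtm handles skew in exactly this way is an assumption.

```go
package main

import "fmt"

// readDelay returns how long a node has to wait before it can serve a read
// at snapshotTS, given its own clock reading localTS (both in microseconds).
func readDelay(snapshotTS, localTS int64) int64 {
	if snapshotTS > localTS {
		// The local clock is behind the snapshot: wait for it to catch up.
		return snapshotTS - localTS
	}
	return 0 // the local clock is already past the snapshot, no waiting
}

func main() {
	fmt.Println(readDelay(1000500, 1000000)) // 500: the node is 500 us behind
	fmt.Println(readDelay(1000000, 1000200)) // 0: the node is ahead
}
```

The wait is bounded by the skew between the two clocks, so tighter synchronization shortens the delay while the result of the read stays the same.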

This approach should provide the best scalability. But recovery is more challenging, because it requires some interaction between nodes to determine a quorum and come to consensus.

Architecture of tsDTM is simpler than of DTM:

      .------ Backend
Coordinator -- Backend
      `------ Backend 

But tsDTM SQL API is more complex:

-- Extend local transaction to global. This function is called only once, when the coordinator decides to access
-- data at some other node.
-- in: gtid (global transaction ID chosen by coordinator)
-- out: snapshot
CREATE FUNCTION dtm_extend(gtid cstring default null) RETURNS bigint
AS 'MODULE_PATHNAME','dtm_extend'
LANGUAGE C;

-- This function should be called by the coordinator for all nodes participating in the global transaction
-- except the first node (at which dtm_extend() is called).
-- in: snapshot (snapshot returned by dtm_extend)
-- in: gtid (global transaction ID chosen by coordinator)
-- out: adjusted snapshot (to be passed to other nodes)
CREATE FUNCTION dtm_access(snapshot bigint, gtid cstring default null) RETURNS bigint
AS 'MODULE_PATHNAME','dtm_access'
LANGUAGE C;

-- Start two phase commit
-- in: gtid (global transaction ID chosen by coordinator)
CREATE FUNCTION dtm_begin_prepare(gtid cstring) RETURNS void
AS 'MODULE_PATHNAME','dtm_begin_prepare'
LANGUAGE C;

-- Prepare transaction commit
-- in: gtid (global transaction ID chosen by coordinator)
-- in: CSN (pass 0 for primary node)
-- out: CSN: propagated CSN
CREATE FUNCTION dtm_prepare(gtid cstring, csn bigint) RETURNS bigint
AS 'MODULE_PATHNAME','dtm_prepare'
LANGUAGE C;

-- Complete two phase commit
-- in: gtid (global transaction ID chosen by coordinator)
-- in: CSN (returned by dtm_prepare)
CREATE FUNCTION dtm_end_prepare(gtid cstring, csn bigint) RETURNS void
AS 'MODULE_PATHNAME','dtm_end_prepare'
LANGUAGE C;

On the first node the coordinator should call dtm_extend(), which returns the assigned snapshot. This snapshot should be propagated to all other nodes using the dtm_access function. The last three functions are used to implement two-phase commit (currently implemented on top of PostgreSQL two-phase commit).

The intended usage of this API is the following:

Primary node:

begin transaction;
snapshot = select dtm_extend(gtid) ;


prepare transaction 'gtid';
select dtm_begin_prepare(gtid);
csn = select dtm_prepare(gtid,0);
select dtm_end_prepare(gtid,csn);
commit prepared 'gtid';

Other nodes:

begin transaction;
snapshot = select dtm_access(snapshot,gtid) ;


prepare transaction 'gtid';
select dtm_begin_prepare(gtid);
csn = select dtm_prepare(gtid,csn);
select dtm_end_prepare(gtid,csn);
commit prepared 'gtid';

A local transaction is executed at a single node in the usual way, without any special effort from the coordinator. All transactions actually start as local. If a transaction needs to involve some other node, then the coordinator should invoke dtm_extend, which returns a global snapshot (CSN). This snapshot should be delivered by the coordinator to all other nodes using dtm_access.

To commit a global transaction the coordinator should obtain a new CSN using dtm_prepare(gtid, csn). If this phase completes successfully at all nodes, then the coordinator executes COMMIT PREPARED 'GTID' at all nodes. In case of failure at any of the nodes, the coordinator executes ROLLBACK PREPARED 'GTID' at all nodes.
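The commit and rollback decision described above can be sketched as follows. This is an illustration under stated assumptions, not the authors' coordinator: the `conn` interface, `commitGlobal` and `fakeNode` are hypothetical, the fake node answers dtm_prepare with the larger of the incoming CSN and its own clock (an assumption about how the CSN is propagated), and a real coordinator would also need a plain ROLLBACK for nodes whose transaction was never prepared.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// conn is a hypothetical minimal connection interface.
type conn interface {
	exec(query string, args ...interface{}) error
	queryInt(query string, args ...interface{}) (int64, error)
}

// commitGlobal runs the commit sequence on every node. Any failure before
// the final step triggers ROLLBACK PREPARED on all nodes.
func commitGlobal(gtid string, nodes []conn) error {
	abort := func(cause error) error {
		for _, n := range nodes {
			// Best effort: a node that never prepared reports an error here.
			_ = n.exec("rollback prepared '" + gtid + "'")
		}
		return cause
	}
	for _, n := range nodes {
		if err := n.exec("prepare transaction '" + gtid + "'"); err != nil {
			return abort(err)
		}
	}
	for _, n := range nodes {
		if err := n.exec("select dtm_begin_prepare($1)", gtid); err != nil {
			return abort(err)
		}
	}
	csn := int64(0) // the first node receives 0
	for _, n := range nodes {
		next, err := n.queryInt("select dtm_prepare($1, $2)", gtid, csn)
		if err != nil {
			return abort(err)
		}
		csn = next
	}
	for _, n := range nodes {
		if err := n.exec("select dtm_end_prepare($1, $2)", gtid, csn); err != nil {
			return abort(err)
		}
	}
	for _, n := range nodes {
		if err := n.exec("commit prepared '" + gtid + "'"); err != nil {
			return err // too late to roll back nodes that already committed
		}
	}
	return nil
}

// fakeNode records statements and can simulate a failure, for demonstration.
type fakeNode struct {
	clock  int64    // pretend local timestamp
	failOn string   // statement prefix that should fail, "" for none
	log    []string // statements received so far
}

func (f *fakeNode) exec(query string, args ...interface{}) error {
	f.log = append(f.log, query)
	if f.failOn != "" && strings.HasPrefix(query, f.failOn) {
		return errors.New("simulated failure: " + query)
	}
	return nil
}

func (f *fakeNode) queryInt(query string, args ...interface{}) (int64, error) {
	if err := f.exec(query, args...); err != nil {
		return 0, err
	}
	if len(args) > 1 {
		if in, ok := args[1].(int64); ok && in > f.clock {
			return in, nil // assumed rule: never return less than the input
		}
	}
	return f.clock, nil
}

func main() {
	a, b := &fakeNode{clock: 40}, &fakeNode{clock: 55}
	fmt.Println(commitGlobal("g1", []conn{a, b}), a.log[len(a.log)-1])

	c, d := &fakeNode{clock: 40}, &fakeNode{clock: 55, failOn: "select dtm_prepare"}
	fmt.Println(commitGlobal("g2", []conn{c, d}) != nil, c.log[len(c.log)-1])
}
```

In the first run both nodes end with COMMIT PREPARED. In the second run the failure of dtm_prepare on one node makes the coordinator send ROLLBACK PREPARED to both.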

An example of coordinator code in Go is listed in the tsDTM evaluation section below.

tsDTM evaluation

We've performed a number of tests to investigate the write scalability of the timestamp-based approach. The following transaction was executed against several servers:

exec(con1, "begin transaction")
exec(con2, "begin transaction")
snapshot := execQuery(con1, "select dtm_extend($1)", gtid)
snapshot = execQuery(con2, "select dtm_access($1, $2)", snapshot, gtid)
exec(con1, "update t set v = v + $1 where u=$2", amount, account1)
exec(con2, "update t set v = v - $1 where u=$2", amount, account2)
exec(con1, "prepare transaction '" + gtid + "'")
exec(con2, "prepare transaction '" + gtid + "'")
exec(con1, "select dtm_begin_prepare($1)", gtid)
exec(con2, "select dtm_begin_prepare($1)", gtid)
csn := execQuery(con1, "select dtm_prepare($1, 0)", gtid)
csn = execQuery(con2, "select dtm_prepare($1, $2)", gtid, csn)
exec(con1, "select dtm_end_prepare($1, $2)", gtid, csn)
exec(con2, "select dtm_end_prepare($1, $2)", gtid, csn)
exec(con1, "commit prepared '" + gtid + "'")
exec(con2, "commit prepared '" + gtid + "'")

We measured TPS in clusters with different numbers of members. Table size = 100k. 10k requests in each thread.

[Figures: Tps vs threads.png, Tps vs nodes.png]

We also ran tests on a different cluster with a greater number of nodes. In this test we changed the number of connections from the benchmark program at each step in order to keep a constant number of backends on each host.


The green line shows single-server performance on the same workload with 2PC (as in tsDTM), and the red line shows it with ordinary commit instead of 2PC.


Besides integrating our distributed transaction manager with existing systems like pg_shard and postgres_fdw, we have implemented multimaster for PostgreSQL based on pg_dtm.

Some background on replication support in PostgreSQL: PostgreSQL supports unidirectional master-slave replication. Moreover, it supports a hot standby mode in which it is possible to execute read-only queries on replicas. Replication can be either asynchronous or synchronous, but even with synchronous replication there is a time gap between master and replica, so a client executing a read-only query on a replica may not see the changes it made in a previous transaction on the master. Also, right now only one synchronous replica is supported, so it is practically impossible to use synchronous replication for load balancing. As a result, current streaming replication in PostgreSQL provides only fault tolerance (HA), but not performance scaling.

2ndQuadrant provides Bidirectional Replication for PostgreSQL (BDR). In this case updates can be performed at any node of the cluster and are then propagated to the other nodes. BDR is essentially asynchronous: changes are applied at other nodes some time after the transaction commits at the originating node, and various ways of resolving conflicts are offered. BDR is really fast (it provides almost the same speed as hot standby), but there is no global consistency in such a model.

BDR is based on a new PostgreSQL feature named logical replication. Changes are extracted from WAL and processed by a logical output plugin. The plugin can then apply these changes to some other database, save them in a log, or do whatever else it likes. BDR uses logical replication to deliver changes to other nodes. Logical replication is now part of PostgreSQL 9.5, but right now it has some limitations: for example, it does not support prepared transactions and DDL statements. DDL is handled in BDR in a special way.

Our multimaster is based on the pglogical_output plugin provided by 2ndQuadrant. We have implemented the receiver part for this plugin, which is also partly based on BDR code. On the receiver side we have a pool of background workers which concurrently apply changes received from the remote walsender.

Global consistency is enforced by pg_dtm (the centralized arbiter). From the client's point of view, it works with a set of identical PostgreSQL instances. It can log in and send queries to any of them, and it does not matter whether a query is read-only or an update. But since updates have to be applied on all nodes, multimaster is able to provide scaling only for read-only queries.

The diagram below shows performance results of multimaster installed on a three-node cluster. We ran our dtmbench benchmark, varying the percentage of updates, and compared the results of multimaster with the performance of standalone PostgreSQL. Providing ACID properties for distributed transactions adds significant overhead: multimaster is about 4 times slower on updates than a single node. With asynchronous replication it is possible to get much better results, but without global consistency. On mostly read-only workloads multimaster provides much better performance, but to beat a single node we still need at least 3 nodes in the cluster.

[Figure: Rw ratio.png. Vertical axis: TPS, thousands.]
