Parallel Sort

From PostgreSQL wiki

Jump to: navigation, search

Contents

Overview

If we could perform large sorts in parallel, we could make queries, and more importantly index builds, run faster. Also, it would be useful infrastructure for supporting general Parallel Query Execution. We imagine that the user backend will be supported by one or more "worker backends" which will be similar to a normal backend, but with no client connection. In broad terms, we expect them to look a lot like autovacuum worker processes, but with some differences: each will be associated with a user backend, and data will be passed back and forth between the user backend and its workers, and possibly among workers, sometimes in large volumes.

Process Management and IPC

Dynamic Shared Memory

Shared memory is much faster than any other IPC mechanism, because it allows data to be passed between processes without kernel involvement. However, the main shared memory segment isn't suitable for parallel execution, because its size is fixed at startup and can't be changed. Parallel sort may may require many gigabytes of work space (depending on the configured value of work_mem), and this may amount to a significant percentage of the system's available memory. Therefore, we can't take the approach of pre-allocating space in the main shared memory segment on the off chance that we may wish to perform a parallel sort. Instead, we need the ability to create new shared memory segments on the fly. As of October 2013 (commit 0ac5e5a7e152504c71ce2168acc9cef7fde7893c), we now have this ability. The API looks like this:

extern dsm_segment *dsm_create(Size size);
extern dsm_segment *dsm_attach(dsm_handle h);
extern void dsm_detach(dsm_segment *seg);
extern void *dsm_segment_address(dsm_segment *seg);
extern Size dsm_segment_map_length(dsm_segment *seg);
extern dsm_handle dsm_segment_handle(dsm_segment *seg);

As of this writing (October 31, 2013), there are some residual problems with the dynamic shared memory system. It does not have anything comparable to the on_shmem_exit mechanism we use for the main shared memory segment, nor is there any easy way to lay out data structures in shared memory as we do for the main shared memory segment using ShmemInitHash, ShmemInitStruct, etc. There are proposed patches to fix these problems.

Worker Backends

PostgreSQL 9.3 introduced the ability to configure background worker processes at postmaster startup time (commit da07a1e856511dca59cbb1357616e26baa64428e). This is, however, insufficient for parallel sort, where we need to be able to start background workers when it's time to do parallel sorting and shut them down afterwards. The ability to do this was added in July 2013 (commit 7f7485a0cde92aa4ba235a1ffe4dda0ca0b6cc9a). Subsequent commits in August 2013 and October 2013 introduced the ability to determine whether a dynamically registered background worker was started, whether it is still running, and if so what its PID is (commit 090d0f2050647958865cb495dff74af7257d2bb4) and the ability to reliably terminate a previously-registered background worker (commit 523beaa11bdf6a9864e8978b467ed586b792c9ca). The relevant APIs look like this:

extern bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle);
extern BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp);
extern BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pid);
extern void TerminateBackgroundWorker(BackgroundWorkerHandle *handle);

Shared Memory Message Queues

Moving data from one process to another through a dynamic shared memory segment is easy enough if the data is fixed size, that size is known when allocating the dynamic shared memory segment, and the storage used need not be reclaimed until the dynamic shared memory segment is destroyed. In general, though, these things aren't always true. There are proposed patches which introduce a shared memory message queue system with the following API:

extern shm_mq *shm_mq_create(void *address, Size size);
extern void shm_mq_set_receiver(shm_mq *mq, PGPROC *);
extern void shm_mq_set_sender(shm_mq *mq, PGPROC *);
extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle);
extern void shm_mq_detach(shm_mq *);
extern shm_mq_result shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait);
extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait);

This system creates a ring buffer in shared memory that can be used to propagate messages between backends. Messages are simply strings of bytes of arbitrary length. This is intended to provide a higher-level abstraction which user backends and workers can use to communicate with each other.

Parallel Mode

In the long term, parallel query will call for the ability to read data from database tables. For example, a parallel sequential scan with filter can hardly perform well without that capability. The need is more limited in the context of parallel sort, arising when a worker backend encounters toast pointers and catcache misses. For example, sorting of enum types can't be done without reading pg_enum. Even executing a btree comparator - of any sort - requires a bunch of catalog accesses. It will be convenient to let the workers perform their own table reads in those scenarios. The chief alternative is to funnel such reads through the user backend. That would be complex to implement and would limit the possible gains to be realized from parallelism. Such machinery would then become obsolete when we later grant workers the ability to read tables directly. We're better off starting with full table read capability.

These problems are general; therefore, it seems best to address them by creating a general concept of "parallel mode", a general toolkit for starting up workers and initializing them with suitable state to allow them to usefully assist a parallel computation.

State Synchronization

There are several pieces of state that we will synchronize to each new background worker as it starts up. We'll also prohibit non-transient changes to this state in either the workers or the user backends so that, through the parallel phase, all backends are guaranteed to have the same view of the world. Transient changes to these pieces of state that are local to a particular operation may be allowable even while more enduring changes are not. We currently plan to copy the following state from the user backend to each worker, with corresponding prohibitions against further changes to the values until parallel mode is concluded:

  • User ID and database.
  • GUCs. Temporary changes that will be reverted, such as entering a function with proconfig set, are allowable even in parallel mode, but changes intended to endure (e.g. something like calling set_config), must be prohibited.
  • Transaction state. Starting or ending subtransactions isn't allowable within parallel mode; e.g., you can't do something like SAVEPOINT or ROLLBACK TO SAVEPOINT. Starting and ending a subtransaction for purposes of implementing a PL/pgsql EXCEPTION block or similar might be OK, though. At end-of-transaction, the worker must not try to write a commit or abort record; that should be done by the user backend. (It may be helpful to think of the worker as running in an XID-less read-only subtransaction of the parent's (sub)xact, and have it abort that subtransaction and then discard the rest of its copied transaction stack.)
  • CurrentSnapshot, ActiveSnapshot. Any transaction snapshot must be established, in the user backend, before parallelism begins. Prohibit CommandCounterIncrement(), which would modify CurrentSnapshot. Temporary changes by way of PushActiveSnapshot()/PopActiveSnapshot() are fine.
  • Combo CID hash. All database writes will need to be forbidden in parallel mode, because anything that creates a new combo CID would be problematic. We might eventually look for ways to relax this restriction, but it's not a serious problem for parallel sort.
  • Prepared statements. Alternately, we could prohibit EXECUTE in parallel mode. PREPARE/DEALLOCATE should be prohibited in any case. Internal use of cached plans, such as PL/pgSQL employs, is fine.

Operations Prohibited In Background Workers

These operations may be allowed in the user backend even while parallel workers are in existence, but the workers themselves must not be allowed to perform them.

  • Sequences. Sequence values are cached in backend-local memory, so accessing the same sequence in multiple cooperating backends would have different user-visible semantics from accessing it multiple times within the same backend. That might be OK, since users arguably shouldn't be relying on the specific values assigned anyway. A bigger problem is that a parallel worker which touches a sequence won't update the master's notion of currval() or lastval(); rather than trying to fix that, let's just disallow all access to sequences in parallel mode.
  • Invalidation messages. Workers had better not do anything that can generate local or shared invalidation messages. Local invalidation messages seem particularly problematic, since there's no obvious way to ensure that they are processed simultaneously by all backends. Since we must already disallow writes due to the combo CID issue, it's not clear that there's any other specific thing we need to prohibit here; but even if we fixed the combo CID problem somehow, we'd still have trouble with writes to system catalogs, at least.
  • Cursors. These also have backend-local state. Even if we copied the state into all workers, they'd each be updating the cursor position separately, so the semantics wouldn't match what happens in a single backend. It seems best to insist that background workers cannot touch cursors. We might eventually make an exception for transient cursors, such as one used to implement a PL/pgsql FOR loop.
  • Large objects. Large objects have associated cursors, and operations on those objects might move those cursors. Since we don't have a way to synchronize that across multiple backends at present, let's just forbid worker backends to touch large objects for now.
  • LISTEN/NOTIFY. These would require dedicated code to propagate the change back to the user backend. Interest in calling LISTEN from parallel mode is unlikely, and interest in NOTIFY will remain unlikely so long as writes are forbidden.

Message Propagation

In the course of performing a parallel operation, code running in the worker may generate message by calling ereport() or elog(). Except when elevel >= PANIC, these messages should be propagated back to the user backend and from there to the user. Note that it's important to preserve the details of the error - for example, if worker fails with ERROR: division by zero, the user backend should fail similarly. Warnings and lesser messages must be propagated just as much as errors.

To solve this problem, we can establish one shared memory message queue per worker, with the worker as the sender and the user backend as the receiver. Whenever a message would normally be sent to the client, the worker will instead marshal it and sent it to this queue. The user backend will need to regularly drain messages from these queues so that the workers do not block on a full queue.

Heavyweight Lock Management

Without changes to the lock manager, an undetected deadlock will occur if a worker backend tries to take a lock incompatible with one already held by the user backend. We could fix this by forbidding the user backend from holding any strong locks and allowing worker backends to take only weak locks, but that feels like an artificial restriction. It seems better to revise the heavyweight lock manager so that the user backend and its workers form a locking group whose locks are mutually non-conflicting.

Resource Transfer at Successful Completion

When a parallel operation concludes without error, each worker will serialize several pieces of local state for the user backend to transfer into its own local state. This transfer is akin to the transfer of resources to the parent (sub)transaction that occurs in CommitSubTransaction(). This can perhaps be accomplished using the same message queue posited under the discussion of message propagation, above. The following resources are affected:

  • Heavyweight locks
  • pgstat counter updates
  • LocalPredicateLockHash

Temporary Buffers

If there's any possibility that background workers might try to access temporary relations belonging to the user backend, then the user backend will need to write them all out to the operating system before entering parallel mode; subsequently, both the user backend and the workers will need to refrain from modifying them. (Setting hint bits might be OK, though.)

Alternatively, we could (a) prohibit access to temporary relations in parallel mode or (b) have some kind of option to store temporary buffers in dynamic shared memory, with the same sort of lock-and-pin stuff we do for normal buffers when parallel mode is in use.

Deciding Whether to Parallelize

There are two considerations when deciding whether or not to parallelize: whether it's safe, and whether it will help performance.

Is Parallelism Safe?

If the parallel workers would end up doing any of the things that are prohibited in parallel mode (e.g. writing the database or manipulating cursors), then we don't want to try anyway and fail. Instead, we want to decide not to invoke parallelism in the first place. Also, some operations may be unsafe in ways that we can't realistically detect; for example, a pseudo-random number generator with a user-specified seed isn't safe, because the user is expecting a particular sequence of random numbers that depends on backend-local state which will not be shared among the cooperating backends.

Unless we can solve the halting problem, it won't be possible to determine conclusively whether a function is safe by mechanical inspection of that function, so we will instead need to label functions as parallel-safe, or not. It would be nice if this marking were sufficiently general as to apply to other things we might want to parallelize in the future, not just parallel sort. It's unclear whether a trivial form of labeling (e.g. proisparallelsafe) is the best route forward or whether we want to make it more generic than that. Some things that are not parallel-safe in the initial implementation may become safe in future versions as capabilities are added, so instead of simply labeling things safe or not, we could try to classify them in terms of what operations they might perform, and then compare that with the set of operations currently supported for parallel mode. Such labeling might find other applications (e.g. query optimization).

Will Parallelism Make It Faster?

There's some overhead in setting up a parallel environment, so there's probably no point in doing it if the amount of work that can be done in parallel mode is small. However, the estimates will only be as accurate as the optimizer itself. For example, in the case of sorting, int4 comparisons are ~1000x cheaper than text comparisons, but right now the system doesn't know that. We'd need to insert more sensible procost values in order to make good decisions in this area.

Perhaps for a first version of parallel sort we could simply use declarative syntax; e.g. only enable this for CREATE INDEX, and just let the user specify the desired level of parallelism. Forget about planning it and never use parallelism during query execution.

Even if an operation looks like a good candidate for parallelism, there might not be enough unused background worker slots to parallelize it. More subtly, if the gains are minimal, it might not be good for system performance overall to tie up one or even many background worker slots in order to achieve that gain; perhaps we'd be better off waiting for a better candidate to come along.

Parallel Internal Sort

There are several possible algorithms that can be used for parallel internal sort. These include parallel merge sort, parallel quicksort, and sample sort. Parallel merge sort is stable, but we don't guarantee that sorts are stable. Unlike the other algorithms, parallel quicksort does not require space for a full copy of the array, which seems like a useful advantage for PostgreSQL since we tend to be highly concerned with sort memory usage. We plan to implement this algorithm first.

Quicksort readily lends itself to parallelism, because each partition of the array produces two subpartitions each of which can be sorted by a separate processor. However, until the first few partitioning steps have been done, very few CPUs can be brought to bear in this way. To solve this problem, parallel quick sort is design so that multiple CPUs can be used for a single partitioning operation. During a parallel partitioning step, we imagine that the array is divided into fixed-size blocks. Each worker claims one block from the front of the array and one block from the back of the array, then performs the quicksort partitioning loop on the block pair. When a particular worker exhausts either its front-block or its back-block, the block is said to be neutralized, and the worker claims a new block to replace it. When no blocks remain to claim, a single process finishes the partitioning operation by cleaning up any blocks not neutralized by any worker; there cannot be more of these than the number of workers used. Parallel partitioning is a little less efficient than serial partitioning, so it is used only in the first few steps of the algorithm, until the array has been sufficiently broken up so as to allow each CPU to be devoted to a different portion of the array.

Some workers may make progress more rapidly than others, either because they receive a smaller section of the array to process or because the comparison operations for their particular values execute more quickly than for other workers (e.g. some values may require detoasting while other values are short and do not require it; some keys may differ near the beginning of the string while others differ only much later; etc.). Therefore, even after each worker has been dedicated to a separate chunk of the array, provision should be made to allow workers that finish early to "steal" work from other workers. At each recursion step, a worker splits a partition into two subpartitions; it recurses into one and stores the bounds of the other in shared memory whence an otherwise-idle worker can steal it.

Shared Memory Allocation Context

If we plan to perform a parallel internal sort and then find out that the data won't fit in work_mem, we need to switch to a tape sort. Ideally, we don't want this to involve redoing too much work. For example, if the layout of tuples in dynamic shared memory is very different from what we do for a non-parallel sort, we might need to rearrange all the data; that would stink. To solve this problem, perhaps we can create a new type of memory context that allocates from shared memory. When shared memory is exhausted, it fails over to backend-private memory (perhaps pointing to another context, or perhaps using bespoke code). We can check whether the context has overflowed to know whether or not we should try to invoke the parallel-internal-sort code.

Parallel Quick Sort Bookkeeping

Here's a draft of the shared-memory data structure we might use for parallel quicksort; originally by Noah Misch, with modifications by Robert Haas.

/*
 * Size of a parallel work unit block.  Smaller values add contention, but
 * they reduce the amount of serial work to be done by the coordinator when
 * finalizing a parallel partition task.
 */
#define PAR_BLK_SIZE 2048

/*
 * When a worker would otherwise add to a pending list a range of tuples less
 * numerous than this, it shall instead fully sort the range itself.  A higher
 * value curbs bookkeeping overhead, but it causes earlier starvation.  The
 * ideal value is actually dependent on comparator cost.
 */
#define PAR_LOCAL_THRESHOLD 8192

/*
 * Index into a memtuples array.  Currently "int" for compatibility with
 * historic tuplesort design.  At some point, this should become intptr_t or
 * simply int64 to support internal sorts of >INT_MAX tuples.
 */
typedef int memtupsz_t;

/*
 * A parallel work unit block ordinal.  It's almost certainly the same type as
 * memtupsz_t; this is just documentation.
 */
typedef int parblk_t;

struct SortWorker
{
    /* Protects all subsequent fields. */
    slock_t     mutex;

    /* Process attached to this slot. */
    PGPROC     *proc;

    /* True if this sort process aborted unexpectedly. */
    bool        proc_died;

    /*----------
     * Work assignment indicator, either an index into AllSortWorkers or -1.
     * A worker plays one of three roles at any given time: coordinator,
     * helper, or individual:
     *      coordinator:    assignment == my_offset
     *      helper:         assignment != my_offset, assignment != -1
     *      individual:     assignment == -1
     *----------
     */
    int         assignment;

    /*
     * Coordinators and individuals: assigned tuple range.  May be -1 for an
     * individual to signal that the worker should search pending lists for
     * further work.
     */
    memtupsz_t  first;                  /* first tuple of in assigned range */
    memtupsz_t  count;                  /* number of tuples in assigned range */

    /*
     * Coordinators: block reservation bookkeeping.  Block b covers the range
     * [first, first + Min(count, b * PAR_BLK_SIZE)).  Thus the last block is
     * typically smaller than PAR_BLK_SIZE.  When a worker seeking to reserve
     * a new block finds next_left > next_right, its contribution to the
     * parallel phase of this partitioning task is finished.
     */
    parblk_t    next_left;              /* lowest unreserved left block */
    parblk_t    next_right;             /* highest unreserved right block */

    /* Individuals: first undefined position in the serial_pending stack. */
    int         pending_stack_pos;

    /*
     * Array with one entry per process participating in the sort.  Usage will
     * often be sparse, but we still consume only 78 KiB for 100 workers.
     */
    union
    {
        /*
         * Coordinators: left and right blocks reserved by each participating
         * worker (coordinator itself or a helper).  A participant's
         * AllSortWorkers offset is also its offset in the coordinator's "u"
         * array.  Left and right reservations both start at -1, indicating
         * that no block is reserved.  A busy worker has two blocks reserved.
         * A finished worker most typically has one block reserved, though it
         * will have zero blocks reserved when it neutralized its final pair
         * of blocks simultaneously.
         */
        struct
        {
            parblk_t    left;
            parblk_t    right;
        } parallel_reserved;

        /*
         * Individuals: stack of pending tuple ranges.  If the worker would
         * push onto its stack and finds the stack to be full, the worker will
         * continue stack depth accrual in local memory.  If we manage to fill
         * this much stack, it's unlikely that other individuals will drain it
         * and starve before we manage to add to it once again.
         */
        struct
        {
            memtupsz_t  first;
            memtupsz_t  count;
        } serial_pending;
    } u[FLEXIBLE_ARRAY_MEMBER];
};

/*
 * Array of SortWorker, one per process participating in this sort.  Offset
 * zero is for the foreground process that initiated the sort.  Each
 * background worker learns its offset at startup.
 */
struct SortWorker *AllSortWorker;
Personal tools