Parallel External Sort

Parallel CREATE INDEX patch

This section summarizes parallel external sort as it relates to CREATE INDEX only. This is the initial proposal for parallel external sort, posted in mid-2016 as part of the PostgreSQL 10 development cycle. The approach taken to parallelize CREATE INDEX could in principle be duplicated by all other parts of the system that perform tuplesorts, but that isn't likely to be proposed in the near term, since more sophisticated infrastructure is also required to get the most out of parallelism in many of those other parts of the system. The proposed patch adds only some of the pieces necessary to use parallelism around sort operations more extensively (in particular, partitioning is not introduced by the patch). A further reason that CREATE INDEX is the first client of parallel sort is that its cost model can be relatively straightforward. While these factors are much less of an issue for parallel CLUSTER (CLUSTER is roughly comparable to CREATE INDEX in some ways), CLUSTER already seems to be too I/O bound for parallelism to help much, and so is unlikely to be pursued in the near term. [1]

CommitFest entry for initial parallel CREATE INDEX patch: https://commitfest.postgresql.org/13/690/

The patch:

  • Adds a parallel sorting capability to tuplesort.c.
  • Adds a new client of this capability: btbuild()/nbtsort.c can now create B-Trees in parallel.

Most of the complexity of the patch relates to infrastructure added to make temp files sharable. Parallel sort is usable in principle by every existing tuplesort caller, without any restriction imposed by the newly expanded tuplesort.h interface. So, for example, randomAccess MinimalTuple support has been added (as already noted, this will not be used for the time being).

The project has clear performance and scalability goals, described below under "Performance and scalability goals for the patch".

Internals: New key concepts for tuplesort.c

The heap is scanned in parallel, and worker processes also merge in parallel if required. The implementation makes heavy use of existing external sort infrastructure. In fact, it's almost the case that the implementation is a generalization of external sorting that allows workers to perform heap scanning and run sorting independently, with tapes then "unified" in the leader process for merging. At that point, the state held by the leader is more or less consistent with the leader being a serial external sort process that has reached its merge phase in the conventional manner (serially).

The steps callers must take are described fully in tuplesort.h (in the interest of clarity and brevity, some existing prototypes are skipped over here):

/*
 * Tuplesortstate and Sharedsort are opaque types whose details are not
 * known outside tuplesort.c.
 */
typedef struct Tuplesortstate Tuplesortstate;
typedef struct Sharedsort		Sharedsort;

/*
 * Tuplesort parallel coordination state.  Caller initializes everything.
 * Serial sorts should pass NULL coordinate argument.  See usage notes below.
 */
typedef struct SortCoordinateData
{
	/* Worker process?  If not, must be leader. */
	bool				isWorker;

	/*
	 * Leader-process-passed number of workers known launched (workers set this
	 * to -1).  This typically includes the leader-as-worker process.
	 */
	int					nLaunched;

	/* Private opaque state in shared memory */
	Sharedsort		   *sharedsort;
} SortCoordinateData;

typedef struct SortCoordinateData* SortCoordinate;

/*
 * (...Existing tuplesort.h comments for serial case...)
 *
 * Parallel sort callers are required to coordinate multiple tuplesort
 * states in a leader process, and one or more worker processes.  The
 * leader process must launch workers, and have each perform an independent
 * "partial" tuplesort, typically fed by the parallel heap interface.  The
 * leader later produces the final output (internally, it merges runs output
 * by workers).
 *
 * Note that callers may use the leader process to sort runs, as if it was
 * an independent worker process (prior to the process performing a leader
 * sort to produce the final sorted output).  Doing so only requires a
 * second "partial" tuplesort within the leader process, initialized like
 * any worker process.
 *
 * Callers must do the following to perform a sort in parallel using
 * multiple worker processes:
 *
 * 1.  Request tuplesort-private shared memory for n workers.  Use
 *     tuplesort_estimate_shared() to get the required size.
 * 2.  Have leader process initialize allocated shared memory using
 *     tuplesort_initialize_shared().  This assigns a unique identifier for
 *     the sort.  See BufFileGetHandle() for notes on resource management and
 *     the shared memory segment that is passed through from this point.
 * 3.  Initialize a "coordinate" argument (serial case just passes NULL
 *     here), within both the leader process, and for each worker process.
 *     Note that this has a pointer to the shared tuplesort-private
 *     structure.
 * 4.  Begin a tuplesort using some appropriate tuplesort_begin* routine,
 *     passing a "coordinate" argument, within each worker.  The workMem
 *     argument need not be identical.  All other arguments to the
 *     routine should be identical across workers and the leader.
 *     The workMem argument should be at least 64 (64KB) in all cases.
 * 5.  Feed tuples to each worker, and call tuplesort_performsort() for each
 *     when input is exhausted.
 * 6.  Optionally, workers may aggregate information/statistics about the
 *     heap scan someplace; caller must handle all those details.  Then, call
 *     tuplesort_end() in each worker process (but not for any leader-as-worker
 *     Tuplesortstate).  Worker processes can generally shut down as soon as
 *     underlying temp file state is handed over to the leader.
 * 7.  Begin a tuplesort in the leader using the same tuplesort_begin* routine,
 *     passing a leader-appropriate "coordinate" argument.  The leader must now
 *     wait for workers to finish; have the leader process wait for workers by
 *     calling tuplesort_leader_wait().  tuplesort_leader_wait() waits until
 *     workers finish, and no longer.  Note that the leader requires the
 *     number of workers actually launched now, so this need only happen after
 *     caller has established that number (after step 4).  If there was a
 *     leader-as-worker Tuplesortstate, call tuplesort_end() with it now.
 * 8.  Call tuplesort_performsort() in leader.  When this returns, sorting
 *     has completed, or leader will do final on-the-fly merge.  Consume
 *     output using the appropriate tuplesort_get* routine as required.
 * 9.  Leader caller may now optionally combine any data that may have been
 *     aggregated by workers in step 6.  (e.g., for custom instrumentation.)
 * 10. Call tuplesort_end() in leader.
 *
 * This division of labor assumes nothing about how input tuples are produced,
 * but does require that caller combine the state of multiple tuplesorts for
 * any purpose other than producing the final output.  For example, callers
 * must consider that tuplesort_get_stats() reports on only one worker's role
 * in a sort (or the leader's role), and not statistics for the sort as a
 * whole.
 *
 * Note that there is an assumption that temp_tablespaces GUC matches across
 * processes.  Typically, this happens automatically because caller uses
 * parallel infrastructure.  Note also that only a very small amount of
 * memory will be allocated prior to the leader state first consuming input,
 * and that workers will free the vast majority of their memory upon
 * reaching a quiescent state.  Callers can rely on this to arrange for
 * memory to be consumed in a way that respects a workMem-style budget
 * across an entire sort operation, and not just within one backend.
 *
 * Callers are also responsible for parallel safety in general.  However, they
 * can at least rely on there being no parallel safety hazards within
 * tuplesort, because tuplesort conceptualizes the sort as several independent
 * sorts whose results are combined.  Since, in general, the behavior of sort
 * operators is immutable, caller need only worry about the parallel safety of
 * whatever the process is through which input tuples are generated
 * (typically, caller uses a parallel heap scan).  Furthermore, note that
 * callers must be careful in providing a perfectly consistent tuple
 * descriptor across processes.  This can be more subtle than it appears,
 * since for example the RECORD pseudo-type uses transient typmods that are
 * only meaningful within a single backend (see tqueue infrastructure to
 * support transient record types).  For the cluster, index_btree and
 * index_hash APIs, callers automatically avoid problems by opening up the
 * target relation from within worker processes, since the relation's
 * cataloged attributes are necessarily not of transient types.
 */

extern Tuplesortstate *tuplesort_begin_index_btree(Relation heapRel,
                            Relation indexRel,
                            bool enforceUnique,
                            int workMem, SortCoordinate coordinate,
                            bool randomAccess);
/* ... */
extern Size tuplesort_estimate_shared(int nworkers);
extern void tuplesort_initialize_shared(Sharedsort *shared, int nWorkers,
                                        dsm_segment *seg);
extern void tuplesort_leader_wait(Tuplesortstate *state);

The general idea is that a Tuplesortstate is aware that it might not be a self-contained sort; it may instead be one part of a parallel sort operation. You might say that the tuplesort caller must "build its own sort" from participant worker process Tuplesortstates. The caller creates a dynamic shared memory segment + TOC for each parallel sort operation (could be more than one concurrent sort operation, of course), passes that to tuplesort to initialize and manage, and creates a "leader" Tuplesortstate in private memory, plus one or more "worker" Tuplesortstates, each presumably managed by a different parallel worker process.

tuplesort.c coordinates workers and manages mutual dependencies, including having processes wait on each other to respect its ordering dependencies. Caller (nbtsort.c) is responsible for spawning workers to do the work, reporting details of the workers to tuplesort through shared memory, and having workers call tuplesort to actually perform sorting. At the lowest level, resource management is outsourced to buffile.c (and fd.c routines that are morally owned by buffile.c). The tuplesort_estimate_shared() routine requests shared memory it directly uses to keep track of shared tape state, as well as some shared memory that buffile.c requires for refcounts and other fixed per-worker state.

The caller consumes final output through the leader Tuplesortstate in the leader process. In practice, the caller is fed by a final on-the-fly merge as output is consumed and the index is physically written out.
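
To make the caller protocol above concrete, here is a minimal sketch of the leader-side flow, loosely following the numbered steps quoted from tuplesort.h. The shared memory/TOC setup, launch_workers(), and the index-build loop are hypothetical caller responsibilities (stand-ins for what nbtsort.c actually does); only the tuplesort_* calls and SortCoordinateData come from the proposed interface.

#include "postgres.h"
#include "storage/dsm.h"
#include "storage/shm_toc.h"
#include "utils/rel.h"
#include "utils/tuplesort.h"

/* Hypothetical caller helper: launches workers, returns the number launched */
extern int launch_workers(int nworkers, Sharedsort *sharedsort);

/* Hedged sketch of the leader-side caller protocol (not the patch's code) */
static void
leader_parallel_sort(Relation heapRel, Relation indexRel,
                     shm_toc *toc, dsm_segment *seg,
                     int nworkers, int workMem)
{
    Sharedsort *sharedsort;
    SortCoordinateData coordinate;
    Tuplesortstate *leaderstate;
    int         nlaunched;

    /* Steps 1-2: size and initialize tuplesort-private shared memory */
    sharedsort = (Sharedsort *)
        shm_toc_allocate(toc, tuplesort_estimate_shared(nworkers));
    tuplesort_initialize_shared(sharedsort, nworkers, seg);

    /*
     * Steps 3-6 happen in workers: each initializes its own
     * SortCoordinateData (isWorker = true, nLaunched = -1), begins a
     * tuplesort with it, feeds tuples from a parallel heap scan, calls
     * tuplesort_performsort(), and eventually tuplesort_end().  A
     * leader-as-worker participant is omitted from this sketch.
     */
    nlaunched = launch_workers(nworkers, sharedsort);

    /* Step 7: begin the leader's tuplesort, then wait for workers to finish */
    coordinate.isWorker = false;
    coordinate.nLaunched = nlaunched;
    coordinate.sharedsort = sharedsort;
    leaderstate = tuplesort_begin_index_btree(heapRel, indexRel, false,
                                              workMem, &coordinate, false);
    tuplesort_leader_wait(leaderstate);

    /* Steps 8-10: final on-the-fly merge; consume sorted output; clean up */
    tuplesort_performsort(leaderstate);
    /* ... read tuples with tuplesort_getindextuple() and build the index ... */
    tuplesort_end(leaderstate);
}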

Sharing temp files among tuplesort workers

Sort operations have a unique identifier, generated before any workers are launched, using a scheme based on the leader's PID, and a unique temp file number. Worker temp file names are generated in a deterministic fashion. This makes all on-disk state (temp files managed by buffile.c on behalf of logtape.c) discoverable by the leader process. State in shared memory is sized in proportion to the number of workers, so the only thing about the data being sorted that gets passed around in shared memory is a little logtape.c metadata for tapes, describing for example how large each constituent BufFile is (a BufFile associated with one particular worker's tapeset). There is also state for the per-sort BufFile, with one entry per worker, needed to maintain refcounts.
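
For illustration only, the key property is that any process can compute a worker's temp file name from the leader's PID, a per-leader sort number, and the worker's ordinal number; the function and exact format below are hypothetical, not taken from the patch.

#include <stdio.h>
#include <sys/types.h>

/*
 * Hypothetical illustration: derive a deterministic worker temp file name
 * from (leader PID, per-leader sort number, worker number), so that the
 * leader can discover every worker's on-disk state without file names
 * being passed through shared memory.  The real format differs.
 */
static void
worker_temp_file_name(char *dst, size_t dstlen,
                      pid_t leaderPid, long sortNumber, int workerNumber)
{
    snprintf(dst, dstlen, "pgsql_tmp.%d.%ld.%d.sort",
             (int) leaderPid, sortNumber, workerNumber);
}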

buffile.c, and BufFile unification

There has been significant new infrastructure added to make logtape.c able to "unify" worker tapes (changes to logtape.c itself are pretty minimal, though). buffile.c has been taught about unification as a first class part of the abstraction, with low-level management of certain details occurring within fd.c temp file routines. Note that in practice these fd.c routines are only called by buffile.c, making buffile.c the only sensible interface for temp files in postgres (temp files in the sense of files automatically deleted by resource managers, subject to temp_file_limit enforcement, etc). This infrastructure is a key part of the patch. It is clearly the single biggest source of complexity for the patch, since we must consider:

  • Creating a reasonably general, reusable abstraction for other possible BufFile users. It's possible that the ability to unify BufFiles could be useful for tuplestore.c in the future. And, there are already plans for the parallel hash join patch to use the same mechanism, which supports multiple backends opening each other's BufFiles at the same time (though only at key barrier points; temp files are considered immutable once this happens, and so even there the mechanism is a one-way, one-time means of IPC).
  • Resource management. fd.c + resource manager code are made to close relevant temp files at xact end, or when a worker needs to go away, while not necessarily deleting files at the same time. As of V8, the backend that needs to delete each BufFile's segment file(s) is determined by a refcount mechanism implemented within buffile.c (see the sketch after this list). Under this scheme, workers can go away as soon as the leader's final on-the-fly merge starts, having effectively handed over ownership of their temp file segments (BufFile) to the leader. The leader deletes these fd.c segments when the entire sort operation ends, because, due to how operations are ordered, parallel sort always leaves the leader with responsibility for "turning out the lights".
  • Crash safety (e.g., when to truncate existing temp files, and when not to). See also existing master branch remarks above RemovePgTempFiles() about collisions.
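
As a rough sketch of the refcount idea from the resource management bullet above (the real mechanism is implemented inside buffile.c and tied into fd.c and the resource owner machinery; the struct and function here are hypothetical):

#include "postgres.h"
#include "storage/spin.h"

/*
 * Hypothetical sketch of per-BufFile reference counting in shared memory.
 * Workers attach while writing; the leader attaches before workers exit;
 * whichever backend detaches last is responsible for deleting the
 * underlying fd.c segment files ("turning out the lights").
 */
typedef struct SharedBufFileRefCount
{
    slock_t     mutex;
    int         refcount;       /* number of attached backends */
} SharedBufFileRefCount;

static bool
shared_buffile_detach(SharedBufFileRefCount *ref)
{
    bool        last;

    SpinLockAcquire(&ref->mutex);
    last = (--ref->refcount == 0);
    SpinLockRelease(&ref->mutex);

    /* true means the caller must delete the on-disk segments */
    return last;
}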

The new buffile.c interface is described in V9 of the patch as follows:

/*
 * Returns unified BufFile, from multiple worker process files.  All
 * underlying BufFiles must be temp BufFiles, and should have been
 * created with BufFileCreateUnifiable(), and therefore discoverable
 * with the minimal metadata passed by caller.
 *
 * This should be called when workers have flushed out temp file buffers and
 * yielded control to caller's process.  Workers should hold open their
 * BufFiles at least until the caller's process is able to call here and
 * assume ownership of BufFile.  The general pattern is that workers make
 * available data from their temp files to one nominated process; there is
 * no support for workers that want to read back data from their original
 * BufFiles following writes performed by the caller, or any other
 * synchronization beyond what is implied by caller contract.  All
 * communication occurs in one direction.  All output is made available to
 * caller's process exactly once by workers, following call made here at the
 * tail end of processing.
 *
 * Callers with more advanced requirements should consider an IPC mechanism
 * that is optimized for fine-grained parallel access.  Using a unified
 * BufFile makes sense when there is an access pattern characterized by
 * access to original data in physical order within workers (access in
 * undefined order, such as from a parallel heap scan), without any immediate
 * need for logical combination of the data across worker boundaries.  There
 * will perhaps be some relatively limited logical combination within the
 * caller's process, input by reading from unified BufFile in mostly
 * sequential order, but sensible use of this interface is limited to use in
 * support of parallel batch processing that isn't naturally pipelinable.  In
 * cases where this interface is appropriate, caller might well have used
 * temp files for a similar serial case due to needing to process a
 * significant volume of data.  Taking advantage of I/O parallelism should be
 * almost as important to caller as taking advantage of available CPU
 * resources; the high level batch processing task led by caller should not
 * be performed with the expectation of workers being able to eliminate their
 * input from consideration early.
 *
 * npieces is number of BufFiles involved; one input BufFile per worker, plus
 * 1 for the caller's space, which should always come last.  npieces is the
 * size of the pieces argument array.  Currently, interface assumes that
 * there is a contiguous range of workers whose BufFiles are numbered 0
 * through to npieces - 2, followed by a zero-sized entry for space reserved
 * for caller process to write to.  Callers must be certain that all worker
 * BufFiles already exist.  (Caller-owned entry will not exist yet,
 * obviously.)
 *
 * Caller's pieces argument should be set to the size of each
 * discoverable/input BufFile when passed here.  The size value should
 * ultimately originate from a BufFileUnifiableHandover() call in each worker,
 * which must have been made by worker immediately prior to worker yielding
 * control to caller, and only after worker is done with all writing.  For
 * the caller's space at the end of returned BufFile, this value should
 * always be 0.  Writing in space reserved for the caller is a convention
 * that provides clear separation of responsibilities between workers and the
 * caller's unified BufFile space.  A writable space in unified BufFiles is
 * only supported because callers find this more convenient than opening a
 * separate, conventional (non-unified) BufFile to write to.
 *
 * The pieces argument also has a field that is written to as output to
 * caller; each piece's offsetBlockNumber field is set here.  Caller
 * should go on to apply these offsets for each input BufFile piece when
 * seeking within unified BufFile, typically while using the
 * BufFileSeekBlock() interface with metadata that originates from
 * workers.  This is necessary iff original block offsets are in terms
 * of the worker's original BufFile space, for example due to
 * originating from metadata read from the unified BufFile itself, the
 * typical case.  Caller needs to consider which part of the unified
 * BufFile space is being accessed to determine the correct offset to
 * apply.  The ownership of one particular worker's data should be naturally
 * delineated by distinct state built and managed by caller for each input
 * from each worker; caller's own abstraction should manage that complexity
 * (we don't do any of that for them because it's impractical for us to take
 * an interest in every possible scheme callers might have for serializing
 * data, but callers should sharply limit the places that need to know
 * anything about applying these offsets).
 *
 * If callers are prepared to deal with offsets in a clean way, and are
 * prepared to have workers lose the ability to read from their temp files
 * following ending their quiescent state, they may write even to
 * worker-owned space.  temp_file_limit enforcement will start to consider
 * blocks as owned by caller process iff caller does this, but does not count
 * worker-owned files against that limit.  This is allowed only so that our
 * caller can reuse temp file space on disk.  It is not intended as a mechanism
 * for sending data back to workers.
 */
BufFile *
BufFileUnify(SharedBF *handle, int npieces, BufFilePiece *pieces)
{
    /* (Unification of worker tapes occurs here) */
}
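
As a usage sketch of the above (hypothetical caller code: BufFileUnify(), BufFileSeekBlock(), BufFileUnifiableHandover(), and the offsetBlockNumber field are from the described interface, while the size field name, the worker_sizes argument, and the surrounding function are assumptions):

#include "postgres.h"
#include "storage/buffile.h"

/*
 * Hedged sketch: the leader unifies nworkers worker BufFiles plus its own
 * trailing writable space, then applies the reported block offset when
 * seeking into one worker's original BufFile space.
 */
static BufFile *
unify_worker_files(SharedBF *handle, int nworkers, const long *worker_sizes)
{
    int         npieces = nworkers + 1;
    BufFilePiece *pieces = palloc0(sizeof(BufFilePiece) * npieces);
    BufFile    *unified;
    int         i;

    for (i = 0; i < nworkers; i++)
        pieces[i].bufFileSize = worker_sizes[i];    /* sizes originate from each
                                                     * worker's handover call;
                                                     * field name assumed */
    pieces[nworkers].bufFileSize = 0;   /* caller's writable space comes last */

    unified = BufFileUnify(handle, npieces, pieces);

    /*
     * For example, to follow a block number that originated in worker 0's
     * own BufFile space, translate it using that piece's reported offset.
     */
    BufFileSeekBlock(unified, pieces[0].offsetBlockNumber + 0);

    return unified;
}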

Memory use by parallel processes and Amdahl's law

Each worker process receives an even share of workMem budget (which is in practice always maintenance_work_mem for CREATE INDEX -- that's what is passed to tuplesort.c as the "workMem" argument for all B-Tree CREATE INDEX cases).

The new implementation reuses much of what was originally designed for external sorts. As such, parallel sorts are necessarily external sorts, even when the workMem budget could in principle allow for parallel sorting to take place entirely in memory. The implementation arguably insists on making such cases external sorts, when they don't really need to be.

Since parallel sort is only compelling for medium to large sorts, it makes sense to prioritize being able to perform parallel sort operations externally. More extensive use of shared memory (parallel internal sort) can follow in a later release.

Making every parallel CREATE INDEX use external temp files (a "logical tapeset" manages these temp files) is much less of a problem than might be assumed, since the 9.6 work on external sorting [2] [3] somewhat blurred the distinction between internal and external sorts (just consider how much time trace_sort indicates is spent waiting on writes in workers; it's typically a small part of the total time spent). Moreover, more recent work by Heikki Linnakangas on preloading within tuplesort.c for external sorts that is already committed to the master branch [4] makes this even less of a problem. And, the more recent merge heap work by Peter Geoghegan [5] also significantly helps, particularly when input into a merge is somewhat clustered.

maintenance_work_mem

Workers (including the leader-as-worker process) are given the opportunity to free memory before the leader's merge has the opportunity to allocate significant amounts of memory. Therefore, maintenance_work_mem is respected as a budget for the entire CREATE INDEX operation. Unlike with parallel query's use of work_mem, the number of workers launched shouldn't significantly affect the amount of memory used (there will only be a trivial fixed overhead per additional worker launched). However, the maintenance_work_mem setting will influence the number of workers indicated as optimal by the new cost model, by having a backstop requirement of at least 32MB of maintenance_work_mem per worker launched (the 32MB backstop is a behavior new to V9 of the patch).
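
A minimal sketch of that budgeting arithmetic (illustrative only; the function name and exact rounding are not the patch's code, and Min/Max are the usual PostgreSQL macros):

#include "postgres.h"

/*
 * Illustrative only: cap the requested worker count by the 32MB-per-worker
 * backstop, and give each participant (workers plus a leader-as-worker)
 * an even share of maintenance_work_mem.  Values are in KB, matching the
 * workMem argument to tuplesort.
 */
static int
sort_participant_workmem(int nworkers_requested, int maintenance_work_mem_kb,
                         int *nworkers_out)
{
    int         max_by_mem = maintenance_work_mem_kb / (32 * 1024);
    int         nworkers = Max(Min(nworkers_requested, max_by_mem), 0);
    int         nparticipants = nworkers + 1;   /* leader-as-worker too */

    *nworkers_out = nworkers;
    return maintenance_work_mem_kb / nparticipants;
}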

All of this works quite well, because the serial merge step has access to the entire budget on its own, and can typically use that budget more effectively than the workers (quicksorting of runs) would have been able to. Users can increase maintenance_work_mem and continue to see benefits past the point where they can be seen for an equivalent serial case.

Performance and scalability goals for the patch

The primary goal of the project is to add a parallel CREATE INDEX feature to PostgreSQL that has comparable performance and scalability to equivalent features found in other major RDBMSs.

Peter Geoghegan assessed the scalability of V4 in October of 2016 [6]. The implementation seems to fairly consistently manage a ~3x improvement over equivalent serial cases, as measured by total wall clock time taken for an entire CREATE INDEX operation (with 8 cores/workers, and 12 HDDs configured in RAID0). Note that this is based on a baseline of unpatched PostgreSQL 10 git tip -- the improvement would be more pronounced if PostgreSQL 9.6 was taken as a baseline.

The patch already meets its stated primary goal, at least based on informal comparisons (anecdotal reports suggest that the scalability in other major systems already tops out at about 3x, even with a "degree of parallelism" equal to 8 or 16). This conclusion was later bolstered by a benchmark from Thomas Munro in February of 2017 [7].

The gensort tool, which has been used for many years to generate test data for the sortbenchmark.org competition, has been adapted to bulk-load an arbitrary amount of data into a Postgres table:

https://github.com/petergeoghegan/gensort

The tool has its limitations, but does at least provide a perfectly deterministic test case for testing sorting optimizations in general. Coming up with test cases that are verifiable is an important part of performance verification for parallel CREATE INDEX. One area that requires further investigation is how well parallel CREATE INDEX performs when its input table is very large (more than 1TB).

Summary

In summary, the "serial portion" of the execution of CREATE INDEX has been significantly optimized in recent years, and so Amdahl's law has not unreasonably limited the scalability of parallel CREATE INDEX.

Merging

The model for merging runs is that every worker process is guaranteed to output one materialized run onto one tape for the leader to merge within its "unified" tapeset. This is the case regardless of how much workMem is available, or any other factor. The leader always assumes that the worker runs/tapes are present and discoverable based only on the number of known-launched worker processes, plus a little metadata on each that is passed through shared memory.

Producing one output run/materialized tape from all input tuples in a worker can sometimes happen without the worker running out of workMem (its share of the total maintenance_work_mem). A straight quicksort and dump of all tuples occurs, without any merging needed in the worker. More often, though, it will prove necessary to do some amount of merging in each worker to generate one materialized output run. The worker merge is handled in almost the same way as a randomAccess case that requires a single materialized output tape to support subsequent random access (parallel tuplesort won't write out the size of each tuple unless it's actually doing a randomAccess, though). Merging performed by workers does necessitate another pass over all temp files for the worker, but that's usually a relatively small cost for large sorts.

Limited merging in parallel

Currently, merging worker output runs may only occur in the leader process. In other words, we always keep n worker processes busy with scanning-and-sorting (and maybe some merging), but then all processes but the leader process come to a halt.

One leader process is kept busy merging these n output runs on the fly and writing out the new index (this includes WAL-logging the new index). Workers will sometimes merge in parallel, but only their own runs -- never another worker's runs.

Parallel merging (merging beyond that necessary for workers to each produce one final materialized tape for leader to merge serially) is one area that could be improved upon in the future. However, partitioning seems more promising, especially for parallel query (Parallel CREATE INDEX seems to be highly bottlenecked on I/O for creating the new index during the leader's merge anyway).

CREATE INDEX user interface with parallelism

The number of parallel workers that parallel CREATE INDEX requests is determined in one of two ways: by a new cost model, or by a new storage parameter. If the implementation cannot launch any parallel workers, a conventional serial sort is performed (the serial sort may or may not be performed internally).

Cost Model

A cost model is added, which is currently loosely based on create_plain_partial_paths().

Update: V9 greatly simplifies the cost model, and makes things work pretty much like a parallel sequential scan in terms of how workers are scaled. The size of the final index doesn't need to be considered. [8]

Workers are added at logarithmic intervals of the projected size of the new index (from V5) [9]. The intervals are a function of the current value of the GUC min_parallel_relation_size (this GUC was added in 9.6). The new GUC max_parallel_workers_maintenance always disables the use of parallelism when set to 0. (max_parallel_workers_maintenance is intended as the "maintenance-wise" version of max_parallel_workers_per_gather.) Naturally, the new-to-PostgreSQL-10 max_parallel_workers GUC is also respected by parallel CREATE INDEX.
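
A sketch of that style of scaling, in the spirit of create_plain_partial_paths() (illustrative only; the function below is not the patch's code, and V9 later switches to scaling on the heap much like a parallel sequential scan):

#include "postgres.h"
#include "storage/block.h"

/* GUC variables: the first is an existing 9.6 GUC (in blocks), the second is proposed by the patch */
extern int  min_parallel_relation_size;
extern int  max_parallel_workers_maintenance;

/*
 * Illustration of logarithmic worker scaling: one worker once the projected
 * index size reaches min_parallel_relation_size, another each time that
 * threshold triples (as create_plain_partial_paths() does for heap size),
 * capped by max_parallel_workers_maintenance; 0 disables parallelism.
 */
static int
estimate_sort_workers(BlockNumber projected_index_pages)
{
    int         workers = 0;
    BlockNumber threshold = min_parallel_relation_size;

    if (max_parallel_workers_maintenance == 0)
        return 0;

    while (projected_index_pages >= threshold &&
           workers < max_parallel_workers_maintenance)
    {
        workers++;
        threshold *= 3;
    }

    return workers;
}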

Main thread for cost model discussion, started in final CF for Postgres 10: https://www.postgresql.org/message-id/flat/CAH2-WzmjVMCUviDnUmrJnvhfPpzODtCM71NEHx7_QYCtz+=8ng@mail.gmail.com#CAH2-WzmjVMCUviDnUmrJnvhfPpzODtCM71NEHx7_QYCtz+=8ng@mail.gmail.com

bt_estimated_nblocks() function in pageinspect

Update: V9 doesn't use any estimate of the size of the final index at all. [10]

V5 also added a temporary testing tool for displaying how large the new cost model/optimizer projects the size of the final index will be:

 mgd=# analyze;
 ANALYZE
 mgd=# select oid::regclass as rel,
 bt_estimated_nblocks(oid),
 relpages,
 to_char(bt_estimated_nblocks(oid)::numeric / relpages, 'FM990.990') as estimate_actual
 from pg_class
 where relkind = 'i'
 order by relpages desc limit 20;
 
                         rel                         │ bt_estimated_nblocks │ relpages │ estimate_actual
 ────────────────────────────────────────────────────┼──────────────────────┼──────────┼─────────────────
  mgd.acc_accession_idx_accid                        │              107,091 │  106,274 │ 1.008
  mgd.acc_accession_0                                │              169,024 │  106,274 │ 1.590
  mgd.acc_accession_1                                │              169,024 │   80,382 │ 2.103
  mgd.acc_accession_idx_prefixpart                   │               76,661 │   80,382 │ 0.954
  mgd.acc_accession_idx_mgitype_key                  │               76,661 │   76,928 │ 0.997
  mgd.acc_accession_idx_clustered                    │               76,661 │   76,928 │ 0.997
  mgd.acc_accession_idx_createdby_key                │               76,661 │   76,928 │ 0.997
  mgd.acc_accession_idx_numericpart                  │               76,661 │   76,928 │ 0.997
  mgd.acc_accession_idx_logicaldb_key                │               76,661 │   76,928 │ 0.997
  mgd.acc_accession_idx_modifiedby_key               │               76,661 │   76,928 │ 0.997
  mgd.acc_accession_pkey                             │               76,661 │   76,928 │ 0.997
  mgd.mgi_relationship_property_idx_propertyname_key │               74,197 │   74,462 │ 0.996
  mgd.mgi_relationship_property_idx_modifiedby_key   │               74,197 │   74,462 │ 0.996
  mgd.mgi_relationship_property_pkey                 │               74,197 │   74,462 │ 0.996
  mgd.mgi_relationship_property_idx_clustered        │               74,197 │   74,462 │ 0.996
  mgd.mgi_relationship_property_idx_createdby_key    │               74,197 │   74,462 │ 0.996
  mgd.seq_sequence_idx_clustered                     │               50,051 │   50,486 │ 0.991
  mgd.seq_sequence_raw_pkey                          │               35,826 │   35,952 │ 0.996
  mgd.seq_sequence_raw_idx_modifiedby_key            │               35,826 │   35,952 │ 0.996
  mgd.seq_source_assoc_idx_clustered                 │               35,822 │   35,952 │ 0.996
 (20 rows)

This shows how the tool can very accurately estimate the size of real-world indexes *after* a fresh REINDEX. (Source data is the Mouse Genome sample database).

pg_restore

The cost model tends to use parallel CREATE INDEX inconsistently during pg_restore, since pg_class.reltuples may or may not be set when it is called. The lack of pg_statistic entries for the table is a related though distinct issue. Currently, this has the effect of making the use of parallelism for individual CREATE INDEX statements more or less unpredictable [11].

Update: Version 7 of the patch adds a new GUC, enable_parallelddl, which pg_restore sets to "off" when run. This implies that pg_restore still respects the parallel_workers storage parameter if set, but otherwise makes no use of parallel CREATE INDEX. (An approach based on pg_restore setting the GUC max_parallel_workers_maintenance to 0 would be unworkable, since doing so will disable parallel CREATE INDEX under all circumstances, which has been deemed the wrong thing for pg_restore. This is why the new enable_parallelddl GUC was added in V7.)

As an alternative to the new-to-V7 pg_restore behavior, it might be worthwhile considering having CREATE INDEX perform some kind of analysis in the absence of any preexisting usable statistics [12].

Update: V9 of the patch does nothing special within pg_restore at all. It also introduces only one new GUC, and no new storage parameter.

parallel_workers index storage parameter

Update: V9 doesn't add a new index storage parameter. However, like any parallel heap scan, the existing parallel_workers heap storage parameter is considered.

A parallel_workers storage parameter is added, which completely bypasses the cost model. This is the "DBA knows best" approach.

As already noted, the new max_parallel_workers_maintenance caps the number of parallel workers requested, regardless of whether it was the cost model or parallel_workers storage parameter that determined the number to request. Example:

 -- Patch with 8 tuplesort "sort-and-scan" workers:
 CREATE INDEX patch_8_idx ON parallel_sort_test (randint) WITH (parallel_workers = 7);
 
 -- REINDEX, but force a serial tuplesort (without changing max_parallel_workers_maintenance, storage param would dictate use of 7 worker processes):
 SET max_parallel_workers_maintenance = 0;
 REINDEX INDEX patch_8_idx;
 -- Clean-up:
 RESET max_parallel_workers_maintenance;  
 
 -- Serial case (could be internal sort):
 CREATE INDEX serial_idx ON parallel_sort_test (randint) WITH (parallel_workers = 0);
 
 -- Leave it up to cost model to determine number of parallel workers to use (could end up as serial sort, which could be internal sort):
 CREATE INDEX cost_model_idx ON parallel_sort_test;

While there is clear precedent for having an "index version" of a storage parameter (e.g., fillfactor), there may be other issues around exactly what the new storage parameter means. For example, there could eventually be more than one way in which different parts of the system cared about the index storage parameter in different contexts (e.g., parallel index scans might reuse the storage parameter). This tension has been considered at least once [13], and this may be fine [14].

Note that trace_sort can be turned on to see in detail how parallelism is used. Note that there are 8 "participant" tuplesorts when 7 worker processes are successfully launched -- 1 for the leader, and 1 for each of the launched worker processes.

Example with 2 participant processes (1 worker + 1 leader), each of which perform their own merge before the leader's final on-the-fly merge:

 postgres=# set trace_sort = on;
 SET
 postgres=# set client_min_messages = 'log';
 SET
 postgres=# create index on land_registry_price_paid_uk (price) with (parallel_workers=1);
 LOG:  begin index sort: unique = f, workMem = 1048576, randomAccess = f
 LOG:  numeric_abbrev: cardinality 1515.963752 after 10240 values (10240 rows)
 LOG:  numeric_abbrev: cardinality 1515.963752 after 10240 values (10240 rows)
 LOG:  numeric_abbrev: cardinality 2178.509579 after 20480 values (20480 rows)
 LOG:  numeric_abbrev: cardinality 2161.858806 after 20480 values (20480 rows)
 LOG:  numeric_abbrev: cardinality 3075.797830 after 40960 values (40960 rows)
 LOG:  numeric_abbrev: cardinality 3030.798768 after 40960 values (40960 rows)
 LOG:  numeric_abbrev: cardinality 4424.075663 after 81920 values (81920 rows)
 LOG:  numeric_abbrev: cardinality 4455.486412 after 81920 values (81920 rows)
 LOG:  numeric_abbrev: cardinality 6747.487758 after 163840 values (163840 rows)
 LOG:  numeric_abbrev: cardinality 6504.966941 after 163840 values (163840 rows)
 LOG:  numeric_abbrev: cardinality 9830.136484 after 327680 values (327680 rows)
 LOG:  numeric_abbrev: cardinality 9978.823948 after 327680 values (327680 rows)
 LOG:  numeric_abbrev: cardinality 14904.604827 after 655360 values (655360 rows)
 LOG:  numeric_abbrev: cardinality 14537.378637 after 655360 values (655360 rows)
 LOG:  numeric_abbrev: cardinality 24083.776053 after 1310720 values (1310720 rows)
 LOG:  numeric_abbrev: cardinality 25524.289758 after 1310720 values (1310720 rows)
 LOG:  numeric_abbrev: cardinality 44701.701832 after 2621440 values (2621440 rows)
 LOG:  numeric_abbrev: cardinality 43976.514975 after 2621440 values (2621440 rows)
 LOG:  numeric_abbrev: cardinality 61521.386147 after 5242880 values (5242880 rows)
 LOG:  numeric_abbrev: cardinality 62482.797227 after 5242880 values (5242880 rows)
 LOG:  starting quicksort of run 0/1: CPU: user: 4.13 s, system: 0.39 s, elapsed: 4.55 s
 LOG:  starting quicksort of run 1/1: CPU: user: 4.06 s, system: 0.47 s, elapsed: 4.57 s
 LOG:  finished quicksort of run 1/1: CPU: user: 32.79 s, system: 0.48 s, elapsed: 33.40 s
 LOG:  finished quicksort of run 0/1: CPU: user: 33.14 s, system: 0.39 s, elapsed: 33.58 s
 LOG:  finished writing run 0/1 to tape 0: CPU: user: 35.52 s, system: 0.93 s, elapsed: 36.51 s
 LOG:  finished writing run 1/1 to tape 0: CPU: user: 35.12 s, system: 1.10 s, elapsed: 36.35 s
 LOG:  performsort of 0 starting: CPU: user: 36.68 s, system: 1.04 s, elapsed: 38.26 s
 LOG:  performsort of 1 starting: CPU: user: 36.38 s, system: 1.24 s, elapsed: 38.24 s
 LOG:  starting quicksort of run 0/2: CPU: user: 36.68 s, system: 1.04 s, elapsed: 38.26 s
 LOG:  starting quicksort of run 1/2: CPU: user: 36.38 s, system: 1.24 s, elapsed: 38.24 s
 LOG:  finished quicksort of run 0/2: CPU: user: 42.91 s, system: 1.04 s, elapsed: 44.50 s
 LOG:  finished writing run 0/2 to tape 1: CPU: user: 43.42 s, system: 1.17 s, elapsed: 45.14 s
 LOG:  0 using 524213 KB of memory for read buffers among 2 input tapes
 LOG:  finished quicksort of run 1/2: CPU: user: 43.60 s, system: 1.24 s, elapsed: 45.48 s
 LOG:  finished writing run 1/2 to tape 1: CPU: user: 44.18 s, system: 1.36 s, elapsed: 46.17 s
 LOG:  1 using 524213 KB of memory for read buffers among 2 input tapes
 LOG:  0 finished 2-way merge step: CPU: user: 46.40 s, system: 1.73 s, elapsed: 48.67 s
 LOG:  performsort of 0 done: CPU: user: 46.41 s, system: 1.85 s, elapsed: 48.81 s
 LOG:  1 finished 2-way merge step: CPU: user: 47.09 s, system: 1.89 s, elapsed: 49.60 s
 LOG:  performsort of 1 done: CPU: user: 47.10 s, system: 2.02 s, elapsed: 49.75 s
 LOG:  performsort of -1 starting: CPU: user: 46.42 s, system: 1.85 s, elapsed: 49.77 s
 LOG:  leader using 1048500 KB of memory for read buffers among 2 worker tapes
 LOG:  performsort done (except 2-way final merge): CPU: user: 46.45 s, system: 2.09 s, elapsed: 50.04 s
 LOG:  parallel external sort ended, 52438 disk blocks used: CPU: user: 53.11 s, system: 3.39 s, elapsed: 58.76 s
 CREATE INDEX

Note that output like "finished quicksort of run 0/2" denotes that the second run of participant process 0 has finished its in-memory sort step. A "participant process" is distinct from a worker process in that it could be the leader process -- this CREATE INDEX has 2 tuplesort-wise workers/participants, but only one actual worker process. Note that the ordinal identifiers 0 and 1 are assigned in an undefined order; there is no easy way to determine which of the two is the actual leader process, since it really doesn't matter. Parallel CREATE INDEX does not reuse the executor-wise worker numbers.

There is no new maintenance-wise variant GUC for things like min_parallel_relation_size, though. Only the one new GUC, max_parallel_workers_maintenance, is added (plus the new index storage parameter, parallel_workers).

Partitioning for parallelism (parallel external sort beyond CREATE INDEX)

While it doesn't seem critical to have extensive support for parallelizing sort-heavy operations initially, it makes sense to at least plan for a world where parallel sort is used pervasively, especially within the executor. Partitioning seems like the most plausible way to get there currently. The parallel external sort infrastructure added for parallel CREATE INDEX should be able to eventually be extended to support parallel external sorting by the executor.

Anticipated requirements

Partitioning refers to schemes that redistribute slices of worker runs across workers along predetermined partition boundaries, sort a range of values within dedicated workers, then concatenate the sorted ranges to get the final result, a bit like the in-memory samplesort algorithm. That approach would not particularly suit CREATE INDEX, because the approach's great strength is that workers can run in parallel for the entire duration, since there is no merge bottleneck (this assumes good partition boundaries, which is a somewhat risky assumption). Parallel CREATE INDEX wants something where the workers can independently write the index, and independently create a unified set of internal pages, which is hard. For indexes on logged tables, WAL logging would also have to be performed in parallel, which could leave partitioning with little advantage over an approach with a serial merge.

This parallel CREATE INDEX patch will tend to proportionally speed up CREATE INDEX statements at a level that is comparable to other major database systems. Partitioning to sort is more useful for query execution than for utility statements like CREATE INDEX; there will often be far more that can be pushed down.

Partitioning and merge joins

Robert Haas has often speculated about what it would take to make merge joins work well in parallel. "Range distribution"/bucketing may prove an important component of that. It's just too useful to aggregate tuples in shared memory initially, and have workers sort them without any serial merge bottleneck; arguments about misestimations, data skew, and so on should not deter us from this, long term. This approach has minimal IPC overhead, especially with regard to LWLock contention.

This kind of redistribution probably belongs in a Gather-like node, though, which has access to the context necessary to determine a range, and even dynamically alter the range in the event of a misestimation. Under this scheme, tuplesort.c just needs to be instructed that these worker-private Tuplesortstates are range-partitioned (i.e., the sorts are virtually independent, as far as it's concerned). That's a bit messy, but it is still probably the way to go for merge joins and other sort-reliant executor nodes.

Rough notes on a design for partitioning

Peter Geoghegan hashed out one design for partitioning in an e-mail to pgsql-hackers [15]. This design would reuse some of the infrastructure that's part of the parallel CREATE INDEX proposal, but significant work remains for that to be possible.