Scaleout Design

From PostgreSQL wiki

This is a memo for discussion of PostgreSQL's scale-out design.


Introduction

Basic principles

  • Maximize the benefits of PostgreSQL: Make as many existing features and ecosystems available as possible, such as powerful SQL, ACID based on strict consistency, various index types, extensibility, and client drivers. That will differentiate the built-in scale-out from other scale-out DBMSs.
  • Value application transparency: Try to require little or no modification of the application's logical data model, application logic, and queries. For example, it's better to let users specify the table distribution method with ALTER TABLE or a user-defined function than to force the use of special keywords in CREATE TABLE.
  • Learn: Because various scale-out DBMSs already exist, learn from them and adopt their good points. Do not cling to old mechanisms. Ideally, we also invent PostgreSQL-specific advantages.

<References>

Target workloads

<Questions>

  • Q1: What workloads do we want to handle?
  • Q2: Which workload do we want to focus on first?
  • OLTP (mainly multi-tenant OLTP custom applications)
  • analytics (with MPP parallelism)
  • Mixed (HTAP)

[Oracle]

  • Oracle Sharding is a scalability, availability and geo-distribution feature for OLTP applications that distributes and replicates data across a pool of discrete Oracle databases.

[MySQL Cluster]

  • MySQL Cluster has replication between clusters across multiple geographical sites built-in. A shared nothing architecture with data locality awareness makes it the perfect choice for running on commodity hardware and in globally distributed cloud infrastructure.
  • DSS Applications (data marts, analytics): Limited (Join operations across OLTP datasets not exceeding 3TB in size)
  • Packaged Applications: Limited (should be mostly primary key access)

[CockroachDB]

  • CockroachDB returns single-row reads in 2ms or less and single-row writes in 4ms or less, and supports a variety of SQL and operational tuning practices for optimizing query performance. However, CockroachDB is not yet suitable for heavy analytics / OLAP.

[YugabyteDB]

  • YugabyteDB is a high-performance, cloud-native distributed SQL database that aims to support all PostgreSQL features. It is best fit for cloud-native OLTP (i.e. real-time, business critical) applications that need absolute data correctness and require at least one of the following: scalability, high tolerance to failures, globally-distributed deployments. YugabyteDB is not a good fit for traditional Online Analytical Processing (OLAP) use cases that need complete ad-hoc analytics.

[Greenplum]

  • Greenplum Database is a massively parallel processing (MPP) database server with an architecture specially designed to manage large-scale analytic data warehouses and business intelligence workloads.


Distributed architecture

<Questions>

  • Q1: Which scale-out architecture(s) do we want to adopt? If we want multiple architectures, what's the priority?

Based on the PGCon 2018 developer unconference and other past discussions, we'd like to adopt the shared nothing architecture first. This does not exclude the possibility of incorporating the shared disk architecture later. Oracle Database and IBM Db2 provide both shared nothing and shared disk architectures as separate features, and PostgreSQL could reasonably offer both as well.

  • Shared nothing
  • Shared disk
  • New architecture based on separation of compute and storage: e.g., Amazon Aurora has an intelligent multi-tenant scale-out storage service behind the database server, pushes redo processing to the storage service, and eliminates database page writes, including checkpoints, by the database server.

<References>

Other DBMSs to learn from

The following DBMSs are referenced in the design considerations. Much of their documentation is quoted on this wiki page.

<OLTP DBMSs>

<Analytic DBMSs>

<OLTP and analytic DBMSs>

The following DBMSs are not referenced yet, but may be when we want to explore other ideas:

  • MemSQL
  • VoltDB
  • Vertica
  • Snowflake


Server roles

  • Minimize single points of contention.
  • Eliminate single points of failure.
  • Minimize the number of server roles to ease administration.
  • Some node needs to manage node information, the system catalog, and sequences.

<Questions>

  • Q1: Should we have a central management node?
  • Q2: Do we allow clients to connect to any node and run any SQL statements?
  • Q3: Should we have a component that routes connections based on some information such as a sharding key, server load, and node attributes (primary/standby, proximity like region/AZ/rack)?

[Oracle]

  • Shards - independent physical Oracle databases that host a subset of the sharded database
  • Shard catalog - an Oracle Database that supports automated shard deployment, centralized management of a sharded database, and multi-shard queries
    • All DDLs in a sharded database are executed by connecting to the shard catalog.
    • The shard catalog also contains the master copy of all duplicated tables in a sharded database. The shard catalog uses materialized views to automatically replicate changes to duplicated tables in all shards.
    • The shard catalog database also acts as a query coordinator used to process multi-shard queries and queries that do not specify a sharding key.
  • Shard directors - network listeners that enable high performance connection routing based on a sharding key
  • Connection pools - at runtime, act as shard directors by routing database requests across pooled connections
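
Routing a connection by sharding key can be illustrated with a minimal sketch. This is not Oracle's implementation: the shard names, the hash-range layout in SHARD_MAP, and the use of SHA-256 are all assumptions made for illustration.

```python
import hashlib
from bisect import bisect_right

# Hypothetical shard map: (exclusive upper bound of a 32-bit hash range, shard name).
SHARD_MAP = [
    (2**30, "shard1"),
    (2**31, "shard2"),
    (3 * 2**30, "shard3"),
    (2**32, "shard4"),
]


def key_hash(sharding_key: str) -> int:
    """Map a sharding key to a stable 32-bit hash value (SHA-256 is an assumption)."""
    digest = hashlib.sha256(sharding_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")


def route(sharding_key: str) -> str:
    """Return the shard whose hash range contains the key's hash."""
    bounds = [upper for upper, _ in SHARD_MAP]
    return SHARD_MAP[bisect_right(bounds, key_hash(sharding_key))][1]
```

A router of this kind only needs the shard map, so it can be cached in a listener or in a client-side connection pool and refreshed when shards are added or ranges move.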

[MySQL Cluster]

  • Management node - provides management services for the cluster as a whole, including startup, shutdown, backups, and configuration data for the other nodes.
  • Data node - stores and replicates data.
  • SQL node - is an instance of MySQL Server (mysqld) that accepts client connections and SQL statements.

[Spanner]

  • A Spanner deployment is called a universe.
  • Spanner is organized as a set of zones, where each zone is the rough analog of a deployment of Bigtable servers.
    • Zones are the unit of administrative deployment.
    • The set of zones is also the set of locations across which data can be replicated.
    • Zones can be added to or removed from a running system as new datacenters are brought into service and old ones are turned off, respectively.
    • Zones are also the unit of physical isolation: there may be one or more zones in a datacenter, for example, if different applications' data must be partitioned across different sets of servers in the same datacenter.
  • A zone has one zonemaster and between one hundred and several thousand spanservers.
    • The zonemaster assigns data to spanservers.
    • The spanservers serve data to clients. At the bottom, each spanserver is responsible for between 100 and 1000 instances of a data structure called a tablet.
    • The per-zone location proxies are used by clients to locate the spanservers assigned to serve their data.
  • The universe master and the placement driver are currently singletons.
    • The universe master is primarily a console that displays status information about all the zones for interactive debugging.
    • The placement driver handles automated movement of data across zones on the timescale of minutes. The placement driver periodically communicates with the spanservers to find data that needs to be moved, either to meet updated replication constraints or to balance load.

[CockroachDB]

  • All nodes behave symmetrically, so developers can send requests to any node (which means CockroachDB works well with load balancers).
  • Whichever node receives the request acts as the "gateway node," as other layers process the request.
  • If a node receives a read or write request it cannot directly serve, it simply finds the node that can handle the request, and communicates with it.

[YugabyteDB]

  • A YugabyteDB cluster consists of two distributed services - the YB-TServer service and the YB-Master service. Since the YB-Master service serves the role of the cluster metadata manager, it should be brought up first followed by the YB-TServer service.
  • YB-Master
    • coordinates universe-wide admin operations
      • handles DDL statements such as CREATE TABLE, ALTER TABLE, and DROP TABLE requests
      • creating a backup of a table
      • The YB-Master performs these operations with a guarantee that the operation is propagated to all tablets irrespective of the state of the YB-TServers hosting these tablets. This is essential because a YB-TServer failure while one of these universe-wide operations is in progress cannot affect the outcome of the operation by failing to apply it on some tablets.
    • stores system metadata
      • Each YB-Master stores system metadata, including information about namespaces, tables, roles, permissions, and assignments of tablets to YB-TServers.
      • These system records are replicated across the YB-Masters for redundancy using Raft as well.
    • is an authoritative source of tablet assignments to YB-TServers
      • The YB-Master stores all tablets and the corresponding YB-TServers that currently host them. This map of tablets to the hosting YB-TServers is queried by clients (such as the YugabyteDB query layer).
      • Applications using the YugabyteDB smart clients are efficient in retrieving data. The smart clients query the YB-Master for the tablet to YB-TServer map and cache it. By doing so, the smart clients can talk directly to the correct YB-TServer to serve various queries without incurring additional network hops.
    • coordinates background operations
      • data placement and load balancing: The YB-Master leader does the initial placement (at CREATE TABLE time) of tablets across YB-TServers to enforce any user-defined data placement constraints and ensure uniform load. In addition, during the lifetime of the universe, as nodes are added, fail or decommissioned, it continues to balance the load and enforce data placement constraints automatically.
      • leader balancing: Aside from ensuring that the number of tablets served by each YB-TServer is balanced across the universe, the YB-Master also ensures that each node has a symmetric number of tablet-peer leaders across eligible nodes.
      • re-replication of data on extended YB-TServer failure: The YB-Master receives heartbeats from all the YB-TServers, and tracks their liveness. It detects if any YB-TServer has failed, and keeps track of the time interval for which the YB-TServer remains in a failed state. If the time duration of the failure extends beyond a threshold, it finds replacement YB-TServers to which the tablet data of the failed YB-TServer is re-replicated. Re-replication is initiated in a throttled fashion by the YB-Master leader so as to not impact the foreground operations of the universe.
  • YB-TServer
    • From the application perspective this is a stateless layer and the clients can connect to any (one or more) of the YB-TServers on the appropriate port to perform operations against the YugabyteDB cluster.
    • handles DML statements.
    • performs the actual I/O for end-user requests.
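
The heartbeat-based liveness tracking described for the YB-Master can be sketched as follows. This is a simplified model, not YugabyteDB code: the class name, method names, and the 900-second threshold used in the usage note are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class LivenessTracker:
    """Records the last heartbeat per server and flags servers silent for too long."""

    failure_threshold_s: float  # hypothetical threshold before re-replication starts
    last_heartbeat: dict = field(default_factory=dict)  # server name -> timestamp

    def heartbeat(self, server: str, now: float) -> None:
        """Record that a heartbeat from `server` arrived at time `now`."""
        self.last_heartbeat[server] = now

    def needs_re_replication(self, now: float) -> list:
        """Return servers whose silence exceeds the threshold, in name order."""
        return sorted(
            server
            for server, seen in self.last_heartbeat.items()
            if now - seen > self.failure_threshold_s
        )
```

With a threshold of 900 seconds, a server last heard from at time 0 is reported at time 1000, while one heard from at time 800 is not. A real master would additionally throttle the re-replication it triggers.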

[Azure Synapse]

  • Compute is separate from storage, which enables you to scale compute independently of the data in your system.
  • Control node - Applications connect and issue T-SQL commands to a Control node, which is the single point of entry for Synapse SQL. The Control node runs the MPP engine, which optimizes queries for parallel processing, and then passes operations to Compute nodes to do their work in parallel.
  • Compute node - The Compute nodes store all user data in Azure Storage and run the parallel queries. The number of compute nodes ranges from 1 to 60, and is determined by the service level for Synapse SQL.

[Greenplum]

  • master - The master is the entry point to the Greenplum Database system. It is the database server process that accepts client connections and processes the SQL commands that system users issue.
    • Users connect to Greenplum Database through the master using a PostgreSQL-compatible client program such as psql or ODBC.
    • The master coordinates the workload across the other database instances in the system, called segments, which handle data processing and storage.
    • The master maintains the system catalog (a set of system tables that contain metadata about the Greenplum Database system itself); however, the master does not contain any user data. Data resides only on the segments.
  • segment - the segments are where data is stored and where most query processing occurs. User-defined tables and their indexes are distributed across the available segments; each segment contains a distinct portion of the data.
    • Segment instances are the database server processes that serve segments.
    • The segments cannot accept client connections.
    • The segments communicate with each other and the master over the interconnect.

[Citus]

  • Every cluster has one special node called the coordinator, and the others are known as workers or data nodes.
    • Applications send their queries to the coordinator node, which relays them to the relevant workers and accumulates the results.
    • Applications are not able to connect directly to workers.
    • The coordinator is the authoritative source of metadata for the cluster and data nodes store the actual data in shards.
  • coordinator
    • For each query on distributed tables, the coordinator either routes it to a single worker node, or parallelizes it across several depending on whether the required data lives on a single node or multiple. The coordinator decides what to do by consulting metadata tables. These tables track the DNS names and health of worker nodes, and the distribution of data across nodes.
    • The Citus coordinator only stores metadata about the table shards and does not store any data. This means that all the computation is pushed down to the workers and the coordinator does only final aggregations on the result of the workers. Therefore, it is not very likely that the coordinator becomes a bottleneck for read performance. Also, it is easy to boost up the coordinator by shifting to a more powerful machine.
    • However, in some write heavy use cases where the coordinator becomes a performance bottleneck, users can add another coordinator. As the metadata tables are small (typically a few MBs in size), it is possible to copy over the metadata onto another node and sync it regularly. Once this is done, users can send their queries to any coordinator and scale out performance.
  • Citus MX
    • is a new version of Citus that adds the ability to use hash-distributed tables from any node in a Citus cluster, which allows you to scale out your query throughput by opening many connections across all the nodes.
    • This is particularly useful for performing small reads and writes at a very high rate in a way that scales horizontally.
    • Distributed tables can only be created, altered, or dropped via the coordinator, but can be queried from any node.
    • When making changes to a table (e.g. adding a column), the metadata for the distributed tables is propagated to the workers using PostgreSQL’s built-in 2PC mechanism and distributed locks.
    • You can access your database in one of two ways:
      • through the coordinator which allows you to create or change distributed tables.
      • via the data URL, which routes you to one of the data nodes on which you can perform regular queries on the distributed tables.
    • Supported operations on the coordinator are: Create/drop distributed table, shard rebalancer, DDL, DML, SELECT, COPY.
    • Supported operations on the data URL are: DML, SELECT, COPY.
    • If you connect to the data URL using psql and run \d, then you will see all the distributed tables.
    • When performing a query on a distributed table, the right shard is determined based on the filter conditions and the query is forwarded to the node that stores the shard.
    • If a query spans all the shards, it is parallelised across all the nodes.
    • Distributed tables are the same from all nodes, so it does not matter to which node you are routed when using the data URL when querying distributed tables.
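
The choice between routing a query to one shard and parallelizing it across all shards can be sketched as below. This is an illustration only: the placement table, the worker names, and the CRC32-based hash are assumptions, not Citus's actual metadata layout or hash function.

```python
import zlib

# Hypothetical placement metadata: shard index -> worker node.
SHARD_PLACEMENT = {0: "worker-a", 1: "worker-b", 2: "worker-a", 3: "worker-b"}


def shard_for(value: str) -> int:
    """Pick the shard for a distribution-column value (CRC32 is an assumption)."""
    return zlib.crc32(value.encode("utf-8")) % len(SHARD_PLACEMENT)


def plan(filters: dict, distribution_column: str) -> list:
    """Return the (shard, worker) pairs a query must visit.

    An equality filter on the distribution column prunes the plan to one shard;
    without it, the query fans out to every shard.
    """
    if distribution_column in filters:
        shard = shard_for(filters[distribution_column])
        return [(shard, SHARD_PLACEMENT[shard])]
    return sorted(SHARD_PLACEMENT.items())
```

Because the plan depends only on the placement metadata, any node that holds a copy of that metadata can compute it, which is what allows queries to be accepted on nodes other than the coordinator.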

[Postgres-XL]

  • is composed of three major components:
    1. GTM (Global Transaction Manager)
      • provides consistent transaction management and tuple visibility control.
    2. Coordinator
      • is an interface to the database for applications. It acts like a conventional PostgreSQL backend process.
      • does not store any actual data.
      • receives SQL statements, gets a Global Transaction ID (GXID) and Global Snapshots as needed, determines which Datanodes are involved, and asks them to execute (a part of) the statement. When a statement is issued to Datanodes, it is associated with the GXID and Global Snapshot so that Multi-version Concurrency Control (MVCC) properties extend cluster-wide.
    3. Datanode
      • actually stores user data. Tables may be distributed among Datanodes, or replicated to all the Datanodes.
      • does not have a global view of the whole database; it only takes care of locally stored data.
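
How a central transaction manager can give every node the same visibility decision is sketched below. This is a toy model, not Postgres-XL code: the class and function names are hypothetical, and aborted transactions, wraparound, and persistence are ignored.

```python
class GlobalTransactionManager:
    """Hands out global transaction IDs (GXIDs) and global snapshots."""

    def __init__(self):
        self.next_gxid = 1
        self.active = set()  # GXIDs that have begun but not yet committed

    def begin(self) -> int:
        """Assign the next GXID and mark it as in progress."""
        gxid = self.next_gxid
        self.next_gxid += 1
        self.active.add(gxid)
        return gxid

    def commit(self, gxid: int) -> None:
        """Mark a transaction as finished."""
        self.active.discard(gxid)

    def snapshot(self) -> tuple:
        """Return (xmax, in_progress): the first unassigned GXID and the running set."""
        return self.next_gxid, frozenset(self.active)


def is_visible(writer_gxid: int, snapshot: tuple) -> bool:
    """A row is visible if its writer began before the snapshot and had committed."""
    xmax, in_progress = snapshot
    return writer_gxid < xmax and writer_gxid not in in_progress
```

Since the snapshot travels with the statement, each Datanode can evaluate is_visible locally and still agree with every other Datanode. The cost is that the manager becomes a component every transaction must contact.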

Cluster membership

<Questions>

  • Q1: Where do we manage the membership of cluster nodes?
    • Text file
    • Binary file
    • System catalog
    • External storage such as etcd and ZooKeeper

System catalog management and data definition

<Questions>

  • Q1: Where do we store the system catalog, on a single central node or on all nodes? The system catalog contains per-database objects and global objects.
  • Q2: If a single central node stores the system catalog, how do the other nodes access it efficiently?
    • Option 1: Always fetch accessed data from the central node and use it in one transaction.
    • Option 2: Fetch accessed data from the central node and cache it on each node.
  • Q3: If all nodes store the system catalog, how do they synchronize the catalog data? If we replicate the catalog on a per-SQL-statement basis, OID values for the same schema object differ on each node. Could that be a problem?
    • Option 1: DDL statements are executed on all nodes, synchronizing data with 2PC. The DDL execution fails if any node is down.
    • Option 2: Applications can only run DDL on one central node. The central node records the DDLs to track which DDLs each node has executed, and pushes them to other nodes.
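
Option 1 under Q3 can be sketched as a two-phase commit across in-memory nodes. This is a minimal model with hypothetical class and function names; real 2PC also needs durable prepare records and recovery of in-doubt transactions.

```python
class Node:
    """An in-memory stand-in for a cluster node."""

    def __init__(self, name: str, up: bool = True):
        self.name = name
        self.up = up
        self.prepared = None  # DDL that is prepared but not yet committed
        self.applied = []     # DDLs committed on this node, in order

    def prepare(self, ddl: str) -> bool:
        """Phase 1: vote yes only if the node is reachable."""
        if not self.up:
            return False
        self.prepared = ddl
        return True

    def commit(self) -> None:
        """Phase 2: make the prepared DDL permanent."""
        self.applied.append(self.prepared)
        self.prepared = None

    def rollback(self) -> None:
        """Discard a prepared DDL."""
        self.prepared = None


def run_ddl_everywhere(nodes: list, ddl: str) -> bool:
    """Apply the DDL on all nodes or on none of them."""
    prepared = []
    for node in nodes:
        if node.prepare(ddl):
            prepared.append(node)
        else:
            for done in prepared:
                done.rollback()
            return False
    for node in nodes:
        node.commit()
    return True
```

The sketch makes the availability cost of Option 1 concrete: a single unreachable node causes run_ddl_everywhere to return False and leaves every node unchanged.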

[Oracle]

  • To create a schema in a sharded database, you must issue DDL commands on the shard catalog database, which validates the DDLs and executes them locally, prior to their execution on the shards. Therefore, the shard catalog database contains local copies of all of the objects that exist in the sharded database, and serves as the master copy of the sharded database schema.
  • If the catalog validation and execution of DDLs are successful, the DDLs are automatically propagated to all of the shards and applied in the order in which they were issued on the shard catalog.
  • If a shard is down or not accessible during DDL propagation, the catalog keeps track of DDLs that could not be applied to the shard, and then applies them when the shard is back up.
  • When a new shard is added to a sharded database, all of the DDLs that have been executed in the SDB are applied in the same order to the shard before it becomes accessible to clients.
  • When connecting to the shard catalog using SQL*Plus, two types of objects can be created: SDB objects and local objects. Local objects are traditional objects that exist only in the shard catalog. Local objects can be used for administrative purposes, or they can be used by multi-shard queries originated from the catalog database, to generate and store a report, for example.
  • The type of object (SDB or local) that is created in a SQL*Plus session depends on whether the SHARD DDL mode is enabled in the session.
    • To create a local object, the SDB user must first run alter session disable shard ddl. All of the objects created while SHARD DDL is disabled are local objects.
    • All of the objects created while SHARD DDL is enabled in a session are SDB objects. To enable SHARD DDL in the session, the SDB user must run alter session enable shard ddl.
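
The catalog-side bookkeeping described above (an ordered DDL log, deferred application to shards that are down, and full replay on newly added shards) can be sketched as follows. The class and method names are hypothetical and do not correspond to Oracle interfaces.

```python
class DdlCatalog:
    """Keeps DDLs in issue order and tracks how far each shard has applied them."""

    def __init__(self):
        self.log = []           # DDL statements in the order they were issued
        self.applied_upto = {}  # shard name -> number of log entries applied

    def add_shard(self, shard: str) -> None:
        """A new shard starts at position 0, so it replays the whole log."""
        self.applied_upto[shard] = 0

    def issue(self, ddl: str) -> None:
        """Record a DDL that was validated and executed on the catalog."""
        self.log.append(ddl)

    def pending(self, shard: str) -> list:
        """Return the DDLs the shard has not applied yet, in order."""
        return self.log[self.applied_upto[shard]:]

    def propagate(self, shard: str, apply) -> None:
        """Apply pending DDLs in order; stop at the first failure and retry later.

        `apply` is a caller-supplied callable that returns True on success.
        """
        for ddl in self.pending(shard):
            if not apply(ddl):
                break
            self.applied_upto[shard] += 1
```

Unlike the all-or-nothing approach, a shard that is down simply accumulates pending entries, and DDL on the rest of the cluster is not blocked.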

[MySQL Cluster]

  • Schema operations (DDL statements) are rejected while any data node restarts.
  • Schema operations are also not supported while performing an online upgrade or downgrade.

[YugabyteDB]

  • Each YB-Master stores system metadata, including information about namespaces, tables, roles, permissions, and assignments of tablets to YB-TServers.

[Azure Synapse]

  • No support for DDL such as CREATE TABLE inside a user-defined transaction

[Greenplum]

  • The master maintains the system catalog (a set of system tables that contain metadata about the Greenplum Database system itself).

[Citus]

  • The coordinator is the authoritative source of metadata for the cluster.
  • To run Citus on a new database, you’ll need to create the database on the coordinator and workers, create the Citus extension within that database, and register the workers in the coordinator database.
  • DDL commands can be run from the coordinator node only.
  • Citus propagates schema changes from the coordinator node to the workers using a two-phase commit protocol.
  • Some DDL statements require manual propagation, and certain others are prohibited such as those which would modify a distribution column.
  • Certain commands, when run on the coordinator node, do not get propagated to the workers:
    • CREATE ROLE/USER (gets propagated in Citus Enterprise)
    • CREATE DATABASE
    • ALTER SET SCHEMA
    • ALTER TABLE ALL IN TABLESPACE
    • CREATE FUNCTION (use create_distributed_function)
    • CREATE TABLE
  • For the other types of objects above, create them explicitly on all nodes. Citus provides a function to execute queries across all workers:
    • SELECT run_command_on_workers($cmd$ CREATE ROLE ...; $cmd$);
  • The advantage of automatic propagation is that Citus will automatically create a copy on any newly added worker nodes.
    • The citus.pg_dist_object table contains a list of objects such as types and functions that have been created on the coordinator node and propagated to worker nodes.
    • When an administrator adds new worker nodes to the cluster, Citus automatically creates copies of the distributed objects on the new nodes (in the correct order to satisfy object dependencies).

[Postgres-XL]

  • Every coordinator and datanode stores the system catalog.
  • Propagates DDL execution to other nodes except for node management statements: CREATE/ALTER/DROP NODE and CREATE/DROP NODE GROUP.
  • The OID for the same schema object is different on each node.

Inter-node communication

<Questions>

  • Q1: Do we support IPv6?
  • Q2: How do we implement connection pooling to reduce the number of connections and the overhead of connection establishment?
  • Q3: How do we propagate session settings to remote nodes?
  • Q4: What should we consider for performant communication, e.g., socket buffers, jumbo frames, and message format (text/binary)?

[MySQL Cluster]

  • IPv6 is supported for connections between SQL nodes (MySQL servers), but connections between all other types of NDB Cluster nodes must use IPv4. In practical terms, this means that you can use IPv6 for replication between NDB Clusters, but connections between nodes in the same NDB Cluster must use IPv4.

[Greenplum]

  • By default, Greenplum Database interconnect uses UDP with flow control for interconnect traffic to send messages over the network. The Greenplum software does the additional packet verification and checking not performed by UDP, so the reliability is equivalent to TCP and the performance and scalability exceed those of TCP.
  • Connection pooling
    • You can configure a Greenplum system to use proxies for interconnect communication to reduce the use of connections and ports during query processing.
    • In general, when running a query, a QD (query dispatcher) on the Greenplum master creates connections to one or more QE (query executor) processes on segments, and a QE can create connections to other QEs. By default, connections between the QD on the master and QEs on segment instances and between QEs on different segment instances require a separate network port. You can configure a Greenplum system to use proxies when Greenplum communicates between the QD and QEs and between QEs on different segment instances.
    • The interconnect proxies require only one network connection for Greenplum internal communication between two segment instances, so they consume fewer connections and ports than TCP mode and perform better than UDPIFC mode in a high-latency network.
    • For details, see Configuring Proxies for the Greenplum Interconnect.

[Citus]

  • Data transfer format
    • Citus by default transfers intermediate query data in the text format. This is generally better as text files typically have smaller sizes than the binary representation. Hence, this leads to lower network and disk I/O while writing and transferring intermediate data.
    • However, for certain data types like hll or hstore arrays, the cost of serializing and deserializing data is pretty high. In such cases, using binary format for transferring intermediate data can improve query performance due to reduced CPU usage.
    • There are two configuration parameters which relate to the format in which intermediate data will be transferred across workers or between workers and the coordinator.
      • citus.binary_master_copy_format: uses binary format to transfer intermediate query results from the workers to the coordinator
      • citus.binary_worker_copy_format: is useful in queries which require dynamic shuffling of intermediate data between workers
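
The size trade-off between text and binary transfer can be illustrated with integers. This sketch assumes a fixed 8-byte binary encoding and one delimiter byte per text value; it ignores the per-field headers of real wire formats and is not a measurement of Citus.

```python
import struct


def text_size(values: list) -> int:
    """Bytes needed to send integers as decimal text, one delimiter byte each."""
    return sum(len(str(v).encode("ascii")) + 1 for v in values)


def binary_size(values: list) -> int:
    """Bytes needed to send integers as fixed-width 64-bit values."""
    return len(struct.pack(f"!{len(values)}q", *values))
```

For small values such as 1, 42, and 1000 the text form is shorter, while a value near 10**18 is shorter in binary. Size is only one side of the trade-off; the CPU cost of parsing and formatting is the other.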

[Postgres-XL]

  • Connection pooling
    • Postgres-XL is equipped with a connection pooler between the Coordinator and the Datanodes.
    • When a Coordinator backend requires a connection to a Datanode, the pooler looks for an appropriate connection in the pool. If one is available, the pooler assigns it to the Coordinator backend.
    • When the connection is no longer needed, the Coordinator backend returns the connection to the pooler. The pooler does not close the connection. It keeps the connection in the pool for later reuse, keeping the Datanode backend running.
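
The acquire-and-return behavior of such a pooler can be sketched as below. This is a minimal single-threaded model with hypothetical names; it omits pool size limits, health checks, and matching on user or database.

```python
from collections import defaultdict


class ConnectionPool:
    """Reuses idle connections per datanode instead of reconnecting."""

    def __init__(self, connect):
        self._connect = connect         # callable: datanode name -> new connection
        self._idle = defaultdict(list)  # datanode name -> idle connections

    def acquire(self, datanode: str):
        """Hand out an idle connection if one exists, otherwise open a new one."""
        idle = self._idle[datanode]
        return idle.pop() if idle else self._connect(datanode)

    def release(self, datanode: str, conn) -> None:
        """Return the connection to the pool without closing it."""
        self._idle[datanode].append(conn)
```

Because release keeps the connection open, the backend on the remote side keeps running, which saves connection setup time at the cost of holding remote resources while idle.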

FDW or non-FDW

<Questions>

  • Q1: Do we use FDW as the infrastructure for scale-out?
  • Q2: Is there anything we cannot achieve if we adopt FDW? If yes, is it really worth developing built-in scale-out under that constraint?

Citus Data did not adopt FDW for Citus because of its architectural limitations.

  • cf. PostgreSQL, pg_shard, foreign data wrappers, and our technical learnings
  • "One important learning is that the foreign data wrapper APIs can’t push down the count(distinct) computation, even if they get full cooperation from the planner. Both the planner and executor APIs would need to fundamentally change to support parallelizing this computation. The same challenge exists for many other queries."
  • "This limitation was one of the four reasons we decided to revert from using FDWs in pg_shard. Even with major improvements to the FDW APIs, we still wouldn't be able to scale out many complex SELECT queries."
  • Four technical issues with FDW
    1. Complex SELECT queries can’t be parallelized even after significant changes to FDW APIs
    2. UPDATE and DELETE operations are performed by first fetching records from the table scanning functions, and then going over the fetched records. If the user wanted to update a single row, this involved first pulling rows and then updating related records. This limitation isn't as fundamental as the one for SELECTs, but it's still there.
    3. Our options to provide high availability (HA) characteristics became fairly limited. The FDW APIs were designed to read data from one remote data source. If we wanted to replicate data through any means other than streaming replication, or failover to another machine midway through a query, we'd end up writing logic to open connections and start new scans in the middle of a function that was supposed to read the next record.
    4. When the user creates a distributed table, propagating DDL commands for each partition. The issue here was that regular and foreign PostgreSQL tables don't support the same features. For example, you can't create an index on a foreign table today. So, when we used a foreign table to mimic a distributed regular table, how do you allow and propagate CREATE INDEX commands? We ended up inferring and creating two tables behind the covers and swapping between them for this.

<Current limitations and potential barriers of FDW>

  • Some distributed query optimization is impossible.
    • In the FDW world, one local node unilaterally accesses remote nodes. Remote nodes cannot access one another.
    • Some distributed queries require direct communication between remote nodes. Greenplum, Citus, and Postgres-XL do so for some joins.
  • Only support tables, not other objects.
  • No central user and privilege management.
  • No central sequences.
  • It is currently not allowed to create a foreign table as a partition of the parent table if there are UNIQUE indexes on the parent table.
  • INSERT with an ON CONFLICT clause does not support specifying the conflict target, as unique constraints or exclusion constraints on remote tables are not locally known. This in turn implies that ON CONFLICT DO UPDATE is not supported, since the specification is mandatory there.
  • postgres_fdw currently lacks support for INSERT statements with an ON CONFLICT DO UPDATE clause. However, the ON CONFLICT DO NOTHING clause is supported, provided a unique index inference specification is omitted.
  • Oid and ctid values are different on each node.
    • psqlODBC's cursor update/delete uses either Oid or primary key. If they are not available, it resorts to ctid.
  • While rows can be moved from local partitions to a foreign-table partition (provided the foreign data wrapper supports tuple routing), they cannot be moved from a foreign-table partition to another partition.
  • Do scrollable cursors work across multiple nodes?
  • Session variables are not propagated to remote nodes.
  • libpq overhead: conversion of data between text and binary formats.
  • Only IMMUTABLE built-in data types, operators, and functions are executed on remote servers.
  • Rechecking join conditions in READ COMMITTED mode
    • An additional consideration is that in READ COMMITTED isolation mode, PostgreSQL may need to re-check restriction and join conditions against an updated version of some target tuple. Rechecking join conditions requires re-obtaining copies of the non-target rows that were previously joined to the target tuple.
    • When working with standard PostgreSQL tables, this is done by including the TIDs of the non-target tables in the column list projected through the join, and then re-fetching non-target rows when required. This approach keeps the join data set compact, but it requires inexpensive re-fetch capability, as well as a TID that can uniquely identify the row version to be re-fetched.
    • By default, therefore, the approach used with foreign tables is to include a copy of the entire row fetched from a foreign table in the column list projected through the join. This puts no special demands on the FDW but can result in reduced performance of merge and hash joins. An FDW that is capable of meeting the re-fetch requirements can choose to do it the first way.
  • postgres_fdw limitations
    • Transactions on remote servers use REPEATABLE READ even when the local transaction uses READ COMMITTED.
    • Lacks support for INSERT statements with an ON CONFLICT DO UPDATE clause. However, the ON CONFLICT DO NOTHING clause is supported, provided a unique index inference specification is omitted.
    • postgres_fdw supports row movement invoked by UPDATE statements executed on partitioned tables, but it currently does not handle the case where a remote partition chosen to insert a moved row into is also an UPDATE target partition that will be updated later.
    • Users cannot set fallback_application_name (always set to postgres_fdw)
    • To reduce the risk of misexecution of queries, WHERE clauses are not sent to the remote server unless they use only data types, operators, and functions that are built-in or belong to an extension that's listed in the foreign server's extensions option. Operators and functions in such clauses must be IMMUTABLE as well.
    • In the remote sessions opened by postgres_fdw, the search_path parameter is set to just pg_catalog, so that only built-in objects are visible without schema qualification.

[MySQL Cluster]

  • NDB Cluster requires communication between data nodes and API nodes (including SQL nodes), as well as between data nodes and other data nodes, to execute queries and updates.

[Greenplum]

  • To perform a join, matching rows must be located together on the same segment. If data is not distributed on the same join column, the rows needed from one of the tables are dynamically redistributed to the other segments. In some cases a broadcast motion, in which each segment sends its individual rows to all other segments, is performed rather than a redistribution motion, where each segment rehashes the data and sends the rows to the appropriate segments according to the hash key.
  • In addition to common database operations such as table scans, joins, and so on, Greenplum Database has an additional operation type called motion. A motion operation involves moving tuples between the segments during query processing.
  • For example, consider the following simple query involving a join between two tables:

SELECT customer, amount
FROM sales JOIN customer USING (cust_id)
WHERE dateCol = '04-30-2016';

  • The query plan for this example has a redistribute motion that moves tuples between the segments to complete the join. The redistribute motion is necessary because the customer table is distributed across the segments by cust_id, but the sales table is distributed across the segments by sale_id.
  • To perform the join, the sales tuples must be redistributed by cust_id. The plan is sliced on either side of the redistribute motion, creating slice 1 and slice 2.
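  • The redistribute motion described above can be sketched as follows. This is a minimal illustration, not Greenplum's implementation: the segment count, the modulo hash in segment_for, and the sample rows are all assumptions made for the example.

```python
from collections import defaultdict

NUM_SEGMENTS = 3  # hypothetical segment count


def segment_for(key):
    # Stand-in for the distribution hash; integer keys keep it deterministic.
    return hash(key) % NUM_SEGMENTS


def distribute(rows, key_column):
    """Place each row on the segment chosen by hashing key_column."""
    segments = defaultdict(list)
    for row in rows:
        segments[segment_for(row[key_column])].append(row)
    return segments


def redistribute(segments, key_column):
    """Redistribute motion: every segment rehashes its rows on key_column
    and sends each row to the segment that owns the new hash value."""
    moved = defaultdict(list)
    for rows in segments.values():
        for row in rows:
            moved[segment_for(row[key_column])].append(row)
    return moved


def local_join(left, right, key_column):
    """Join the two inputs segment by segment, without cross-segment lookups."""
    result = []
    for seg in range(NUM_SEGMENTS):
        by_key = defaultdict(list)
        for row in right.get(seg, []):
            by_key[row[key_column]].append(row)
        for row in left.get(seg, []):
            for match in by_key[row[key_column]]:
                result.append({**match, **row})
    return result


customer = [{"cust_id": c, "customer": f"name{c}"} for c in range(1, 5)]
sales = [
    {"sale_id": 10, "cust_id": 1, "amount": 5},
    {"sale_id": 11, "cust_id": 2, "amount": 7},
    {"sale_id": 12, "cust_id": 1, "amount": 9},
    {"sale_id": 13, "cust_id": 4, "amount": 3},
]

customer_segs = distribute(customer, "cust_id")      # distributed by cust_id
sales_segs = distribute(sales, "sale_id")            # distributed by sale_id
sales_by_cust = redistribute(sales_segs, "cust_id")  # the redistribute motion
joined = local_join(sales_by_cust, customer_segs, "cust_id")
```

  • After redistribute(), every sales row sits on the same segment as its matching customer row, so each segment can complete its part of the join locally.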

[Citus]

  • Sometimes workers need to connect to one another, such as during repartition joins.
  • The INSERT..SELECT command can now shuffle the data between the workers, rather than pulling the data to the coordinator. This change means that INSERT..SELECT can be up to 5x faster. The ability to do INSERT..SELECT with re-partitioning means users are no longer bound to a single distribution column in order to scale a data processing pipeline in Citus, which enables more advanced real-time analytics scenarios and data processing pipelines inside your Citus database.
  • The task tracker executor is designed to efficiently handle complex queries which require repartitioning and shuffling intermediate data among workers.

[Postgres-XL]

  • shared_queues (integer)
    • A parameter for Datanode only
    • For some joins that occur in queries, data from one Datanode may need to be joined with data from another Datanode. Postgres-XL uses shared queues for this purpose. During execution each Datanode knows if it needs to produce or consume tuples, or both.

Scalability and performance

Goal

  • The performance of transactions which only access one node in the cluster is almost the same as that of transactions which access a non-clustered node.
  • Linear scalability of transactions per second, with response time staying constant as new shards are added to support larger workload and/or data volume.
  • A single logical database can be geo-distributed for read scalability and low latency as well as for availability and security
    • Allows particular data to be stored close to its consumers

<Questions>

  • Q1: How much data do we aim to handle?
  • Q2: What's the maximum number of nodes we accommodate in a cluster?

[Oracle]

  • Maximum 1,000 database nodes

[MySQL Cluster]

  • Storage Limits: 128 TB
  • Number of nodes
    • Maximum is 255. This number includes all SQL nodes (MySQL Servers), API nodes (applications accessing the cluster other than MySQL servers), data nodes, and management servers.
    • The maximum number of data nodes is 144.
    • The maximum number of node groups is 48.
  • Number of database objects
    • The maximum number of all NDB database objects in a single NDB Cluster - including databases, tables, and indexes - is limited to 20320.

[Spanner]

  • Spanner is designed to scale up to millions of machines across hundreds of datacenters and trillions of database rows.

[CockroachDB]

  • CockroachDB has no theoretical limitations to scaling, throughput, latency, or concurrency other than the speed of light. In practice, it can achieve near-linear performance at 256 nodes with TPC-C.
  • Returns single-row reads in 1 ms and processes single-row writes in 2 ms within a single availability zone.

[Greenplum]

  • Data size: hundreds of terabytes

[Citus]

  • Data size of largest customer: Heap: 1.4PB of data on a 70-node Citus database cluster

Data sharding and placement

  • Provide both automatic and manual data placement methods.
    • Automatic: Postgres determines which records to place on which nodes based on the node count, data size, and node load.
    • Manual: The user controls which records to place on which nodes based on user proximity and security requirements such as GDPR.

<Questions>

  • Q1: What's the unit of shards? How is it related to traditional partitions?
    • Option 1: Partition and subpartition defined by the user
    • Option 2: Range, split, or chunk that is automatically divided by Postgres
  • Q2: What sharding methods do we adopt?
    • Option 1: Distribute by consistent hash
    • Option 2: Distribute by range
    • Option 3: Distribute by list: geographic distribution
    • Option 4: Composite distribution: combine list/range and hash, e.g. list-partition by country to distribute data to nearby regions, and then hash-subpartition data across multiple nodes in each region
    • Option 5: Distribute by replication: all nodes hold the same copy of a table, e.g. master data like the product catalog
  • Q3: What's the requirement of the sharding key?
  • Q4: What kind of sharding metadata do we store in the system catalog?
  • Q5: How does the user create a distributed table?

[Oracle]

  • Supports sharded, duplicated, and local tables.
  • System-managed sharding
    • Does not require you to map data to shards. The data is automatically distributed across shards using partitioning by consistent hash. The partitioning algorithm uniformly and randomly distributes data across shards.
    • The distribution used in system-managed sharding is intended to eliminate hot spots and provide uniform performance across shards. Oracle Sharding automatically maintains the balanced distribution of chunks when shards are added to or removed from a sharded database.
  • User-defined sharding
    • Lets you explicitly specify the mapping of data to individual shards. It is used when, because of performance, regulatory, or other reasons, certain data needs to be stored on a particular shard, and the administrator needs to have full control over moving data between shards.
    • Another advantage of user-defined sharding is that, in case of planned or unplanned outage of a shard, the user knows exactly what data is not available.
    • The disadvantage of user-defined sharding is the need for the database administrator to monitor and maintain balanced distribution of data and workload across shards.
    • With user-defined sharding, a sharded table can be partitioned by range or list. The CREATE TABLE syntax for a sharded table is not very different from the syntax for a regular table, except for the requirement that each partition should be stored in a separate tablespace.

CREATE SHARDED TABLE accounts
( id             NUMBER
, account_number NUMBER
, customer_id    NUMBER
, branch_id      NUMBER
, state          VARCHAR(2) NOT NULL
, status         VARCHAR2(1)
)
PARTITION BY LIST (state)
( PARTITION p_northwest VALUES ('OR', 'WA') TABLESPACE ts1
, PARTITION p_southwest VALUES ('AZ', 'UT', 'NM') TABLESPACE ts2
, PARTITION p_northcentral VALUES ('SD', 'WI') TABLESPACE ts3
, PARTITION p_southcentral VALUES ('OK', 'TX') TABLESPACE ts4
, PARTITION p_northeast VALUES ('NY', 'VM', 'NJ') TABLESPACE ts5
, PARTITION p_southeast VALUES ('FL', 'GA') TABLESPACE ts6
);
CREATE TABLESPACE tbs1 IN SHARDSPACE west;
CREATE TABLESPACE tbs2 IN SHARDSPACE west;

  • Composite sharding
    • Allows you to use two levels of sharding. First the data is sharded by range or list and then it is sharded further by consistent hash.
    • In many use cases, especially for data sovereignty and data proximity requirements, the composite sharding method offers the best of both system-managed and user-defined sharding methods, giving you the automation you want and the control over data placement you need.
  • Shard storage
    • Each partition of a sharded table resides in a separate tablespace, and each tablespace is associated with a specific shard.
    • Depending on the sharding method, the association can be established automatically or defined by the administrator.
    • The number of chunks in a sharded database with system-managed sharding can be specified when the shard catalog is created. If not specified, the default value, 120 chunks per shard, is used. Once a sharded database is deployed, the number of chunks can only be changed by splitting chunks.
    • Oracle Sharding creates and manages tablespaces as a unit called a tablespace set. The PARTITIONS AUTO clause specifies that the number of partitions should be automatically determined. This type of hashing provides more flexibility and efficiency in migrating data between shards, which is important for elastic scalability.

CREATE SHARDED TABLE Customers
( CustNo      NUMBER NOT NULL
, Name        VARCHAR2(50)
, Address     VARCHAR2(250)
, CONSTRAINT RootPK PRIMARY KEY(CustNo)
)
PARTITION BY CONSISTENT HASH (CustNo)
PARTITIONS AUTO
TABLESPACE SET ts1;

CREATE SHARDED TABLE Orders
( OrderNo   NUMBER NOT NULL
, CustNo    NUMBER NOT NULL
, OrderDate DATE
, CONSTRAINT OrderPK PRIMARY KEY (CustNo, OrderNo)
, CONSTRAINT CustFK  FOREIGN KEY (CustNo) REFERENCES Customers(CustNo)
)
PARTITION BY REFERENCE (CustFK);

    • Before creating a sharded table partitioned by consistent hash, a set of tablespaces (one tablespace per chunk) has to be created to store the table partitions. The tablespaces are automatically created by executing the SQL statement, CREATE TABLESPACE SET.
      • CREATE TABLESPACE SET ts1;
    • PARTITIONS AUTO in this statement means that the number of partitions is automatically set to the number of tablespaces in the tablespace set ts1 (which is equal to the number of chunks) and each partition will be stored in a separate tablespace.
  • Shard key
    • The following data types are supported for the sharding key:
      • NUMBER, INTEGER, SMALLINT
      • RAW
      • (N)VARCHAR, (N)VARCHAR2, (N)CHAR
      • DATE, TIMESTAMP

[MySQL Cluster]

  • It is possible to create tables using other storage engines (such as InnoDB or MyISAM) on a MySQL server being used with NDB Cluster, but since these tables do not use NDB, they do not participate in clustering; each such table is strictly local to the individual MySQL server instance on which it is created.
  • Partition. This is a portion of the data stored by the cluster. Each node is responsible for keeping at least one copy of any partitions assigned to it (that is, at least one replica) available to the cluster.
  • The number of partitions used by default by NDB Cluster depends on the number of data nodes and the number of LDM threads in use by the data nodes, as shown here:
    • [# of partitions] = [# of data nodes] * [# of LDM threads]

[Spanner]

  • Cloud Spanner optimizes performance by automatically sharding the data based on request load and size of the data. As a result, you can spend less time worrying about how to scale your database, and instead focus on scaling your business.

[CockroachDB]

  • CockroachDB is the only database in the world that enables you to attach location to your data at the row level. This capability allows you to regulate the distance between your users and their data.
    • Ensure low latency to end-users wherever they are.
    • Pin data to a locale to comply with data protection laws.
    • Tie specific data to specific clouds or datacenters.

[YugabyteDB]

  • Sharding method
    • User tables are implicitly managed as multiple shards by DocDB. These shards are referred to as tablets. The primary key for each row in the table uniquely determines the tablet the row lives in.
    • YugabyteDB currently supports two ways of sharding data - hash (aka consistent hash) sharding and range sharding.
    • The hash space for hash-sharded YugabyteDB tables is the 2-byte range from 0x0000 to 0xFFFF. Such a table may therefore have at most 64K tablets.

CREATE TABLE customers (
    customer_id bpchar NOT NULL,
    company_name character varying(40) NOT NULL,
    ...
    PRIMARY KEY (customer_id HASH)
);

    • With consistent hash sharding, there are many more shards than the number of nodes and there is an explicit mapping table maintained tracking the assignment of shards to nodes. When adding new nodes, a subset of shards from existing nodes can be efficiently moved into the new nodes without requiring a massive data reassignment.
    • Range sharding involves splitting the rows of a table into contiguous ranges that respect the sort order of the table based on the primary key column values. The tables that are range sharded usually start out with a single shard. As data is inserted into the table, it is dynamically split into multiple shards because it is not always possible to know the distribution of keys in the table ahead of time.

CREATE TABLE order_details (
    order_id smallint NOT NULL,
    product_id smallint NOT NULL,
    unit_price real NOT NULL,
    quantity smallint NOT NULL,
    discount real NOT NULL,
    PRIMARY KEY (order_id ASC, product_id),
    FOREIGN KEY (product_id) REFERENCES products,
    FOREIGN KEY (order_id) REFERENCES orders
);
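
  • The consistent hash sharding idea above (many more shards than nodes, plus an explicit shard-to-node mapping table) can be sketched as follows. This is an illustrative sketch only: the shard count, the MD5-based hash, the round-robin initial assignment, and the rebalancing policy are assumptions, not what YugabyteDB actually does.

```python
import hashlib

NUM_SHARDS = 16  # deliberately larger than the node count


def shard_of(key: str) -> int:
    """Map a key to a fixed shard with a stable hash (MD5 is only an
    illustrative choice)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:2], "big") % NUM_SHARDS


def assign(nodes):
    """Initial shard-to-node mapping table, filled round-robin."""
    return {shard: nodes[shard % len(nodes)] for shard in range(NUM_SHARDS)}


def add_node(mapping, new_node):
    """Move just enough shards to the new node to even out the load.
    Returns the list of shards that changed owner."""
    nodes = set(mapping.values()) | {new_node}
    target = NUM_SHARDS // len(nodes)
    moved = []
    while len(moved) < target:
        # Group shards by current owner and take one from the fullest node.
        owned = {}
        for shard, node in mapping.items():
            owned.setdefault(node, []).append(shard)
        donor = max(owned, key=lambda n: len(owned[n]))
        shard = owned[donor][-1]
        mapping[shard] = new_node
        moved.append(shard)
    return moved


mapping = assign(["n1", "n2", "n3"])
before = dict(mapping)
moved = add_node(mapping, "n4")
```

  • A key's shard never changes; only the mapping table does. Adding the fourth node moves 4 of the 16 shards and leaves the other 12 where they were, which is the "no massive data reassignment" property.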

  • Tablet splitting
    • DocDB allows data resharding by splitting tablets using the following three mechanisms:
      1. Presplitting tablets: All tables created in DocDB can be split into the desired number of tablets at creation time.
      2. Manual tablet splitting: The tablets in a running cluster can be split manually at runtime by the user.
      3. Automatic tablet splitting: The tablets in a running cluster are automatically split according to some policy by the database.
    • Automatic tablet splitting enables resharding of data in a cluster automatically while online, and transparently to users, when a specified size threshold has been reached.
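    • A rough sketch of threshold-driven splitting for a range-sharded table is shown below. It is not DocDB's algorithm: the class name, the use of a row count as the size threshold, and splitting at the median key are assumptions made for illustration, and keys are assumed to be unique.

```python
import bisect


class RangeShardedTable:
    """Toy range-sharded table: starts as one tablet and splits a tablet
    at its median key once it holds more than max_rows rows."""

    def __init__(self, max_rows=4):
        self.max_rows = max_rows  # hypothetical split threshold (row count)
        self.bounds = []          # sorted lower bounds of tablets 1..n
        self.tablets = [[]]       # each tablet is a sorted list of keys

    def insert(self, key):
        # Tablet i holds keys in [bounds[i-1], bounds[i]).
        idx = bisect.bisect_right(self.bounds, key)
        tablet = self.tablets[idx]
        bisect.insort(tablet, key)
        if len(tablet) > self.max_rows:
            self._split(idx)

    def _split(self, idx):
        tablet = self.tablets[idx]
        mid = len(tablet) // 2
        left, right = tablet[:mid], tablet[mid:]
        self.tablets[idx:idx + 1] = [left, right]
        # The first key of the right half becomes the new tablet's lower bound.
        self.bounds.insert(idx, right[0])


table = RangeShardedTable(max_rows=4)
for key in range(1, 11):
    table.insert(key)
```

    • Inserting keys 1 through 10 in order grows the table from one tablet to four, without the split points having to be known ahead of time.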
  • Shard key
    • There are two types of primary key columns:
      1. Hash primary key columns: The primary key may have zero or more leading hash-partitioned columns. By default, only the first column is treated as the hash-partition column. But this behavior can be modified by explicit use of the HASH annotation.
      2. Range primary key columns: A table can have zero or more range primary key columns and it controls the top-level ordering of rows within a table (if there are no hash partition columns) or the ordering of rows among rows that share a common set of hash partitioned column values. By default, the range primary key columns are stored in ascending order. But this behavior can be controlled by explicit use of ASC or DESC.
    • For example, if the primary key specification is PRIMARY KEY ((a, b) HASH, c DESC) then columns a & b are used together to hash partition the table, and rows that share the same values for a and b are stored in descending order of their value for c.
    • If the primary key specification is PRIMARY KEY(a, b), then column a is used to hash partition the table and rows that share the same value for a are stored in ascending order of their value for b.
  • Specifying the number of shards
    • For hash-sharded tables, you can use the SPLIT INTO clause to specify the number of tablets to be created for the table. The hash range is then evenly split across those tablets.
    • Presplitting tablets, using SPLIT INTO, distributes write and read workloads on a production cluster. For example, if you have 3 servers, splitting the table into 30 tablets can provide higher write throughput on the table.
    • By default, YugabyteDB presplits a table into ysql_num_shards_per_tserver * num_of_tserver shards. The SPLIT INTO clause can be used to override that setting on a per-table basis.
    • To specify the number of tablets for a table, you can use the SPLIT INTO clause.
      • CREATE TABLE tracking (id int PRIMARY KEY) SPLIT INTO 10 TABLETS;
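    • The arithmetic behind presplitting can be sketched as follows. The default-count formula and the 2-byte hash space come from the notes above; the exact range boundaries computed here are an assumption (an even integer split), and the function names are hypothetical.

```python
HASH_SPACE = 0x10000  # 2-byte hash space: 0x0000..0xFFFF


def default_tablet_count(shards_per_tserver, num_tservers):
    """Default presplit count: ysql_num_shards_per_tserver * num_of_tserver."""
    return shards_per_tserver * num_tservers


def split_hash_range(num_tablets):
    """Evenly split [0x0000, 0xFFFF] into num_tablets inclusive ranges."""
    if not 1 <= num_tablets <= HASH_SPACE:
        raise ValueError("tablet count must be between 1 and 65536")
    bounds = [i * HASH_SPACE // num_tablets for i in range(num_tablets + 1)]
    return [(bounds[i], bounds[i + 1] - 1) for i in range(num_tablets)]


def tablet_for(hash_value, ranges):
    """Find the tablet whose range contains a 16-bit hash value."""
    for index, (low, high) in enumerate(ranges):
        if low <= hash_value <= high:
            return index
    raise ValueError("hash value outside the 16-bit hash space")


# Mirrors SPLIT INTO 10 TABLETS from the statement above.
ranges = split_hash_range(10)
```

    • With a hypothetical ysql_num_shards_per_tserver of 8 on 3 servers, default_tablet_count(8, 3) gives 24 tablets; SPLIT INTO replaces that number for a single table.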

[Azure Synapse]

  • Synapse SQL leverages Azure Storage to keep your user data safe. The data is sharded into distributions to optimize the performance of the system. You can choose which sharding pattern to use to distribute the data when you define the table.
  • The supported sharding patterns are hash, round robin, and replicate.

[Greenplum]

  • All tables are distributed. When you create or alter a table, you optionally specify DISTRIBUTED BY (hash distribution), DISTRIBUTED RANDOMLY (round-robin distribution), or DISTRIBUTED REPLICATED (fully distributed) to determine the table row distribution.
  • The server configuration parameter gp_create_table_random_default_distribution controls the table distribution policy if the DISTRIBUTED BY clause is not specified when you create a table.
  • Shard key
    • Columns with geometric or user-defined data types are not eligible as distribution key columns. If a table does not have an eligible column, Greenplum Database distributes the rows randomly or in round-robin fashion.
    • The hash function used for hash distribution policy is defined by the hash operator class for the column's data type. By default, Greenplum Database uses the data type's default hash operator class, the same operator class used for hash joins and hash aggregates, which is suitable for most use cases. However, you can declare a non-default hash operator class in the DISTRIBUTED BY clause.
      • CREATE TABLE atab (a int) DISTRIBUTED BY (a abs_int_hash_ops);

[Citus]

  • Distributed, replicated, and local tables are available.
  • You can use the create_distributed_table() function to specify the table distribution column and create the worker shards.
    • SELECT create_distributed_table('github_events', 'repo_id');
    • This function informs Citus that the github_events table should be distributed on the repo_id column (by hashing the column value). The function also creates shards on the worker nodes using the citus.shard_count and citus.shard_replication_factor configuration values.
    • This example would create a total of citus.shard_count number of shards where each shard owns a portion of a hash token space and gets replicated based on the default citus.shard_replication_factor configuration value. The shard replicas created on the worker have the same table schema, index, and constraint definitions as the table on the coordinator. Once the replicas are created, this function saves all distributed metadata on the coordinator.
    • Each created shard is assigned a unique shard id and all its replicas have the same shard id. Each shard is represented on the worker node as a regular PostgreSQL table with name tablename_shardid where tablename is the name of the distributed table and shardid is the unique id assigned to that shard.
    • You can connect to the worker postgres instances to view or run commands on individual shards.
    • The number of shards is configurable per table at the time of its distribution across the cluster. Choosing the shard count for each distributed table is a balance between the flexibility of having more shards, and the overhead for query planning and execution across them. The optimal choice varies depending on your access patterns for the data.
      • For instance, in the Multi-Tenant Database use-case we recommend choosing between 32 - 128 shards. For smaller workloads say <100GB, you could start with 32 shards and for larger workloads you could choose 64 or 128. This means that you have the leeway to scale from 32 to 128 worker machines.
      • In the Real-Time Analytics use-case, shard count should be related to the total number of cores on the workers. To ensure maximum parallelism, you should create enough shards on each node such that there is at least one shard per CPU core. We typically recommend creating a high number of initial shards, e.g. 2x or 4x the number of current CPU cores. This allows for future scaling if you add more workers and CPU cores.
    • To choose a shard count for a table you wish to distribute, update the citus.shard_count variable. This affects subsequent calls to create_distributed_table. For example:
      • SET citus.shard_count = 64;
    • However, keep in mind that for each query Citus opens one database connection per shard, and these connections are limited.
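    • How shard count, replication factor, and the tablename_shardid naming fit together can be sketched as follows. This is not Citus code: the round-robin placement policy and the starting shard id are assumptions chosen for illustration, and place_shards is a hypothetical helper.

```python
def place_shards(table_name, workers, shard_count, replication_factor,
                 first_shard_id=102008):
    """Return {shard_name: [workers holding a replica of that shard]}.

    All replicas of a shard share one shard id, so they share one name.
    Round-robin placement and first_shard_id are illustrative assumptions.
    """
    if replication_factor > len(workers):
        raise ValueError("replication factor exceeds the number of workers")
    placements = {}
    for i in range(shard_count):
        shard_name = f"{table_name}_{first_shard_id + i}"
        placements[shard_name] = [
            workers[(i + r) % len(workers)] for r in range(replication_factor)
        ]
    return placements


placements = place_shards("github_events", ["w1", "w2", "w3"],
                          shard_count=6, replication_factor=2)
```

    • Here 6 shards with a replication factor of 2 yield 12 placements over 3 workers. Because each query opens one connection per shard, a higher shard count raises both parallelism and connection usage.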
  • Shard key
    • Citus requires that primary and foreign key constraints include the distribution column. This requirement makes enforcing these constraints much more efficient in a distributed environment as only a single node has to be checked to guarantee them.
    • You must choose a single column per table as the distribution column. A common scenario where people want to distribute by two columns is for timeseries data. However, for this case we recommend using a hash distribution on a non-time column, and combining this with PostgreSQL partitioning on the time column.
  • Manual placement for tenant isolation
    • To improve resource allocation and make guarantees of tenant QoS it is worthwhile to move large tenants to dedicated nodes. Citus provides the tools to do this.

[Postgres-XL]

  • Distributed and replicated tables are available.
  • The Datanode for a particular row is determined by the value of the distribution column, which is the column named in DISTRIBUTE BY HASH if that clause is used.
  • If the clause is not used, the first column of a primary key or unique constraint is used, if one is present in the CREATE TABLE statement.
  • If neither is present, the first column of a foreign key constraint is used, the idea being that child data can be co-located on the same node as the parent table.
  • If no such constraint is defined, Postgres-XL chooses the first reasonable column it can find, that is, the first column that has a deterministically distributable data type. You may also choose another distribution method such as MODULO or ROUNDROBIN. To specify which column to use as the distribution column and which value test to apply, you can do as follows:

CREATE TABLE disttab(
    col1 int, col2 int, col3 text
) DISTRIBUTE BY HASH(col1);
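
  • The fallback order for picking the distribution column, as described above, can be sketched as a small function. This is a reading of the rules in this memo, not Postgres-XL source code: the function name, its parameters, and the set of "deterministically distributable" types are hypothetical, and the case where no column qualifies is not covered by the notes above.

```python
def default_distribution_column(columns, distribute_by=None, primary_key=(),
                                unique=(), foreign_key=(),
                                distributable_types=("int", "bigint", "text")):
    """Pick a distribution column from (name, type) pairs.

    distributable_types is an illustrative assumption, not the real list.
    """
    if distribute_by is not None:
        return distribute_by      # explicit DISTRIBUTE BY HASH(col)
    if primary_key:
        return primary_key[0]     # first column of the primary key
    if unique:
        return unique[0]          # first column of a unique constraint
    if foreign_key:
        return foreign_key[0]     # co-locate child rows with the parent
    for name, type_name in columns:
        if type_name in distributable_types:
            return name           # first reasonable column
    return None                   # behavior here is not stated in the notes


cols = [("col1", "int"), ("col2", "int"), ("col3", "text")]
explicit = default_distribution_column(cols, distribute_by="col1")
from_pk = default_distribution_column(cols, primary_key=("col2", "col1"))
from_fk = default_distribution_column(cols, foreign_key=("col3",))
fallback = default_distribution_column([("g", "point"), ("n", "int")])
```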

Replicated table

<Questions>

  • Q1: How does the user create a replicated table?
  • Q2: Where do we store the replicated table data? Where do we allow users to update it?
  • Q3: How is the replicated table replicated across nodes?
    • Logical replication
    • Statement-based replication and 2PC (Citus, Postgres-XL)
    • Materialized view-based replication (Oracle Sharding: the default lag is 60 seconds, which is configurable)

[Oracle]

  • A duplicated table is a non-sharded table that is replicated to all shards.
  • Usually contains common reference data.
  • Can be read and updated on each shard.
  • Oracle Sharding synchronizes the contents of duplicated tables using Materialized View Replication. A duplicated table on each shard is represented by a materialized view. The master table for the materialized views is located in the shard catalog. The CREATE DUPLICATED TABLE statement automatically creates the master table, materialized views, and other objects required for materialized view replication.
  • You can update a duplicated table on a shard. The update is first propagated over a dblink from the shard to the master table on the shard catalog. Then the update is asynchronously propagated to all other shards as a result of a materialized view refresh.
  • The materialized views on all of the shards are automatically refreshed at a configurable frequency. The refresh frequency of all duplicated tables is controlled by the database initialization parameter SHRD_DUPL_TABLE_REFRESH_RATE. The default value for the parameter is 60 seconds.
  • A race condition is possible when a transaction run on a shard tries to update a row which was deleted on the shard catalog. In this case, an error is returned and the transaction on the shard is rolled back.

CREATE DUPLICATED TABLE Products 
( StockNo     NUMBER PRIMARY KEY
, Description VARCHAR2(20)
, Price       NUMBER(6,2)
);

[Azure Synapse]

  • A replicated table provides the fastest query performance for small tables.
    • A table that is replicated caches a full copy of the table on each compute node.
    • Consequently, replicating a table removes the need to transfer data among compute nodes before a join or aggregation.
    • Replicated tables are best utilized with small tables. Extra storage is required and there is additional overhead that is incurred when writing data, which make large tables impractical.

[Greenplum]

  • The primary use cases for replicated tables are to:
    • remove restrictions on operations that user-defined functions can perform on segments
    • improve query performance by making it unnecessary to broadcast frequently used tables to all segments
  • The hidden system columns (ctid, cmin, cmax, xmin, xmax, and gp_segment_id) cannot be referenced in user queries on replicated tables because they have no single, unambiguous value. Greenplum Database returns a column does not exist error for the query.

[Citus]

  • Reference table
    • Create a reference table that is replicated on all nodes like this:
      • SELECT create_reference_table('geo_ips');
    • In addition to distributing a table as a single replicated shard, the create_reference_table UDF marks it as a reference table in the Citus metadata tables.
    • Citus automatically performs two-phase commits (2PC) for modifications to tables marked this way, which provides strong consistency guarantees.

[Postgres-XL]

  • Create a replicated table like this:

CREATE TABLE repltab (
    col1 int, col2 int
) DISTRIBUTE BY REPLICATION;

  • In the case of reading replicated tables, the Coordinator can choose any Datanode to read. The most efficient way is to select one running in the same hardware or virtual machine. This is called preferred Datanode and can be specified by a GUC local to each Coordinator.
  • In the case of writing replicated tables, all the Coordinators choose the same Datanode to begin with to avoid update conflicts. This is called primary Datanode.
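  • The two roles above can be sketched as follows. The function names and the fallback to the first Datanode when no preferred node is configured are assumptions for illustration, not Postgres-XL behavior taken from its documentation.

```python
def node_for_read(datanodes, preferred=None):
    """Any Datanode can serve a read of a replicated table; use the
    Coordinator's preferred Datanode when one is configured."""
    if preferred in datanodes:
        return preferred
    return datanodes[0]  # assumed fallback when nothing is preferred


def write_order(datanodes, primary):
    """Every Coordinator starts a write at the primary Datanode, so
    concurrent writers meet on the same node first."""
    if primary not in datanodes:
        raise ValueError("primary Datanode is not part of the cluster")
    return [primary] + [node for node in datanodes if node != primary]


datanodes = ["dn1", "dn2", "dn3"]
read_node = node_for_read(datanodes, preferred="dn2")
order = write_order(datanodes, primary="dn3")
```

  • The preferred Datanode may differ per Coordinator, since it is a local setting, while the primary Datanode has to be the same on all Coordinators for the conflict avoidance to work.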

Partitioning

<Questions>

  • Q1: What's the relationship between the table distribution across nodes and partitioning?

[MySQL Cluster]

  • The only types of user-defined partitioning supported are KEY and LINEAR KEY.
  • The maximum number of partitions when employing user-defined partitioning is 8 per node group.

[Greenplum]

  • Greenplum Database divides tables into parts (also known as partitions) to enable massively parallel processing. Tables are partitioned during CREATE TABLE using the PARTITION BY (and optionally the SUBPARTITION BY) clause.
  • Greenplum Database supports:
    • range partitioning
    • list partitioning
    • a combination of both types
  • Partitioning does not change the physical distribution of table data across the segments.
    • Table distribution is physical: Greenplum Database physically divides partitioned tables and non-partitioned tables across segments to enable parallel query processing.
    • Table partitioning is logical: Greenplum Database logically divides big tables to improve query performance and facilitate data warehouse maintenance tasks, such as rolling old data out of the data warehouse.
  • Does not support partitioning replicated tables (DISTRIBUTED REPLICATED).
  • A primary key or unique constraint on a partitioned table must contain all the partitioning columns.
  • A unique index can omit the partitioning columns; however, it is enforced only on the parts of the partitioned table, not on the partitioned table as a whole.

Tablespace

<Questions>

  • Q1: How do we show the tablespace feature to users?

[Greenplum]

  • The file system location must exist on all hosts including the hosts running the master, standby master, each primary segment, and each mirror segment.

[Postgres-XL]

  • Assigns the same path to a tablespace for all the Coordinators and Datanodes. So when creating a tablespace, the user needs to have permission to the same location path on all the servers involved in the cluster.

Data co-location

  • Enable users to co-locate related data that are accessed together on the same node for performance, and move them together to another node.

<Questions>

  • Q1: How do users specify data co-location?

[Oracle]

  • Provides a feature called table family.

[Spanner]

  • Cloud Spanner's table interleaving is a good choice for many parent-child relationships where the child table's primary key includes the parent table's primary key columns. The co-location of child rows with their parent rows can significantly improve performance.
    • For example, if you have a Customers table and an Invoices table, and your application frequently fetches all the invoices for a given customer, you can define Invoices as a child table of Customers. In doing so, you're declaring a data locality relationship between two logically independent tables: you're telling Cloud Spanner to physically store one or more rows of Invoices with one Customers row.
    • You should choose either to represent parent-child relationships as interleaved tables or foreign keys, but not both.
    • Cloud Spanner stores rows in sorted order by primary key values, with child rows inserted between parent rows that share the same primary key prefix. This insertion of child rows between parent rows along the primary key dimension is called interleaving, and child tables are also called interleaved tables.
    • If your application frequently needs to retrieve information about all the albums for a given singer, then you should create Albums as a child table of Singers, which co-locates rows from the two tables along the primary key dimension.
    • Interleaved rows are ordered first by rows of the parent table, then by contiguous rows of the child table that share the parent's primary key, i.e. "Singers(1)", then "Albums(1, 1)", then "Albums(1, 2)", and so on.

CREATE TABLE Singers (
  SingerId   INT64 NOT NULL,
  FirstName  STRING(1024),
  LastName   STRING(1024),
  SingerInfo BYTES(MAX),
) PRIMARY KEY (SingerId);
CREATE TABLE Albums (
  SingerId     INT64 NOT NULL,
  AlbumId      INT64 NOT NULL,
  AlbumTitle   STRING(MAX),
) PRIMARY KEY (SingerId, AlbumId),
  INTERLEAVE IN PARENT Singers ON DELETE CASCADE;
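
  • The interleaved ordering can be reproduced with a plain sort on primary key tuples, because a parent key is a prefix of its children's keys and therefore sorts just before them. This is only a model of the row order described above, with made-up key values; it says nothing about how Cloud Spanner stores the rows physically.

```python
# Primary keys: Singers(SingerId) and Albums(SingerId, AlbumId).
singers = [(1,), (2,)]
albums = [(2, 1), (1, 2), (1, 1), (2, 3), (2, 2)]

rows = [("Singers", key) for key in singers]
rows += [("Albums", key) for key in albums]

# Sorting by key alone interleaves the tables: a tuple that is a prefix of
# another tuple compares as smaller, so each parent precedes its children.
interleaved = sorted(rows, key=lambda row: row[1])

labels = [
    f"{table}({', '.join(str(part) for part in key)})"
    for table, key in interleaved
]
```

  • The first entries of labels are Singers(1), Albums(1, 1), Albums(1, 2), and then Singers(2), matching the order given above.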

[CockroachDB]

  • When tables are interleaved, data written to one table (known as the child) is inserted directly into another (known as the parent) in the key-value store. This is accomplished by matching the child table's Primary Key to the parent's.
    • For interleaved tables to have Primary Keys that can be matched, the child table must use the parent table's entire Primary Key as a prefix of its own Primary Key -- these matching columns are referred to as the interleave prefix. It's easiest to think of these columns as representing the same data, which is usually implemented with Foreign Keys.
    • To formally enforce the relationship between each table's interleave prefix columns, we recommend using Foreign Key constraints.

CREATE TABLE orders (
    customer INT,
    id INT,
    total DECIMAL(20, 5),
    PRIMARY KEY (customer, id),
    CONSTRAINT fk_customer FOREIGN KEY (customer) REFERENCES customers
  ) INTERLEAVE IN PARENT customers (customer);

[YugabyteDB]

  • Colocating tables puts all of their data into a single tablet, called the colocation tablet. This can dramatically increase the number of relations (tables, indexes, etc) that can be supported per node while keeping the number of tablets per node low.

[Citus]

  • Co-location is the practice of dividing data tactically, keeping related information on the same machines to enable efficient relational operations, while taking advantage of the horizontal scalability for the whole dataset.
    • The principle of data co-location is that all tables in the database have a common distribution column and are sharded across machines in the same way, such that rows with the same distribution column value are always on the same machine, even across different tables. As long as the distribution column provides a meaningful grouping of data, relational operations can be performed within the groups.
    • Tables are co-located in groups. To manually control a table’s co-location group assignment use the optional colocate_with parameter of create_distributed_table.
    • If you don’t care about a table’s co-location then omit this parameter. It defaults to the value 'default', which groups the table with any other default co-location table having the same distribution column type, shard count, and replication factor. If you want to break or update this implicit colocation, you can use update_distributed_table_colocation().

-- these tables are implicitly co-located by using the same
-- distribution column type and shard count with the default co-location group
SELECT create_distributed_table('A', 'some_int_col');
SELECT create_distributed_table('B', 'other_int_col');

    • When a new table is not related to others in its would-be implicit co-location group, specify colocate_with => 'none'.
    • Splitting unrelated tables into their own co-location groups will improve shard rebalancing performance, because shards in the same group have to be moved together.

-- not co-located with other tables
SELECT create_distributed_table('A', 'foo', colocate_with => 'none');

    • When tables are indeed related (for instance when they will be joined), it can make sense to explicitly co-locate them. The gains of appropriate co-location are more important than any rebalancing overhead.
    • To explicitly co-locate multiple tables, distribute one and then put the others into its co-location group. For example:

-- distribute stores
SELECT create_distributed_table('stores', 'store_id');
-- add to the same group as stores
SELECT create_distributed_table('orders', 'store_id', colocate_with => 'stores');
SELECT create_distributed_table('products', 'store_id', colocate_with => 'stores');

  • To ensure co-location, shards with the same hash range are always placed on the same node even after rebalance operations, such that equal distribution column values are always on the same node across tables.
  • INSERT SELECT
  • Supports the INSERT / SELECT syntax for copying the results of a query on a distributed table into a distributed table, when the tables are co-located.
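
The co-location rule above (equal distribution column values end up on the same node across tables) can be sketched as follows. This is a simplified model, not Citus code: `zlib.crc32` stands in for the real hash function, and the shard count, node names, and helper names are assumptions made for the example.

```python
import zlib

SHARD_COUNT = 4
NODES = ["worker-1", "worker-2"]
HASH_SPACE = 2 ** 32  # zlib.crc32 returns an unsigned 32-bit value


def shard_index(value):
    """Map a distribution column value to a shard by hash range."""
    h = zlib.crc32(str(value).encode("utf-8"))
    return h * SHARD_COUNT // HASH_SPACE


def node_for_shard(index):
    """Shards covering the same hash range share a node, for every table."""
    return NODES[index % len(NODES)]


def place(rows, dist_col):
    """Group rows (dicts) by the node that holds their shard."""
    placement = {node: [] for node in NODES}
    for row in rows:
        placement[node_for_shard(shard_index(row[dist_col]))].append(row)
    return placement


if __name__ == "__main__":
    stores = [{"store_id": i} for i in range(5)]
    orders = [{"store_id": i % 5, "order_id": i} for i in range(10)]
    print(place(stores, "store_id"))
    print(place(orders, "store_id"))
```

Since both tables use the same hash ranges and the same shard-to-node mapping, a join on `store_id` never needs rows from another node.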

Query planning, execution and tuning

Want to realize:

  • parallel query across nodes
  • shard-wise join where co-located shards of tables are joined on each remote node and the join results are sent back to the local node
  • optimal join execution by exchanging data directly between remote nodes, without the local node relaying the data
  • take the inter-node data transfer into the cost when planning queries

<Questions>

  • Q1: Where are the database statistics stored?
  • Q2: Should the whole query plan be made on the local node and sent to the remote nodes, or should the query fragment texts be sent and their query plans made on the remote nodes?
  • Q3: How do we represent the inter-node traffic cost?
  • Q4: What information does EXPLAIN present for distributed queries?

[Oracle]

  • Multi-shard queries and queries that do not specify a sharding key are routed through the multi-shard query coordinator which acts as a proxy for the application requesting the data. The multi-shard query coordinator (or, "the coordinator") runs on the shard catalog or its replicas.
  • Parallel DML is not supported by multi-shard DML. The DML will always run on one shard at a time (serially).
  • The MERGE statement is partially supported by Oracle Sharding; that is, a MERGE statement affecting only a single shard is supported. An ORA error is raised if a MERGE statement requires the modification of multiple shards.
  • Multi-shard DML does not support triggers.
  • The coordinator’s SQL compiler analyzes and rewrites the query into query fragments that are sent and executed by the participating shards. The queries are rewritten so that most of the query processing is done on the participating shards and then aggregated by the coordinator.
    • Database links are used for the communication between the coordinator and the shards.
    • At a high level, the coordinator rewrites each incoming query, Q, into a distributive form composed of two queries, CQ and SQ, where SQ (Shard Query) is the part of Q that executes on each participating shard, and CQ (Coordinator Query) is the part of Q that executes on the coordinator shard.
    • Q => CQ ( Shard_Iterator( SQ ) )
    • Each shard produces an independent execution plan which is optimized for the data size and compute resources available on the shard.
  • In the case where only one participating shard was identified, the full query is routed to that shard for execution. This is called a single-shard query.
  • Single-shard query supports SELECT, UPDATE, DELETE, INSERT, FOR UPDATE, and MERGE. UPSERT is not supported.
  • User-defined PL/SQL is allowed in multi-shard queries only in the SELECT clause. If it is specified in the WHERE clause then an error is thrown.
  • Queries involving only duplicated tables are run on the coordinator.
  • You do not need to connect to individual shards to see the explain plan for SQL fragments. Interfaces provided in dbms_xplan.display_cursor() display on the coordinator the plans for the SQL segments executed on the shards, and [V/X]$SHARD_SQL uniquely maps a shard SQL fragment of a multi-shard query to the target shard database.
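
The rewrite Q => CQ ( Shard_Iterator( SQ ) ) can be illustrated with an average over a sharded column. This is only a sketch of the decomposition idea, not Oracle's implementation; the function names are invented, and each shard is modeled as a plain list of numbers.

```python
def shard_query(rows):
    """SQ: runs on one shard and returns a partial aggregate (sum, count)."""
    return sum(rows), len(rows)


def shard_iterator(shards, sq):
    """Shard_Iterator: apply SQ to every participating shard."""
    for rows in shards:
        yield sq(rows)


def coordinator_query(partials):
    """CQ: combine the per-shard partial aggregates into the final average."""
    total = count = 0
    for shard_sum, shard_count in partials:
        total += shard_sum
        count += shard_count
    return total / count if count else None


def run_avg(shards):
    # Q => CQ( Shard_Iterator( SQ ) )
    return coordinator_query(shard_iterator(shards, shard_query))


if __name__ == "__main__":
    print(run_avg([[10, 20], [30], [40, 50, 60]]))
```

The shard query returns (sum, count) rather than a per-shard average, because averaging the per-shard averages would give a wrong result when shards hold different numbers of rows.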

[Spanner] Locality aware cost-based optimizer

  • Splits can move independently from each other and get assigned to different servers, which could be in different physical locations. To evaluate execution plans over the distributed data, Cloud Spanner uses execution based on:
    • local execution of subplans in servers that contain the data
    • orchestration and aggregation of multiple remote executions with aggressive distribution pruning
  • Cloud Spanner uses the primitive operator distributed union, along with its variants distributed cross apply and distributed outer apply, to enable this model.
  • A SQL query in Cloud Spanner is first compiled into an execution plan, then it is sent to an initial root server for execution. The root server is chosen so as to minimize the number of hops to reach the data being queried. The root server then:
    1. initiates remote execution of subplans (if necessary)
    2. waits for results from the remote executions
    3. handles any remaining local execution steps such as aggregating results
    4. returns results for the query
  • Remote servers that receive a subplan act as a "root" server for their subplan, following the same model as the topmost root server. The result is a tree of remote executions.
  • You can retrieve query execution plans through the Cloud Console or the client libraries.
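
The tree of remote executions can be sketched with a recursive structure in which every server acts as the root of its own subplan. This is a toy model under stated assumptions: the `Server` class and its fields are hypothetical, the only "query" is a row count, and remote calls run sequentially here rather than in parallel.

```python
from dataclasses import dataclass, field


@dataclass
class Server:
    name: str
    local_rows: list = field(default_factory=list)
    remotes: list = field(default_factory=list)  # servers holding other splits

    def execute_count(self):
        """Act as root for this subplan: fan out, collect, then aggregate."""
        # Steps 1-2: initiate remote execution of subplans and collect results.
        remote_results = [remote.execute_count() for remote in self.remotes]
        # Step 3: remaining local execution, here adding up the counts.
        # Step 4: return the result to the caller (client or parent server).
        return len(self.local_rows) + sum(remote_results)


if __name__ == "__main__":
    leaf = Server("c", local_rows=[9])
    middle = Server("b", local_rows=[3, 4, 5], remotes=[leaf])
    root = Server("root", remotes=[Server("a", local_rows=[1, 2]), middle])
    print(root.execute_count())
```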

[CockroachDB]

  • By default, CockroachDB generates table statistics automatically when tables are created and as they are updated. It does this using a background job that automatically gets statistics on
    • all indexed columns
    • up to 100 non-indexed columns
  • Schema changes trigger automatic statistics collection for the affected table(s).
  • Given multiple identical indexes that have different locality constraints using replication zones, the optimizer will prefer the index that is closest to the gateway node that is planning the query.
  • EXPLAIN shows the involved nodes where the query is executed. For example output, see EXPLAIN.

[Greenplum]

  • The statistics used by the optimizer are calculated and saved in the system catalog on the master host.
  • The master optimizes the query. The resulting query plan is either parallel or targeted.
    • The master dispatches parallel query plans to all segments.
    • The master dispatches targeted query plans to a single segment.
  • Most database operations, such as table scans, joins, aggregations, and sorts, execute across all segments in parallel.
  • A motion operation involves moving tuples between the segments during query processing. For example, a join between tables that are distributed by different columns performs a motion operation.
  • Related processes that are working on the same slice of the query plan but on different segments are called gangs. As a portion of work is completed, tuples flow up the query plan from one gang of processes to the next. This inter-process communication between the segments is referred to as the interconnect component.
  • For examples of EXPLAIN output, see Query Profiling.

[Citus]

  • Parallelizes incoming queries by breaking them into multiple fragment queries which run on the worker shards in parallel.
  • Employs a two-stage optimizer when planning SQL queries.
    1. The first phase involves converting the SQL queries into their commutative and associative form so that they can be pushed down and run on the workers in parallel.
  • Co-located joins: When two tables are co-located then they can be joined efficiently on their common distribution columns. A co-located join is the most efficient way to join two large distributed tables.
  • Repartition joins: Allow joining on non-distribution key columns by dynamically repartitioning the tables for the query. In such cases, the table(s) to be partitioned are determined by the query optimizer on the basis of the distribution columns, join keys, and sizes of the tables. With repartitioned tables, only relevant shard pairs are joined with each other, which drastically reduces the amount of data transferred across the network.
  • Provides a distributed EXPLAIN feature that performs the same function in the context of a distributed database.
    • SET citus.explain_all_tasks = 1;
    • This will cause EXPLAIN to show the query plan for all tasks, not just one.
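
The idea behind a repartition join can be sketched in a few lines: both inputs are split by a hash of the join key, and only partitions with the same number are joined. This is a single-process illustration, not Citus's executor; the function names and the bucket count are assumptions.

```python
from collections import defaultdict


def repartition(rows, key, buckets):
    """Split rows (dicts) into partitions by hashing the join key."""
    parts = defaultdict(list)
    for row in rows:
        parts[hash(row[key]) % buckets].append(row)
    return parts


def repartition_join(left, right, key, buckets=4):
    """Join two row lists on `key`, pairing only same-numbered partitions."""
    left_parts = repartition(left, key, buckets)
    right_parts = repartition(right, key, buckets)
    result = []
    for b in range(buckets):
        # Build a hash table on the left partition, then probe with the right.
        index = defaultdict(list)
        for lrow in left_parts.get(b, []):
            index[lrow[key]].append(lrow)
        for rrow in right_parts.get(b, []):
            for lrow in index.get(rrow[key], []):
                result.append({**lrow, **rrow})
    return result


if __name__ == "__main__":
    users = [{"k": 1, "name": "x"}, {"k": 3, "name": "z"}]
    events = [{"k": 1, "ev": 10}, {"k": 1, "ev": 11}, {"k": 4, "ev": 40}]
    print(repartition_join(users, events, "k"))
```

In a distributed setting each partition pair could be joined on a different node, so a row only has to travel to the one node that owns its partition.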

[Postgres-XL]

  • A plan is generated once on a coordinator and sent down to the individual data nodes. The plan is then executed with the data nodes communicating directly with one another; each node knows which nodes it should receive tuples from and which nodes it should send tuples to.
  • Pulls up statistics from the Datanodes and stores them in each Coordinator's catalog tables to help make query planning decisions.
  • Traditional PostgreSQL costing applies, but Postgres-XL also tries to take into consideration the cost of shipping rows around the network for internode joins, which is an expensive operation.
  • Stores table data in a distributed or replicated fashion. To leverage this, the planner tries to find the best way to use as much of a Datanode's power as possible.
    • If the equi-join is done by distribution columns and they share the distribution method (hash/modulo), the Coordinator can tell the Datanode to perform a join.
    • If not, it may tell the Datanodes to ship rows to other Datanodes and expect rows from other Datanodes to be shipped to it.
  • EXPLAIN provides the cluster-wide execution plan that will be executed, and provides insight into the inner workings of how queries are processed in parallel.
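
One possible way to fold inter-node transfer into plan costing is to add a per-byte network term to the ordinary local cost. The sketch below is purely hypothetical: the `PlanCost` fields, the constant `NETWORK_COST_PER_BYTE`, and the sample numbers are all assumptions, and none of them come from Postgres-XL or any other system listed here.

```python
from dataclasses import dataclass


@dataclass
class PlanCost:
    local_cost: float  # ordinary planner cost of the work done on the nodes
    rows_shipped: int  # rows moved between nodes
    row_width: int     # average row width in bytes


# Hypothetical tuning knob: cost charged per byte sent over the network.
NETWORK_COST_PER_BYTE = 0.001


def total_cost(plan):
    """Local cost plus a network term proportional to the bytes shipped."""
    return plan.local_cost + plan.rows_shipped * plan.row_width * NETWORK_COST_PER_BYTE


def cheapest(plans):
    """Return the name of the plan with the lowest total cost."""
    return min(plans, key=lambda name: total_cost(plans[name]))


if __name__ == "__main__":
    plans = {
        "co-located join": PlanCost(local_cost=1200.0, rows_shipped=0, row_width=0),
        "ship rows": PlanCost(local_cost=1000.0, rows_shipped=1_000_000, row_width=100),
    }
    print(cheapest(plans))
```

With a term like this, a plan that is slightly more expensive locally can still win when it avoids shipping many rows between nodes.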


Availability

Goal

  • Consider both application availability and data availability.
  • Maximize fault isolation: failure of one node should only affect applications that access it.
  • Model the cluster configuration to fit the hierarchical region and availability zone (AZ) structure common in public clouds, and provide HA features to recover from a region or AZ failure.

<Questions>

  • Q1: What do we say about availability (say, 99.99% or 99.999% uptime)?

[MySQL Cluster]

  • 99.999%

[Spanner]

  • Multi-region configuration offers 99.999% availability, which is greater than the 99.99% availability that Cloud Spanner regional configurations provide.
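
To make the availability figures concrete, the following sketch converts an availability percentage into permitted downtime per year. It assumes a 365-day year, and the function name is made up for the example.

```python
MINUTES_PER_YEAR = 365 * 24 * 60  # 525,600 minutes, ignoring leap years


def downtime_minutes_per_year(availability_percent):
    """Minutes of downtime per year allowed by the given availability."""
    return (100 - availability_percent) / 100 * MINUTES_PER_YEAR


if __name__ == "__main__":
    for pct in (99.99, 99.999):
        print(f"{pct}% -> {downtime_minutes_per_year(pct):.2f} minutes/year")
```

Under this assumption, 99.99% allows roughly 52.6 minutes of downtime per year and 99.999% allows roughly 5.3 minutes.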

Recovery from failures

<Questions>

  • Q1: Do we require or recommend clustering software like Pacemaker for fencing, STONITH, and failover coordination, or some group coordination software like JGroups, ZooKeeper, and etcd?
  • Q2: Which other nodes does each node monitor, and how? (heartbeat, gossip, etc.)
  • Q3: How do we distinguish among a down node, a temporarily stopped node for planned maintenance, and a decommissioned node?
  • Q4: How do we ensure data availability within each node group?
    • Do we define a node group in which all member nodes have the same data set for redundancy?
    • Option 1: Rely on storage hardware or software to provide redundancy
    • Option 2: DBMS provides master-slave replication (physical or logical)
    • Option 3: DBMS provides a multi-master topology like MySQL Group Replication
  • Q5: What's the unit of replication?
    • Option 1: Shard (most granular; shards of a failed node can be distributed to multiple nodes, avoiding overload on one node)
    • Option 2: Tablespace
    • Option 3: Node (most coarse; the node that takes over can be overloaded)
  • Q6: What typical number of replicas do we assume? (recently 3, Amazon Aurora = 6)
  • Q7: How do we elect a primary/leader node within a node group when the current primary/leader node fails?
  • Q8: How does the failure of each node appear to applications?
    • Can we mask node outages from applications as much as possible?
    • How do we notify applications of node failures? (e.g., SQLSTATE that indicates the transaction is retryable)
    • Can we enable applications to escape from TCP timeouts?
    • How can we continue a session after the remote node fails over? (session variables, transactions, cursors)
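
As one illustration of Q8, an application could retry a transaction when the error carries a SQLSTATE it knows to be retryable. The sketch below is hypothetical: `DatabaseError` stands in for a driver exception, and which SQLSTATE a scale-out PostgreSQL would report after a node failure is exactly the open question above. The two codes used, 40001 (serialization_failure) and 40P01 (deadlock_detected), are existing PostgreSQL codes for transactions that were rolled back.

```python
import time


class DatabaseError(Exception):
    """Hypothetical driver exception carrying the SQLSTATE of a failure."""

    def __init__(self, sqlstate):
        super().__init__(sqlstate)
        self.sqlstate = sqlstate


# SQLSTATEs the application treats as safe to retry (an assumption).
RETRYABLE_SQLSTATES = {"40001", "40P01"}


def run_with_retry(txn, max_attempts=3, backoff_seconds=0.0):
    """Call txn() until it succeeds, retrying only retryable failures."""
    for attempt in range(1, max_attempts + 1):
        try:
            return txn()
        except DatabaseError as exc:
            # Give up on non-retryable errors or when attempts are exhausted.
            if exc.sqlstate not in RETRYABLE_SQLSTATES or attempt == max_attempts:
                raise
            time.sleep(backoff_seconds * attempt)
```

A caller would wrap the whole transaction body in `txn`, so that a retry re-executes every statement rather than only the one that failed.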

[Oracle]

  • Using Oracle Data Guard for shard catalog high availability is a recommended best practice. The availability of the shard catalog has no impact on the availability of the sharded database. An outage of the shard catalog only affects the ability to perform maintenance operations or multi-shard queries during the brief period required to complete an automatic failover to a standby shard catalog. Transactions continue to be routed and executed by the sharded database and are unaffected by a catalog outage.
  • If a participating shard is down, then the coordinator sends the query to another shard with the same data.
  • If failure happens during execution of the query on a participating shard, then the user will receive an error.
  • Data availability
    • For a user-defined sharded database, two replication schemes are supported: Oracle Data Guard or Oracle Active Data Guard. User-defined sharding is not supported where Oracle GoldenGate is used as the replication method.
    • Oracle Data Guard replication maintains one or more synchronized copies (standbys) of a shard (the primary) for high availability and data protection. Standbys may be deployed locally or remotely, and when using Oracle Active Data Guard can also be open for read-only access.
    • In system-managed and composite sharding, the logical unit of replication is a group of shards called a shardgroup.
    • In system-managed sharding, a shardgroup contains all of the data stored in the sharded database. The data is sharded by consistent hash across shards that make up the shardgroup. Shards that belong to a shardgroup are usually located in the same data center. An entire shardgroup can be fully replicated to one or more shardgroups in the same or different data centers.
    • With user-defined sharding the logical (and physical) unit of replication is a shard. Shards are not combined into shardgroups. Each shard and its replicas make up a shardspace which corresponds to a single Data Guard Broker configuration. Replication can be configured individually for each shardspace. Shardspaces can have different numbers of standbys which can be located in different data centers.
    • In Oracle GoldenGate, replication is handled at the chunk level. For example, half of the data stored in each shard is replicated to one shard, and the other half to another shard. If any shard becomes unavailable, its workload is split between two other shards in the shardgroup. The multiple failover destinations mitigate the impact of a shard failure because there is no single shard that has to handle all of the workload from the failed shard.
    • With Oracle GoldenGate replication, a shardgroup can contain multiple replicas of each row in a sharded table; therefore, high availability is provided within a shardgroup, and there is no need to have a local replica of the shardgroup, as there is in the case of Data Guard replication. The number of times each row is replicated within a shardgroup is called its replication factor and is a configurable parameter.
    • To provide disaster recovery, a shardgroup can be replicated to one or more data centers. Each replica of a shardgroup can have a different number of shards, replication factor, database versions, and hardware platforms. However, all shardgroup replicas must have the same number of chunks, because replication is done at the chunk level.

[MySQL Cluster]

  • Self Healing
    • Failed nodes are able to self-heal by automatically restarting and resynchronizing with other nodes before re-joining the cluster, with complete application transparency.
    • Data node recovery requires synchronization of the failed node's data from a surviving data node, and re-establishment of disk-based redo and checkpoint logs, before the data node returns to service. This recovery can take some time, during which the Cluster operates with reduced redundancy.
  • Uses heartbeating and timeout mechanisms which treat an extended loss of communication from a node as node failure.
  • Failover time
    • Automatic Failover - MySQL Cluster's heartbeating mechanism instantly detects any failures and automatically fails over, typically within one second, to other nodes in the cluster, without interrupting service to clients.
  • Data availability
    • Replica. This is a copy of a cluster partition. Each node in a node group stores a replica.
    • The number of replicas is equal to the number of nodes per node group. Up to 4 replicas of data are fully supported.
    • Synchronous Replication. Data within each data node is synchronously replicated to another data node. All data nodes are kept in synchrony, and a transaction committed by any one data node is committed for all data nodes.
    • Node group. A node group consists of one or more nodes, and stores partitions, or sets of replicas. The number of node groups in an NDB Cluster is not directly configurable; it is a function of the number of data nodes and of the number of replicas:
      • [# of node groups] = [# of data nodes] / NoOfReplicas
    • All node groups in an NDB Cluster must have the same number of data nodes.
    • Node groups are formed implicitly. The first node group is formed by the set of data nodes with the lowest node IDs, the next node group by the set of the next lowest node identities, and so on. By way of example, assume that we have 4 data nodes and that NoOfReplicas is set to 2. The four data nodes have node IDs 2, 3, 4 and 5. Then the first node group is formed from nodes 2 and 3, and the second node group by nodes 4 and 5.
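
The implicit grouping rule can be written down as a short function. This is a sketch of the rule as described above, not NDB code; the function and parameter names are made up for the example.

```python
def form_node_groups(data_node_ids, no_of_replicas):
    """Group data nodes by ascending node ID, no_of_replicas nodes per group."""
    ids = sorted(data_node_ids)
    if no_of_replicas <= 0 or len(ids) % no_of_replicas != 0:
        raise ValueError("number of data nodes must be a multiple of NoOfReplicas")
    # [# of node groups] = [# of data nodes] / NoOfReplicas
    return [ids[i:i + no_of_replicas] for i in range(0, len(ids), no_of_replicas)]


if __name__ == "__main__":
    print(form_node_groups([2, 3, 4, 5], 2))
```

For the example in the text (node IDs 2, 3, 4, and 5 with NoOfReplicas set to 2), this yields the groups [2, 3] and [4, 5].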

[Spanner]

  • Provides transparent, synchronous replication across region and multi-region configurations.
  • Replication is used for global availability and geographic locality.
  • Data distribution: Cloud Spanner automatically replicates your data between regions with strong consistency guarantees. This allows your data to be stored where it’s used, which can reduce latency and improve the user experience.
  • External consistency: Even though Cloud Spanner replicates across geographically distant locations, you can still use Cloud Spanner as if it were a database running on a single machine. Transactions are guaranteed to be serializable, and the order of transactions within the database is the same as the order in which clients observe the transactions to have been committed. External consistency is a stronger guarantee than "strong consistency," which is offered by some other products.
  • Data can also be dynamically and transparently moved between datacenters by the system to balance resource usage across datacenters.
  • Data availability
    • Even though the underlying distributed filesystem that Cloud Spanner is built on already provides byte-level replication, Cloud Spanner also replicates data to provide the additional benefits of data availability and geographic locality. At a high level, all data in Cloud Spanner is organized into rows. Cloud Spanner creates multiple copies, or "replicas," of these rows, then stores these replicas in different geographic areas. Cloud Spanner uses a synchronous, Paxos-based replication scheme, in which voting replicas take a vote on every write request before the write is committed. This property of globally synchronous replication gives you the ability to read the most up-to-date data from any Cloud Spanner read-write or read-only replica.
    • A split is a set of contiguous rows in a top-level table, where the rows are ordered by primary key. All of the data in a split is physically stored together in the replica, and Cloud Spanner serves each replica out of an independent failure zone.
    • A set of splits is stored and replicated using Paxos.
    • For each Paxos replica set, one replica is elected to act as the leader. Leader replicas are responsible for handling writes, while any read-write or read-only replica can serve a read request without communicating with the leader (though if a strong read is requested, the leader will typically be consulted to ensure that the read-only replica has received all recent mutations).
    • The leader replica logs the incoming write, and forwards it, in parallel, to the other replicas that are eligible to vote on that write. Each eligible replica completes its write, and then responds back to the leader with a vote on whether the write should be committed. The write is committed when a majority of voting replicas (or "write quorum") agree to commit the write. In the background, all remaining (non-witness) replicas log the write. If a read-write or read-only replica falls behind on logging writes, it can request the missing data from another replica to have a full, up-to-date copy of the data.
    • Data rows are partitioned into clusters called directories using ancestry relationships in the schema. Each directory has at least one fragment, and large directories can have multiple fragments. Groups store a collection of directory fragments. Each group typically has one replica tablet per datacenter.
    • Data is replicated synchronously using the Paxos algorithm, and all tablets for a group store the same data. One replica tablet is elected as the Paxos leader for the group, and that leader is the entry point for all transactional activity for the group.
    • Groups may also include read-only replicas, which do not vote in the Paxos algorithm and cannot become the group leader.
    • The replication configurations for data can be dynamically controlled at a fine grain by applications. Applications can specify constraints to control:
      • which datacenters contain which data
      • how far data is from its users (to control read latency)
      • how far replicas are from each other (to control write latency)
      • how many replicas are maintained (to control durability, availability, and read performance)
    • For any regional configuration, Cloud Spanner maintains 3 read-write replicas, each within a different Google Cloud zone in that region. Each read-write replica contains a full copy of your operational database that is able to serve read-write and read-only requests. Cloud Spanner uses replicas in different zones so that if a single-zone failure occurs, your database remains available.
    • Consistent replication to the different copies of the split is managed by the Paxos algorithm. In Paxos, as long as a majority of the voting replicas for the split are up, one of those replicas can be elected leader to process writes and allow other replicas to serve reads.
    • In order to guarantee the durability of writes, Spanner transactions commit by writing mutations to at least a majority of the replicas of the affected splits. And the machines hosting those splits write these mutations durably in a distributed filesystem. Spanner is a "shared nothing" architecture (which provides high scalability), but because any server in a cluster can read from this distributed filesystem, we can recover quickly from whole-machine failures.
    • The distribution of splits among the nodes may be different in each zone, and the Paxos leaders do not all reside in the same zone. This flexibility helps Spanner to be more robust to certain kinds of load profiles and failure modes.
  • Cloud Spanner has three types of replicas: read-write replicas, read-only replicas, and witness replicas. Single-region instances use only read-write replicas, while multi-region instance configurations use a combination of all three types.
  • Witness replicas don’t support reads but do participate in voting to commit writes. These replicas make it easier to achieve quorums for writes without the storage and compute resources that are required by read-write replicas to store a full copy of data and serve reads.
  • Witness replicas:
    • Are only used in multi-region instances.
    • Do not maintain a full copy of data.
    • Do not serve reads.
    • Vote whether to commit writes.
    • Participate in leader election but are not eligible to become leader.
  • Reads
    • Client read requests might be executed at or require communicating with the leader replica, depending on the concurrency mode of the read request.
    • Reads that are part of a read-write transaction are served from the leader replica, because the leader replica maintains the locks required to enforce serializability.
    • Single read methods (a read outside the context of a transaction) and reads in read-only transactions might require communicating with the leader, depending on the concurrency mode of the read.
      • Strong read requests can go to any read-write or read-only replica. If the request goes to a non-leader replica, that replica must communicate with the leader in order to execute the read.
      • Stale read requests go to the closest available read-only or read-write replica that is caught up to the timestamp of the request. This can be the leader replica if the leader is the closest replica to the client that issued the read request.
  • Writes
    • Cloud Spanner needs a majority of voting replicas to agree on a commit in order to commit a mutation. In other words, every write to a Cloud Spanner database requires communication between voting replicas.
    • To minimize the latency of this communication, it is desirable to use the fewest number of voting replicas, and to place these replicas as close together as possible. That's why regional configurations contain exactly three read-write replicas, each of which contains a full copy of your data and is able to vote: if one replica fails, the other two can still form a write quorum, and because replicas in a regional configuration are within the same data center, network latencies are minimal.
  • Multi-region configurations contain more replicas by design, and these replicas are in different datacenters (so that clients can read their data quickly from more locations).
    • What characteristics should these additional replicas have? They could all be read-write replicas, but that would be undesirable because adding more read-write replicas to a configuration increases the size of the write quorum (which means potentially higher network latencies due to more replicas communicating with each other, especially if the replicas are in geographically distributed locations) and also increases the amount of storage needed (because read-write replicas contain a full copy of data).
    • Instead of using more read-write replicas, multi-region configurations contain two additional types of replicas that have fewer responsibilities than read-write replicas.
      • Read-only replicas do not vote for leaders or for committing writes, so they allow you to scale your read capacity without increasing the quorum size needed for writes.
  • Geographical replication
    • Each multi-region configuration contains two regions that are designated as read-write regions, each of which contains two read-write replicas.
    • One of these read-write regions is designated as the default leader region, which means that it contains your database's leader replicas. A leader will be chosen amongst the replicas in the default leader region for each split.
    • Cloud Spanner also places a witness replica in a third region called a witness region.
    • Each time a client issues a mutation to your database, a write quorum forms, consisting of one of the replicas from the default leader region and any two of the additional four voting replicas. (The quorum could be formed by replicas from two or three of the regions that make up your configuration, depending on which other replicas participate in the vote.)
    • In addition to these 5 voting replicas, the configuration can also contain read-only replicas for serving low-latency reads. The regions that contain read-only replicas are called read-only regions.
    • In general, the voting regions in a multi-region configuration are placed geographically close—less than a thousand miles apart—to form a low-latency quorum that enables fast writes. However, the regions are still far enough apart—typically, at least a few hundred miles—to avoid coordinated failures.
    • In the event of a leader replica failure, the other replica in the default leader region automatically assumes leadership. In fact, leaders run health checks on themselves and can preemptively give up leadership if they detect they are unhealthy. Under normal conditions when replicas in the default leader region are available, the default leader region contains the leaders and therefore is where writes are first processed.
    • The second read-write region contains the additional replicas that are eligible to be leaders. In the unlikely event of the loss of all replicas in the default leader region, new leader replicas are chosen from the second read-write region. Read and write workloads are fastest from the second read-write region.
    • A witness region contains a witness replica, which is used to vote on writes. Witnesses become important in the rare event that the read-write regions become unavailable. You can think of the witness region as a system-configured region for achieving write quorums.
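
The write quorum arithmetic for the multi-region layout described above (two read-write regions with two replicas each, plus one witness) can be checked with a short sketch. The region and replica names are illustrative, and the code only counts votes; it does not model Paxos itself.

```python
from itertools import combinations


def write_quorum_size(voting_replicas):
    """Smallest majority of the voting replicas."""
    return voting_replicas // 2 + 1


# Voting replicas per region (names are made up for the example).
VOTERS = {
    "leader-region": ["rw1", "rw2"],
    "second-rw-region": ["rw3", "rw4"],
    "witness-region": ["w1"],
}


def quorums_with_leader(leader="rw1"):
    """All minimal write quorums: the leader plus enough other voters."""
    others = [r for reps in VOTERS.values() for r in reps if r != leader]
    need = write_quorum_size(len(others) + 1) - 1
    return [(leader,) + combo for combo in combinations(others, need)]


def survives_region_loss(region):
    """True if the voters outside `region` can still form a majority."""
    total = sum(len(reps) for reps in VOTERS.values())
    remaining = total - len(VOTERS[region])
    return remaining >= write_quorum_size(total)


if __name__ == "__main__":
    print(write_quorum_size(5), len(quorums_with_leader()))
    print({region: survives_region_loss(region) for region in VOTERS})
```

With five voters the quorum size is three, so losing any single region, including the default leader region with its two replicas, still leaves enough voters to commit writes once a new leader is chosen.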

[CockroachDB]

  • Data availability
    • Multi-active availability is CockroachDB's version of high availability (keeping your application online in the face of partial failures), which we've designed to avoid the downsides of both active-passive and traditional active-active systems. Like active-active designs, all replicas can handle traffic, including both reads and writes.
    • However, CockroachDB improves upon that design by also ensuring that data remains consistent across them, which we achieve by using "consensus replication." In this design, replication requests are sent to at least 3 replicas, and are only considered committed when a majority of replicas acknowledge that they've received it. This means that you can still have failures without compromising availability.
    • To prevent conflicts and guarantee your data's consistency, clusters that lose a majority of replicas stop responding because they've lost the ability to reach a consensus on the state of your data. When a majority of replicas are restarted, your database resumes operation.
    • CockroachDB replicates your data for availability and guarantees consistency between replicas using the Raft consensus algorithm, a popular alternative to Paxos.
    • You can define the location of replicas in various ways, depending on the types of failures you want to secure against and your network topology. You can locate replicas on:
      • Different servers within a rack to tolerate server failures
      • Different servers on different racks within a datacenter to tolerate rack power/network failures
      • Different servers in different datacenters to tolerate large scale network or power outages
    • Every acknowledged write has been persisted consistently on a majority of replicas (by default, at least 2) via the Raft consensus algorithm. Power or disk failures that affect only a minority of replicas (typically 1) do not prevent the cluster from operating and do not lose any data.
  • Automated Repair
    • For short-term failures, such as a server restart, CockroachDB uses Raft to continue seamlessly as long as a majority of replicas remain available. Raft makes sure that a new leader for each group of replicas is elected if the former leader fails, so that transactions can continue and affected replicas can rejoin their group once they’re back online.
    • For longer-term failures, such as a server/rack going down for an extended period of time or a datacenter outage, CockroachDB automatically rebalances replicas from the missing nodes, using the unaffected replicas as sources. Using capacity information from the gossip network, new locations in the cluster are identified and the missing replicas are re-replicated in a distributed fashion using all available nodes and the aggregate disk and network bandwidth of the cluster.
  • Reads
    • Unlike writes, read requests access the leaseholder and send the results to the client without needing to coordinate with any of the other range replicas. This reduces the network round trips involved and is possible because the leaseholder is guaranteed to be up-to-date due to the fact that all write requests also go to the leaseholder.
  • Writes
    • For each range, one of the replicas is the "leader" for write requests. Via the Raft consensus protocol, this replica ensures that a majority of replicas (the leader and enough followers) agree, based on their Raft logs, before committing the write. The Raft leader is almost always the same replica as the leaseholder.
    • When a range receives a write, a quorum of nodes containing replicas of the range acknowledge the write. This means your data is safely stored and a majority of nodes agree on the database's current state, even if some of the nodes are offline.
    • When a write doesn't achieve consensus, forward progress halts to maintain consistency within the cluster.
    • Synchronous replication requires all writes to propagate to a quorum of copies of the data before being considered committed.
  • Replication zones
    • give you the power to control what data goes where in your CockroachDB cluster. Specifically, they are used to control the number and location of replicas for data belonging to the following objects:
      • Databases
      • Tables
      • Rows
      • Indexes
      • All data in the cluster, including internal system data (via the default replication zone)
    • For each of the above objects you can control:
      • How many copies of each range to spread through the cluster.
      • Which constraints are applied to which data, e.g., "table X's data can only be stored in the German datacenters".
      • The maximum size of ranges (how big ranges get before they are split).
      • How long old data is kept before being garbage collected.
      • Where you would like the leaseholders for certain ranges to be located, e.g., "for ranges that are already constrained to have at least one replica in region=us-west, also try to put their leaseholders in region=us-west".
    • Every range in the cluster is part of a replication zone. Each range's zone configuration is taken into account as ranges are rebalanced across the cluster to ensure that any constraints are honored.
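
As a rough illustration of the majority rule described under "Data availability" and "Writes" above, the following sketch computes the quorum size and the number of replica failures that can be tolerated for a given replication factor. The function names are hypothetical; this is plain arithmetic, not CockroachDB code.

```python
def quorum_size(replicas: int) -> int:
    """Smallest number of replicas that forms a majority."""
    return replicas // 2 + 1


def tolerated_failures(replicas: int) -> int:
    """How many replicas can fail while a majority is still reachable."""
    return replicas - quorum_size(replicas)


def write_commits(replicas: int, acks: int) -> bool:
    """A write counts as committed once a majority has acknowledged it."""
    return acks >= quorum_size(replicas)


for n in (3, 5, 7):
    print(n, quorum_size(n), tolerated_failures(n))
```

With three replicas, a write needs two acknowledgements and one replica may be lost, which matches the "at least 2" and "typically 1" figures quoted above.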

[YugabyteDB]

  • Master failure
    • The YB-Master is not in the critical path of normal IO operations, so its failure will not affect a functioning universe.
    • Nevertheless, the YB-Master is a part of a Raft group with the peers running on different nodes. One of these peers is the active master and the others are active stand-bys. If the active master (the YB-Master leader) fails, these peers detect the leader failure and re-elect a new YB-Master leader which now becomes the active master within seconds of the failure.
  • Data availability
    • The data in a DocDB table is split into tablets. By default, each tablet is synchronously replicated using the Raft algorithm across various nodes or fault domains (such as availability zones/racks/regions/cloud providers).
    • DocDB automatically replicates data synchronously in order to survive failures while maintaining data consistency and avoiding operator intervention. It does so using the Raft distributed consensus protocol.
  • Read replicas
    • The in-cluster asynchronous replicas are called read replicas.
    • In addition to the core distributed consensus based replication, DocDB extends Raft to add read replicas (aka observer nodes) that do not participate in writes but get a timeline consistent copy of the data in an asynchronous manner.
    • Read Replicas are a read-only extension to the primary data in the cluster. With read replicas, the primary data of the cluster is deployed across multiple zones in one region, or across nearby regions.
    • Read replicas do not add to the write latencies since the write does not synchronously replicate data to them - the data gets replicated to read replicas asynchronously.
  • Replication between clusters
    • Every universe contains a primary data cluster, and one or more read replica clusters. Thus, each read replica cluster can independently have its own replication factor.
    • A primary cluster can perform both writes and reads. Replication between nodes in a primary cluster is performed synchronously.
    • Read replica clusters can perform only reads. Writes sent to read replica clusters get automatically rerouted to the primary cluster for the universe. These clusters help in powering reads in regions that are far away from the primary cluster with timeline-consistent data. This ensures low latency reads for geo-distributed applications.
    • Data is brought into the read replica clusters through asynchronous replication from the primary cluster. In other words, nodes in a read replica cluster act as Raft observers that do not participate in the write path involving the Raft leader and Raft followers present in the primary cluster.
    • This read-only node (or timeline-consistent node) is still strictly better than eventual consistency, because with the latter the application's view of the data can move back and forth in time and is hard to program to.
  • Failover time
    • The RTO (recovery time objective) is 3 seconds, which is the time window for completing the failover and becoming operational out of the new zones.
    • The failure of any tablet-peer leader automatically triggers a new Raft-level leader election within seconds, and another tablet-peer on a different YB-TServer takes its place as the new leader. The unavailability window is on the order of a couple of seconds (assuming the default heartbeat interval of 500 ms) in the event of a failure of the tablet-peer leader.

[Greenplum]

  • You may optionally deploy a backup or mirror of the master instance. A backup master host serves as a warm standby if the primary master host becomes nonoperational. You can deploy the standby master on a designated redundant master host or on one of the segment hosts.
  • The standby master is kept up to date by a transaction log replication process, which runs on the standby master host and synchronizes the data between the primary and standby master hosts. If the primary master fails, the log replication process shuts down, and an administrator can activate the standby master in its place.
  • How a Segment Failure is Detected and Managed
    • On the master host, the Postgres postmaster process forks a fault probe process, ftsprobe. This is also known as the FTS (Fault Tolerance Server) process. The postmaster process restarts the FTS if it fails.
    • The FTS runs in a loop with a sleep interval between each cycle. On each loop, the FTS probes each primary segment instance by making a TCP socket connection to the segment instance using the hostname and port registered in the gp_segment_configuration table.
    • If the connection succeeds, the segment performs a few simple checks and reports back to the FTS. The checks include executing a stat system call on critical segment directories and checking for internal faults in the segment instance. If no issues are detected, a positive reply is sent to the FTS and no action is taken for that segment instance.
    • If the connection cannot be made, or if a reply is not received in the timeout period, then a retry is attempted for the segment instance.
    • If the configured maximum number of probe attempts fail, the FTS probes the segment's mirror to ensure that it is up, and then updates the gp_segment_configuration table, marking the primary segment "down" and setting the mirror to act as the primary. The FTS updates the gp_configuration_history table with the operations performed.
  • When mirroring is enabled, the system automatically fails over to the mirror copy if a primary copy becomes unavailable.
  • A Greenplum Database system can remain operational if a segment instance or host goes down only if all portions of data are available on the remaining active segments.
  • When Greenplum Database detects a primary segment failure, the WAL replication process stops and the mirror segment automatically starts as the active primary segment.
  • If a mirror segment fails or becomes inaccessible while the primary is active, the primary segment tracks database changes in logs that are applied to the mirror when it is recovered.
  • If the master cannot connect to a segment instance, it marks that segment instance as invalid in the Greenplum Database system catalog. The segment instance remains invalid and out of operation until an administrator brings that segment back online. An administrator can recover a failed segment while the system is up and running. The recovery process copies over only the changes that were missed while the segment was nonoperational.
  • If you do not have mirroring enabled and a segment becomes invalid, the system automatically shuts down. An administrator must recover all failed segments before operations can continue.
  • Mirror segments can be arranged across the hosts in the system in one of two standard configurations, or in a custom configuration you design.
    • The default configuration, called group mirroring, places the mirror segments for all primary segments on a host on one other host.
    • Another option, called spread mirroring, spreads mirrors for each host's primary segments over the remaining hosts. Spread mirroring requires that there be more hosts in the system than there are primary segments on the host. On hosts with multiple network interfaces, the primary and mirror segments are distributed equally among the interfaces.
    • With block mirroring, nodes are divided into blocks, for example a block of four or eight hosts, and the mirrors for segments on each host are placed on other hosts within the block. Depending on the number of hosts in the block and the number of primary segments per host, each host maintains more than one mirror for each other host's segments.
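
The difference between group mirroring and spread mirroring can be sketched as follows. The ring ordering of hosts and the function names are assumptions made for illustration; the actual placement chosen by Greenplum utilities may differ.

```python
def group_mirroring(hosts, segs_per_host):
    """All mirrors for one host's primaries go to one other host (the next in ring order)."""
    n = len(hosts)
    return {(hosts[i], s): hosts[(i + 1) % n]
            for i in range(n) for s in range(segs_per_host)}


def spread_mirroring(hosts, segs_per_host):
    """Each of a host's primaries is mirrored on a different other host."""
    n = len(hosts)
    if n <= segs_per_host:
        raise ValueError("spread mirroring needs more hosts than primaries per host")
    # Offsets 1..segs_per_host are all smaller than n, so a mirror never
    # lands on its own primary's host and no two offsets coincide.
    return {(hosts[i], s): hosts[(i + 1 + s) % n]
            for i in range(n) for s in range(segs_per_host)}


hosts = ["h1", "h2", "h3", "h4"]
print(group_mirroring(hosts, 2)[("h1", 0)], group_mirroring(hosts, 2)[("h1", 1)])
print(spread_mirroring(hosts, 2)[("h1", 0)], spread_mirroring(hosts, 2)[("h1", 1)])
```

With group mirroring, the failure of one host doubles the load on a single neighbour; with spread mirroring, the extra load is divided among several hosts, which is why it needs more hosts than primaries per host.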

[Citus]

  • The Citus coordinator maintains metadata tables to track all of the cluster nodes and the locations of the database shards on those nodes. The metadata tables are small (typically a few MBs in size) and do not change very often. This means that they can be replicated and quickly restored if the node ever experiences a failure. There are several options on how users can deal with coordinator failures.
    1. Use PostgreSQL streaming replication: You can use PostgreSQL’s streaming replication feature to create a hot standby of the coordinator. Then, if the primary coordinator node fails, the standby can be promoted to the primary automatically to serve queries to your cluster.
    2. Since the metadata tables are small, users can use EBS volumes, or PostgreSQL backup tools to backup the metadata. Then, they can easily copy over that metadata to new nodes to resume operation.
  • Azure Database for PostgreSQL - Hyperscale (Citus) provides high availability (HA) to avoid database downtime. With HA enabled, every node in a server group will get a standby. If the original node becomes unhealthy, its standby will be promoted to replace it. Because HA doubles the number of servers in the group, it will also double the cost.
  • Data availability
    • Citus allows shards to be replicated for protection against data loss. There are two replication modes:
      • Citus shard replication: creates extra backup shard placements and runs queries against all of them that update any of them. This option is best suited for an append-only workload. Citus replicates shards across different nodes by automatically replicating DML statements and managing consistency. If a node goes down, the coordinator node will continue to serve queries by routing the work to the replicas seamlessly.
      • PostgreSQL streaming replication: is more efficient and utilizes PostgreSQL streaming replication to back up the entire database of each node to a follower database. This is transparent and does not require the involvement of Citus metadata tables. This option is best for heavy OLTP workloads.
  • Failover time
    • Failover happens within a few minutes, and promoted nodes always have fresh data through PostgreSQL synchronous streaming replication.

[Postgres-XL]

  • Set up a backup GTM to prepare for a GTM failure.
  • Use the streaming replication for each Coordinator and Datanode.
    • Streaming replication has only been thoroughly tested in asynchronous mode.
    • Hot Standby is not supported.
  • The pgxc_ctl utility can help in setting up slaves for datanodes and coordinators.
    • PGXC$ add datanode slave dn1 localhost 40101 40111 $dataDirRoot/dn_slave.1 none $dataDirRoot/datanode_archlog.1

<References>

Continuation of query processing

  • Want to continue and successfully complete long-running data loads, DML statements, and SELECTs when a node fails, by resuming the work on another node that holds a replica.

[Spanner]

  • For both read-only transactions and snapshot reads, commit is inevitable once a timestamp has been chosen, unless the data at that timestamp has been garbage-collected. As a result, clients can avoid buffering results inside a retry loop.
  • When a server fails, clients can internally continue the query on a different server by repeating the timestamp and the current read position.
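
The idea of continuing a read on another server by repeating the timestamp and the current read position can be sketched as below. The in-memory snapshot, the server dictionaries, and the `fails_at` field are all hypothetical stand-ins for a real client library.

```python
class ServerDown(Exception):
    pass


def read_from(server, snapshot, timestamp, position):
    """Yield (position, row) pairs of the snapshot at `timestamp`, starting at `position`."""
    rows = snapshot[timestamp]
    for pos in range(position, len(rows)):
        if server["fails_at"] == pos:
            raise ServerDown(server["name"])
        yield pos, rows[pos]


def resumable_read(servers, snapshot, timestamp):
    """Stream all rows exactly once, switching servers without restarting the read."""
    position = 0
    result = []
    for server in servers:
        try:
            for pos, row in read_from(server, snapshot, timestamp, position):
                result.append(row)
                position = pos + 1
            return result
        except ServerDown:
            continue  # retry on the next server with the same timestamp and position
    raise RuntimeError("no server could complete the read")


snapshot = {100: ["a", "b", "c", "d"]}
servers = [{"name": "s1", "fails_at": 2}, {"name": "s2", "fails_at": None}]
print(resumable_read(servers, snapshot, 100))
```

Because the timestamp fixes the data that is visible, rows already returned never need to be buffered or re-read after the switch.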

[Citus]

  • Citus’s distributed executors run distributed query plans and handle failures that occur during query execution. The executors connect to the workers, send the assigned tasks to them and oversee their execution.
  • If the executor cannot assign a task to the designated worker or if a task execution fails, then the executor dynamically re-assigns the task to replicas on other workers.
  • The executor processes only the failed query sub-tree, and not the entire query while handling failures.
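
A minimal sketch of task-level failover follows, assuming each task has a list of workers that hold a replica of its shard. The `placements` mapping and the `execute` callback are hypothetical; this is not the Citus executor itself.

```python
def run_task(task, placements, execute):
    """Try each worker holding a replica of the task's shard until one succeeds."""
    errors = []
    for worker in placements[task]:
        try:
            return execute(worker, task)
        except ConnectionError as exc:
            errors.append((worker, exc))
    raise RuntimeError(f"task {task!r} failed on all placements: {errors}")


def run_query(tasks, placements, execute):
    """Run every task; only a failed task is retried, not the whole query."""
    return {task: run_task(task, placements, execute) for task in tasks}


def execute(worker, task):
    # Hypothetical worker call: worker "w1" is down in this example.
    if worker == "w1":
        raise ConnectionError("w1 unreachable")
    return f"{task} done on {worker}"


placements = {"shard_1": ["w1", "w2"], "shard_2": ["w2", "w3"]}
print(run_query(["shard_1", "shard_2"], placements, execute))
```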

Split brain

  • Need some majority / quorum mechanisms to guarantee data consistency whenever parts of the cluster become inaccessible.

[MySQL Cluster]

  • When all data nodes in at least one node group are alive, network partitioning is not an issue, because no single subset of the cluster can form a functional cluster on its own.
  • The real problem arises when no single node group has all its nodes alive, in which case network partitioning (the split-brain scenario) becomes possible.
  • Then an arbitrator is required. All cluster nodes recognize the same node as the arbitrator, which is normally the management server; however, it is possible to configure any of the MySQL Servers in the cluster to act as the arbitrator instead.
  • The arbitrator accepts the first set of cluster nodes to contact it, and tells the remaining set to shut down.
  • Arbitrator selection is controlled by the ArbitrationRank configuration parameter, which can be set for MySQL Server and management server nodes.
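
The rules above can be read as a decision procedure for each set of data nodes that can still see each other. The sketch below is one interpretation of those rules, with hypothetical names; it is not NDB code.

```python
class Arbitrator:
    """Accepts the first partition that contacts it and rejects all others."""

    def __init__(self):
        self.winner = None

    def ask(self, partition_id):
        if self.winner is None:
            self.winner = partition_id
        return self.winner == partition_id


def partition_outcome(partition, node_groups, arbitrator_grants):
    """Decide what a set of mutually reachable data nodes should do.

    partition: set of live node ids that can see each other
    node_groups: list of sets, each holding the node ids of one node group
    arbitrator_grants: callable returning True if the arbitrator accepts this partition
    """
    if any(not (group & partition) for group in node_groups):
        return "shutdown"   # a whole node group is missing, so data is incomplete
    if any(group <= partition for group in node_groups):
        return "continue"   # no other partition can hold a copy of all the data
    return "continue" if arbitrator_grants() else "shutdown"


groups = [{1, 2}, {3, 4}]
arb = Arbitrator()
print(partition_outcome({1, 3}, groups, lambda: arb.ask("A")))
print(partition_outcome({2, 4}, groups, lambda: arb.ask("B")))
```

In the example, both halves hold one node from every node group, so each could serve all the data; the arbitrator lets only the first one continue.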

Backup and recovery

  • Want to have backup and recovery work in parallel across cluster nodes.

<Questions>

  • Q1: Do we assume or recommend shared storage in which to accumulate backups from all nodes (NFS/CIFS, object storage, cloud storage)?
  • Q2: How do we name (label) each backup of the entire cluster?
  • Q3: Do we allow restoration to a different cluster configuration, i.e., a cluster with a different number of nodes?
  • Q4: How can we perform recovery so that consistency among nodes is preserved?
    • At the time of backup, a distributed transaction may have completed commit on one node and not yet on the other nodes.
    • PITR needs to prevent a half-baked distributed transaction, which is replayed up to its commit on one node and not on other nodes.

[Oracle]

  • Because shards are hosted on individual Oracle databases, you can use Oracle Maximum Availability best practices to back up and restore shards individually.

[MySQL Cluster]

  • You can use the NDB Cluster native backup and restore functionality in the NDB management client and the ndb_restore program.
  • You can also use the traditional functionality provided for this purpose in mysqldump and the MySQL server.
  • A backup is a snapshot of the database at a given time.
  • It is possible to restore from an NDB backup to a cluster having a different number of data nodes than the original from which the backup was taken.
  • The backup consists of three main parts:
    1. Metadata. The names and definitions of all database tables
    2. Table records. The data actually stored in the database tables at the time that the backup was made
    3. Transaction log. A sequential record telling how and when data was stored in the database
  • Each of these parts is saved on all nodes participating in the backup. During backup, each node saves these three parts into three files on disk:
    1. BACKUP-backup_id.node_id.ctl: A control file containing control information and metadata. Each node saves the same table definitions (for all tables in the cluster) to its own version of this file.
    2. BACKUP-backup_id-0.node_id.data: A data file containing the table records, which are saved on a per-fragment basis. That is, different nodes save different fragments during the backup. The file saved by each node starts with a header that states the tables to which the records belong. Following the list of records there is a footer containing a checksum for all records.
    3. BACKUP-backup_id.node_id.log: A log file containing records of committed transactions. Only transactions on tables stored in the backup are stored in the log. Nodes involved in the backup save different records because different nodes host different database fragments.
  • Backup and restore commands
    • START BACKUP [backup_id] [wait_option] [snapshot_option]
    • ABORT BACKUP backup_id
    • ndb_restore [-c connection_string] -n node_id -b backup_id [-m] -r --backup-path=/path/to/backup/files

[Spanner]

  • Backup
    • Data consistency: Backups are a transactionally and externally consistent copy of a Cloud Spanner database at the create_time of the backup.
    • Any modifications to the data or schema after backup creation has started will not be included in the backup.
    • To ensure external consistency of the backup, Cloud Spanner pins the contents of the database at create time. This prevents the garbage collection system from removing the relevant data values for the duration of the backup operation.
    • Then, every zone in the instance begins copying the data in parallel. If a zone is temporarily unavailable, the backup is not complete until the zone comes back online and finishes. Backups are restorable as soon as the operation is done.
  • Restore
    • The node count of the instances does not need to be the same.
    • The restore process is designed for high-availability as the database can be restored as long as the majority quorum of the regions and zones in the instance are available.

[CockroachDB]

  • Distributed backup/restore
    • Back up your full cluster to services like AWS S3, Google Cloud Storage, or NFS for the unlikely case you need to restore data
    • BACKUP TO '<backup_location>';
    • The BACKUP process minimizes its impact to the cluster's performance by distributing work to all nodes. Each node backs up only a specific subset of the data it stores (those for which it serves writes), with no two nodes backing up the same data.
  • Point in time recovery
    • Roll back CockroachDB to the exact moment before a mistyped delete or update happened
    • RESTORE TABLE bank.customers FROM '<backup_location>';
    • RESTORE DATABASE bank FROM '<backup_location>';
    • RESTORE FROM '<backup_location>';
    • A full cluster restore can only be run on a target cluster that has never had user-created databases or tables.
    • The RESTORE process minimizes its impact to the cluster's performance by distributing work to all nodes. Subsets of the restored data (known as ranges) are evenly distributed among randomly selected nodes, with each range initially restored to only one node. Once the range is restored, the node begins replicating it to others.
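
The statement that each node backs up only the data for which it serves writes, with no overlap between nodes, can be illustrated with a small planning function. The `leaseholders` mapping is a hypothetical input; CockroachDB's real backup planner is not shown here.

```python
from collections import defaultdict


def plan_backup(leaseholders):
    """Group ranges by the node that serves writes for them.

    leaseholders: dict mapping range id -> node id (hypothetical input).
    Every range has exactly one such node, so no range is backed up twice.
    """
    plan = defaultdict(list)
    for range_id, node in sorted(leaseholders.items()):
        plan[node].append(range_id)
    return dict(plan)


print(plan_backup({"r1": "n1", "r2": "n2", "r3": "n1"}))
```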

[YugabyteDB]

  • ysql_dump, ysql_dumpall: These are analogues of pg_dump and pg_dumpall.
  • YugabyteDB supports distributed backup and restore of YSQL databases. Backing up and restoring of individual tables within a database is not yet supported.
    • Create a database snapshot using yb-admin create_database_snapshot command:
    • $ yb-admin -master_addresses <ip1:7100,ip2:7100,ip3:7100> create_database_snapshot ysql.<database_name>
    • For details, see Snapshot and restore data.

[Greenplum]

  • Supports parallel and non-parallel methods for backing up and restoring databases. Parallel operations scale regardless of the number of segments in your system, because segment hosts each write their data to local disk storage simultaneously.
  • gpbackup and gprestore are the Greenplum Database backup and restore utilities.
    • By default, gpbackup backs up objects in the specified database as well as global Greenplum Database system objects.
    • gpbackup utilizes ACCESS SHARE locks at the individual table level, instead of EXCLUSIVE locks on the pg_class catalog table.
    • By default, gpbackup stores only the object metadata files and DDL files for a backup in the Greenplum Database master data directory.
    • Greenplum Database segments use the COPY ... ON SEGMENT command to store their data for backed-up tables in compressed CSV data files, located in each segment's backups directory.
    • Each gpbackup task uses a single transaction in Greenplum Database. During this transaction, metadata is backed up on the master host, and data for each table on each segment host is written to CSV backup files using COPY ... ON SEGMENT commands in parallel. The backup process acquires an ACCESS SHARE lock on each table that is backed up.
    • Backing up a database with gpbackup while simultaneously running DDL commands might cause gpbackup to fail, in order to ensure consistency within the backup set. For example, if a table is dropped after the start of the backup operation, gpbackup exits and displays the error message ERROR: relation <schema.table> does not exist.
    • gpbackup might fail when a table is dropped during a backup operation due to table locking issues. gpbackup generates a list of tables to back up and acquires an ACCESS SHARE lock on the tables. If an EXCLUSIVE lock is held on a table, gpbackup acquires the ACCESS SHARE lock after the existing lock is released. If the table no longer exists when gpbackup attempts to acquire a lock on the table, gpbackup exits with the error message.
    • A backup created with gpbackup can only be restored to a Greenplum Database cluster with the same number of segment instances as the source cluster. If you run gpexpand to add segments to the cluster, backups you made before starting the expansion cannot be restored after the expansion has completed.

[Postgres-XL]

  • Function: pgxc_lock_for_backup() boolean
    • Locks the cluster for taking backup that would be restored on the new node to be added.
    • pgxc_lock_for_backup locks the cluster for taking backup using pg_dump/pg_dumpall. Locking means that we disallow the statements that change the portions of the catalog which are backed up by pg_dump/pg_dumpall. This function does not impact SELECTs or DMLs.
    • To lock the cluster for backup while adding a new node Postgres-XL uses advisory locks. Every time a disallowed statement is issued the system tries to acquire a transaction level advisory lock in shared mode and the lock is released when the DDL or the transaction issuing the DDL ends. The function pgxc_lock_for_backup tries to acquire the same advisory lock in exclusive mode at session level. It is therefore necessary to keep the session issuing pgxc_lock_for_backup alive as long as the issuer wants the system to keep the lock.
  • Barrier
    • It's important to ensure that any global transaction in the cluster is committed either on all the nodes or on none of them. If global recovery is done in such a way that on one node the WAL recovery stops after a commit record of some global transaction is processed, but stops before the commit record for the same transaction is processed on some other node, the cluster may be left in an inconsistent state. Since the commit messages of global transactions can arrive out of order on different nodes, it's very hard to find a common synchronization point.
    • For example, for two global transactions T1 and T2, the commit messages for these transactions can arrive on nodes N1 and N2 such that N1 receives the commit message for T1 first and N2 receives the commit message for T2 first. In this case, during PITR, irrespective of whether we stop at T1 or T2, the cluster would lose its consistency since at least one transaction will be marked as committed on one node and as aborted on the other node.
    • During recovery, it can be hard and even impossible to find cluster-wide consistent synchronization points. In fact, such synchronization points may not exist at all.
    • Postgres-XL provides a mechanism to create such synchronization points, called barriers, during normal operation. A barrier can be created by using a SQL command BARRIER.
    • The user must connect to one of the Coordinators and issue the BARRIER command, optionally followed by an identifier. If the user does not specify an identifier, a unique identifier will be generated and returned to the caller.
    • Upon receiving the BARRIER command, the Coordinator temporarily pauses any new two-phase commits. It also communicates with other Coordinators to ensure that there are no in-progress two-phase commits in the cluster. At that point, a barrier WAL record along with the user-given or system-generated BARRIER identifier is written to the WAL stream of all Datanodes and the Coordinators.
    • The user can create as many barriers as she wants to. At the time of point-in-time-recovery, the same barrier id must be specified in the recovery.conf files of all the Coordinators and Datanodes. When every node in the cluster recovers to the same barrier id, a cluster-wide consistent state is reached.
    • It's important that the recovery be started from a backup taken before the barrier was generated. If no matching barrier record is found, either because the barrier was created before the base backup used for the recovery or because the barrier was never created, the recovery is run to the end.
    • CREATE BARRIER barrier_name;
    • CREATE BARRIER creates a new XLOG record on each node of the cluster consistently. Essentially a barrier is a consistent point in the cluster that you can recover to.
    • Without barriers, if you recover an individual component, it may be possible that it is not consistent with the other nodes depending on when it was committed.
    • A barrier is created via a 2PC-like mechanism from a remote Coordinator in three phases: prepare, execute, and end.
    • A new recovery parameter called recovery_target_barrier has been added in recovery.conf. In order to perform a complete PITR recovery, it is necessary to set recovery_target_barrier to the value of a barrier already created. Then distribute recovery.conf to each data folder of each node, and restart the nodes one by one.
  • pg_ctl -Z restore
    • imports Postgres-XL's catalog data from other coordinator or datanode.
  • pg_resetwal
    • will only run locally for Coordinators and Datanodes. You should run it for each Coordinator or Datanode manually.
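
The T1/T2 example from the Barrier discussion above can be replayed with a toy model in which each node's WAL is a list of records. The record format and the function name are assumptions made for illustration and do not reflect the real WAL format.

```python
def committed_up_to(wal, stop_record):
    """Return the transactions committed on one node when replay stops after `stop_record`.

    If `stop_record` never appears, replay runs to the end of the log.
    """
    committed = set()
    for record in wal:
        if record.startswith("commit:"):
            committed.add(record.split(":", 1)[1])
        if record == stop_record:
            break
    return committed


# Commit records arrive in different orders on the two nodes.
wal_n1 = ["commit:T1", "commit:T2", "barrier:b1", "commit:T3"]
wal_n2 = ["commit:T2", "commit:T1", "barrier:b1", "commit:T3"]

# Stopping at a commit record leaves the nodes disagreeing about T2.
print(committed_up_to(wal_n1, "commit:T1") == committed_up_to(wal_n2, "commit:T1"))
# Stopping at the barrier gives the same set of transactions on both nodes.
print(committed_up_to(wal_n1, "barrier:b1") == committed_up_to(wal_n2, "barrier:b1"))
```

In this model the barrier record is consistent only because no commit is in flight when it is written, which is what pausing two-phase commits during barrier creation is meant to guarantee.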

Disaster recovery

<Questions>

  • Q1: Do we allow different cluster configurations for the production and backup sites?
  • Q2: How do we replicate and apply changes?
    • One-to-one: a single node on one cluster collects all updates and sends them to one node on another cluster (MySQL Cluster)
    • Many-to-many: each node on one cluster sends its updates to a corresponding node on another cluster
  • Q3: How do we achieve consistency among nodes, as in the case of PITR within the local data center?

[Oracle]

  • Back up for disaster recovery using Cloud Backup Service, RMAN, and Zero Data Loss Recovery Appliance
  • Contact Oracle Support for specific steps to recover a shard in the event of a disaster.

[MySQL Cluster]

  • Geographical Replication - Geographic replication enables nodes to be mirrored to remote data centers for disaster recovery.
  • MySQL Cluster also replicates across data centers for disaster recovery and global scalability. Using its conflict handling mechanisms, each cluster can be active, accepting updates while maintaining consistency across locations. Update-anywhere geographic replication enables multiple clusters to be distributed geographically for disaster recovery and the scalability of global web services.
  • The replication process is one in which successive states of a source cluster are logged and saved to a replica cluster. This process is accomplished by a special thread known as the NDB binary log injector thread, which runs on each MySQL server and produces a binary log (binlog). This thread ensures that all changes in the cluster producing the binary log - and not just those changes that are effected through the MySQL Server - are inserted into the binary log with the correct serialization order.
  • A replication channel requires two MySQL servers acting as replication servers (one each for the source and replica). For example, this means that in the case of a replication setup with two replication channels (to provide an extra channel for redundancy), there will be a total of four replication nodes, two per cluster.

[YugabyteDB]

  • xCluster replication: data is replicated asynchronously between YugabyteDB clusters, either unidirectionally (master-slave) or bidirectionally across two clusters.
  • Each YugabyteDB TServer has CDC subscribers (cdc_subscribers) that are responsible for getting changes for all tablets for which the TServer is a leader.
  • All data changes for one row, or multiple rows in the same tablet, will be received in the order in which they occur. Due to the distributed nature of the problem, however, there is no guarantee for the order across tablets.
  • In case of active-active configurations, if there are conflicting writes to the same key, then the update with the larger timestamp is considered the latest update. Thus, the deployment is eventually consistent across the two data centers.
  • Limitations
    • Transactions from the producer won't be applied atomically on the consumer. That is, some changes in a transaction may be visible before others.
    • Currently: DDL changes are not automatically replicated. Applying create table and alter table commands to the sync clusters is the responsibility of the user.
    • Future: Allow safe DDL changes to be propagated automatically.
    • The transactions (especially those that do not involve overlapping rows) may not be applied in the same order as they occur in the source cluster.
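
The last-writer-wins rule for active-active replication described above can be sketched as follows. This is a minimal illustration, not YugabyteDB's implementation: the Versioned type and apply_replicated_write function are hypothetical names, and timestamps are assumed to be plain comparable integers.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class Versioned:
    value: str
    timestamp: int  # assumption: a comparable commit timestamp


def apply_replicated_write(store: Dict[str, Versioned], key: str, incoming: Versioned) -> bool:
    """Apply a replicated write only if it is newer than the local version.

    Returns True when the incoming write replaced the local one.
    """
    current = store.get(key)
    if current is None or incoming.timestamp > current.timestamp:
        store[key] = incoming
        return True
    return False  # the write with the smaller (or equal) timestamp loses


# Two clusters receive the same pair of conflicting writes in opposite order.
cluster_a: Dict[str, Versioned] = {}
cluster_b: Dict[str, Versioned] = {}
w1 = Versioned("written-on-a", timestamp=100)
w2 = Versioned("written-on-b", timestamp=105)

apply_replicated_write(cluster_a, "k", w1)
apply_replicated_write(cluster_a, "k", w2)
apply_replicated_write(cluster_b, "k", w2)
apply_replicated_write(cluster_b, "k", w1)
```

Both clusters end up holding w2 regardless of arrival order, which is the eventual consistency the text describes. The update carrying the smaller timestamp is silently discarded.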

Connection failover

<Questions>

  • Q1: How can we avoid attempts to connect or send requests to failed nodes and TCP timeout?

[Spanner]

  • Clients automatically fail over between replicas.

[Greenplum]

  • Plan how to switch clients to the new master instance when a failure occurs, for example, by updating the master address in DNS.
  • You can configure a virtual IP address for the master and standby so that client programs do not have to switch to a different network address when the current master changes. If the master host fails, the virtual IP address can be swapped to the actual acting master.
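
One possible client-side answer to Q1 is to try a list of candidate nodes with a short connect timeout and to remember recently failed nodes so that later requests try them last. The sketch below is only an illustration under those assumptions: FailoverConnector, its parameters, and the injected connect callable are hypothetical and are not taken from any of the products above.

```python
import time
from typing import Any, Callable, Dict, Optional, Sequence


class FailoverConnector:
    """Tries hosts in order, trying recently failed hosts last."""

    def __init__(
        self,
        hosts: Sequence[str],
        connect: Callable[[str, float], Any],
        connect_timeout: float = 2.0,
        quarantine_seconds: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.hosts = list(hosts)
        self.connect = connect
        self.connect_timeout = connect_timeout
        self.quarantine_seconds = quarantine_seconds
        self.clock = clock
        self.failed_at: Dict[str, float] = {}

    def _is_quarantined(self, host: str) -> bool:
        failed = self.failed_at.get(host)
        return failed is not None and self.clock() - failed < self.quarantine_seconds

    def get_connection(self) -> Any:
        # Stable sort: healthy hosts keep their order, quarantined hosts go last.
        candidates = sorted(self.hosts, key=self._is_quarantined)
        last_error: Optional[BaseException] = None
        for host in candidates:
            try:
                conn = self.connect(host, self.connect_timeout)
            except OSError as exc:
                self.failed_at[host] = self.clock()
                last_error = exc
                continue
            self.failed_at.pop(host, None)
            return conn
        raise ConnectionError("no reachable node") from last_error
```

In a real client the connect callable could wrap something like socket.create_connection((host, port), timeout=timeout). A virtual IP or DNS update, as in the Greenplum notes, moves the same decision out of the client.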


Security

<Questions>

  • Q1: Should we centrally control which nodes can join the cluster?
  • Q2: Should we encrypt inter-node traffic?
  • Q3: Should we authenticate inter-node connections?

[MySQL Cluster]

  • No encryption of inter-node traffic
    • Communications between NDB Cluster nodes are not encrypted or shielded in any way. The only means of protecting transmissions within an NDB Cluster is to run your NDB Cluster on a protected network. If you intend to use NDB Cluster for Web applications, the cluster should definitely reside behind your firewall and not in your network's De-Militarized Zone (DMZ) or elsewhere.
    • Cluster communication protocols are inherently insecure, and no encryption or similar security measures are used in communications between nodes in the cluster. Because network speed and latency have a direct impact on the cluster's efficiency, it is also not advisable to employ SSL or other encryption to network connections between nodes, as such schemes will effectively slow communications.
  • No authentication is used for controlling API node access to an NDB Cluster. As with encryption, the overhead of imposing authentication requirements would have an adverse impact on Cluster performance.
  • The DBA can control which nodes can join the cluster by creating slots in config.ini on the master host.
    • Restrict SQL nodes by creating [mysqld] or [api] sections with their host names or IP addresses
    • Allow any SQL node by creating an empty slot, a [mysqld] or [api] section without specifying a host name or IP address
  • Users and privileges
    • MySQL user accounts and privileges are normally not automatically propagated between different MySQL servers accessing the same NDB Cluster. MySQL NDB Cluster provides support for shared and synchronized users and privileges using the NDB_STORED_USER privilege.
    • A statement granting the NDB_STORED_USER privilege, such as GRANT NDB_STORED_USER ON *.* TO 'cluster_app_user'@'localhost', works by directing NDB to create a snapshot using the queries SHOW CREATE USER cluster_app_user@localhost and SHOW GRANTS FOR cluster_app_user@localhost, then storing the results in ndb_sql_metadata. Any other SQL nodes are then requested to read and apply the snapshot. Whenever a MySQL server starts up and joins the cluster as an SQL node it executes these stored CREATE USER and GRANT statements as part of the cluster schema synchronization process.
    • In the event that multiple GRANT, REVOKE, or other user administration statements from multiple SQL nodes cause privileges for a given user to diverge on different SQL nodes, you can fix this problem by issuing GRANT NDB_STORED_USER for this user on an SQL node where the privileges are known to be correct; this causes a new snapshot of the privileges to be taken and synchronized to the other SQL nodes.

[CockroachDB]

  • You can run a secure or insecure CockroachDB cluster.
    • When secure, client/node and inter-node communication is encrypted, and SSL certificates authenticate the identity of both clients and nodes.
    • When insecure, there's no encryption or authentication.

[YugabyteDB]

  • TLS encryption ensures that network communication between servers is secure. You can configure YugabyteDB to use TLS to encrypt intra-cluster and client to server network communication.

[Greenplum]

  • The pg_hba.conf file of the master instance controls client access and authentication to your Greenplum system.
  • The segments also have pg_hba.conf files, but these are already correctly configured to only allow client connections from the master host. The segments never accept outside client connections, so there is no need to alter the pg_hba.conf file on segments.

[Citus]

  • The traffic between the different nodes in the cluster is encrypted for NEW installations. This is done by using TLS with self-signed certificates. This means that this does not protect against Man-In-The-Middle attacks. This only protects against passive eavesdropping on the network.
  • When Citus nodes communicate with one another they consult a GUC for connection parameters and, in the Enterprise Edition of Citus, a table with connection credentials. This gives the database administrator flexibility to adjust parameters for security and efficiency.


Application

Workload routing

<Questions>

  • Q1: How can we route connections to the nodes that have necessary data, based on some data specified during a connection, transaction, or data access request?
  • Q2: Do we route read-only connections to the read replicas?
  • Q3: What kind of information should be used to route connection or transaction requests, e.g. server load, node capacity, locality (region, zone, rack)?
  • Q4: Do we need intermediary router software?

[Oracle]

  • Multi-shard queries and queries that do not specify a sharding key are routed through the multi-shard query coordinator which acts as a proxy for the application requesting the data. The multi-shard query coordinator (or, "the coordinator") runs on the shard catalog or its replicas.
  • Direct Routing to a Shard
    • Oracle clients and connections pools are able to recognize sharding keys specified in the connection string for high performance data dependent routing. A shard routing cache in the connection layer is used to route database requests directly to the shard where the data resides.
    • A shard topology cache is a mapping of the sharding key ranges to the shards. Oracle Integrated Connection Pools maintain this shard topology cache in their memory. Upon the first connection to a given shard (during pool initialization or when the pool connects to newer shards), the sharding key range mapping is collected from the shards to dynamically build the shard topology cache.
    • Caching the shard topology creates a fast path to the shards and expedites the process of creating a connection to a shard. When a connection request is made with a sharding key, the connection pool looks up the corresponding shard on which this particular sharding key exists (from its topology cache). If a matching connection is available in the pool then the pool returns a connection to the shard by applying its internal connection selection algorithm.
    • If a matching connection is not available in the pool, then a new connection is created by forwarding the connection request with the sharding key to the shard director.
    • Once the pools are initialized and the shard topology cache is built based on all shards, a shard director outage has no impact on direct routing.
  • Load balancing
    • The Shard Director is built upon the Oracle Global Service Manager (GSM). GSMs route connections based on database role, load, replication lag, and locality.
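
The direct-routing idea above, a client-side cache that maps sharding-key ranges to shards, can be sketched as follows. This is a simplified illustration and not Oracle's implementation: ShardTopologyCache, route, and ask_shard_director are hypothetical names, sharding keys are assumed to be integers, and the shard director is consulted only on a cache miss.

```python
import bisect
from typing import Callable, List, Optional, Tuple


class ShardTopologyCache:
    """Maps half-open sharding-key ranges [low, high) to shard names."""

    def __init__(self) -> None:
        self._lows: List[int] = []                    # sorted range starts
        self._ranges: List[Tuple[int, int, str]] = []  # parallel to _lows

    def add_range(self, low: int, high: int, shard: str) -> None:
        idx = bisect.bisect_left(self._lows, low)
        self._lows.insert(idx, low)
        self._ranges.insert(idx, (low, high, shard))

    def lookup(self, key: int) -> Optional[str]:
        # Find the last range whose start is <= key, then check its end.
        idx = bisect.bisect_right(self._lows, key) - 1
        if idx < 0:
            return None
        low, high, shard = self._ranges[idx]
        return shard if low <= key < high else None


def route(cache: ShardTopologyCache, key: int,
          ask_shard_director: Callable[[int], str]) -> str:
    """Use the cached topology when possible; otherwise ask the director."""
    shard = cache.lookup(key)
    if shard is None:
        shard = ask_shard_director(key)
    return shard
```

Once every range is cached, route never calls the director, which mirrors the statement that a shard director outage does not affect direct routing after the cache is built.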

Constraints

[Oracle]

  • The sharding key must be included in the primary key.

[MySQL Cluster]

  • Primary/unique key
    • If no primary key is defined by the user when a table is created, the NDBCLUSTER storage engine automatically generates a hidden one.
  • Foreign key
    • Provides support for foreign key constraints which is comparable to that found in the InnoDB storage engine.
    • Every column referenced as a foreign key requires an explicit unique key, if it is not the table's primary key.
    • ON UPDATE CASCADE is not supported when the reference is to the parent table's primary key. This is because an update of a primary key is implemented as a delete of the old row (containing the old primary key) plus an insert of the new row (with a new primary key). This is not visible to the NDB kernel, which views these two rows as being the same, and thus has no way of knowing that this update should be cascaded.

[Spanner]

  • Primary/unique key
    • Every table must have a primary key, and that primary key can be composed of zero or more columns of that table.
    • If you declare a table to be a child of another table, the primary key column(s) of the parent table must be the prefix of the primary key of the child table.
  • Foreign key
    • Cloud Spanner validates foreign key constraints when a transaction is being committed, or when the effects of writes are being made visible to subsequent operations in the transaction.
    • Validation occurs immediately after each DML statement. You must, for example, insert the referenced row before inserting its referencing rows.
    • Cloud Spanner's table interleaving is a good choice for many parent-child relationships where the child table's primary key includes the parent table's primary key columns. The co-location of child rows with their parent rows can significantly improve performance.
    • Foreign keys are a more general parent-child solution and address additional use cases. They are not limited to primary key columns, and tables can have multiple foreign key relationships, both as a parent in some relationships and a child in others.
    • However, a foreign key relation does not imply co-location of the tables in the storage layer.

[CockroachDB]

  • Primary/unique key
    • When you do not explicitly define a primary key, CockroachDB will automatically add a hidden rowid column as the primary key.
    • The primary key required for partitioning is different from the conventional primary key. To define the primary key for partitioning, prefix the unique identifier(s) in the primary key with all columns you want to partition and subpartition the table on, in the order in which you want to nest your subpartitions.
    • The order in which the columns are defined in the primary key is important. The partitions and subpartitions need to follow that order.

[YugabyteDB]

  • Does not support exclusion constraints.

[Greenplum]

  • Primary/unique key
    • UNIQUE and PRIMARY KEY constraints must be compatible with their table's distribution key and partitioning key, if any.
    • The table must be hash-distributed or replicated (not DISTRIBUTED RANDOMLY).
    • If the table is hash-distributed, the constraint columns must be the same as (or a superset of) the table's distribution key columns.
    • If a table has a primary key, this column (or group of columns) is chosen as the distribution key for the table by default.
    • Note that a UNIQUE CONSTRAINT (such as a PRIMARY KEY CONSTRAINT) implicitly creates a UNIQUE INDEX that must include all the columns of the distribution key and any partitioning key. The UNIQUE CONSTRAINT is enforced across the entire table, including all table partitions (if any).
  • Foreign key
    • Not supported. You can declare them, but referential integrity is not enforced.
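
The Greenplum rules for UNIQUE and PRIMARY KEY constraints above can be read as a small validity check. The function below is a sketch of that reading only; the function name and the "hash", "replicated", and "random" labels are choices made for this example, not Greenplum syntax.

```python
from typing import Iterable


def unique_constraint_allowed(
    constraint_cols: Iterable[str],
    distribution: str,
    distribution_key: Iterable[str] = (),
    partition_key: Iterable[str] = (),
) -> bool:
    """Return True if a UNIQUE or PRIMARY KEY constraint fits the rules above."""
    cols = set(constraint_cols)
    if distribution == "random":
        return False  # DISTRIBUTED RANDOMLY tables are excluded
    if not set(partition_key) <= cols:
        return False  # the unique index must include any partitioning key
    if distribution == "hash":
        # Constraint columns must equal or be a superset of the distribution key.
        return set(distribution_key) <= cols
    if distribution == "replicated":
        return True
    raise ValueError(f"unknown distribution: {distribution!r}")
```

For example, a constraint on (id, region) is accepted for a table hash-distributed on id, while a constraint on email alone is rejected for the same table.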

[Citus]

  • Primary/unique key
    • Due to the nature of distributed systems, Citus will not cross-reference uniqueness constraints or referential integrity between worker nodes.
    • Primary keys and uniqueness constraints must include the distribution column. Adding them to a non-distribution column will generate an error.
  • Foreign key
    • To set up a foreign key between colocated distributed tables, always include the distribution column in the key. This may involve making the key compound.
    • Foreign keys may be created in these situations:
      • between two local (non-distributed) tables
      • between two reference tables
      • between two colocated distributed tables when the key includes the distribution column
      • as a distributed table referencing a reference table
    • Foreign keys from reference tables to distributed tables are not supported.
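
The Citus foreign key cases listed above can be summarized as a decision function. This is a sketch of the list as written: TableKind and foreign_key_supported are hypothetical names, and any combination the list does not mention is treated as unsupported, which is an assumption of this example rather than a statement about Citus.

```python
from enum import Enum


class TableKind(Enum):
    LOCAL = "local"
    REFERENCE = "reference"
    DISTRIBUTED = "distributed"


def foreign_key_supported(
    referencing: TableKind,
    referenced: TableKind,
    colocated: bool = False,
    key_includes_distribution_column: bool = False,
) -> bool:
    """Return True for the foreign key situations listed above."""
    if referencing is TableKind.LOCAL and referenced is TableKind.LOCAL:
        return True
    if referencing is TableKind.REFERENCE and referenced is TableKind.REFERENCE:
        return True
    if referencing is TableKind.DISTRIBUTED and referenced is TableKind.DISTRIBUTED:
        return colocated and key_includes_distribution_column
    if referencing is TableKind.DISTRIBUTED and referenced is TableKind.REFERENCE:
        return True
    # Includes reference -> distributed, which the text says is not supported,
    # and combinations the list does not mention (assumed unsupported here).
    return False
```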

[Postgres-XL]

  • Primary/unique key
    • In distributed tables, UNIQUE constraints must include the distribution column of the table. This is because Postgres-XL currently supports only constraints that it can push down to the Datanodes to be enforced locally. If we include the distribution column in unique constraints, it stands to reason that it can be enforced locally.
    • There's no restriction in UNIQUE constraint in replicated tables.
    • When an expression is used on a UNIQUE constraint, this expression must contain the distribution column of its parent table. It cannot use other columns as well.
    • The distribution column must be included in PRIMARY KEY. Other restrictions apply to the PRIMARY KEY as well.
    • When an expression is used on a PRIMARY KEY constraint, this expression must contain the distribution column of its parent table. It cannot use other columns as well.
  • Foreign key
    • You cannot omit the column name in REFERENCES clause.
    • You cannot specify both PRIMARY KEY and REFERENCES key for different columns.
    • You cannot specify more than one foreign key constraint.
    • Only the distribution column can have a foreign key constraint.
    • When distributing tables A and B, where A has a foreign key to B, distribute the key destination table B first. Doing it in the wrong order will cause an error.
    • If it’s not possible to distribute in the correct order then drop the foreign keys, distribute the tables, and recreate the foreign keys.
  • Does not support exclusion constraints.

Sequence

<Questions>

  • Q1: Where do we store the sequence?
  • Q2: Do we want a node-specific sequence whose values don't overlap with those of other nodes?

[Oracle]

  • A sharded sequence is created on the shard catalog but has an instance on each shard. Each instance generates monotonically increasing numbers that belong to a range which does not overlap with ranges used on other shards. Therefore, every generated number is globally unique.
  • Note that the number generated by a sharded sequence cannot be immediately used as a sharding key for a new row being inserted into this shard, because the key value may belong to another shard and the insert will result in an error. To insert a new row, the application should first generate a value of the sharding key and then use it to connect to the appropriate shard. A typical way to generate a new value of the sharding key would be to use a regular (non-sharded) sequence on the shard catalog.
  • If a single sharding key generator becomes a bottleneck, a sharded sequence can be used for this purpose. In this case, an application should connect to a random shard (using the global service without specifying the sharding key), get a unique key value from a sharded sequence, and then connect to the appropriate shard using the key value.

[Spanner]

  • There is no auto-increment capability.

[Greenplum]

  • You cannot use the nextval() function in UPDATE or DELETE statements if mirroring is enabled in Greenplum Database.
  • The segments need a single point of truth for sequence values so that all segments get incremented correctly and the sequence moves forward in the right order. A sequence server process runs on the master and is the point-of-truth for a sequence. Segments get sequence values at runtime from the master.
  • Because of this distributed sequence design, there are some limitations on the functions that operate on a sequence:
    • lastval() and currval() functions are not supported.
    • setval() can only be used to set the value of the sequence generator on the master, it cannot be used in subqueries to update records on distributed table data.
    • nextval() sometimes grabs a block of values from the master for a segment to use, depending on the query. So values may sometimes be skipped in the sequence if all of the block turns out not to be needed at the segment level. Note that a regular PostgreSQL database does this too, so this is not something unique to Greenplum Database.
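
The block-allocation behavior described above, where a segment takes a block of values from the master and unused values are skipped, can be sketched as follows. SequenceServer and SegmentSequenceCache are hypothetical names for this illustration and do not describe Greenplum's actual interfaces.

```python
class SequenceServer:
    """Central point of truth that hands out contiguous blocks of values."""

    def __init__(self, start: int = 1) -> None:
        self._next = start

    def allocate_block(self, size: int) -> range:
        first = self._next
        self._next += size
        return range(first, first + size)


class SegmentSequenceCache:
    """Per-segment cache that refills from the server when its block runs out."""

    def __init__(self, server: SequenceServer, block_size: int) -> None:
        self._server = server
        self._block_size = block_size
        self._block = range(0)  # empty until the first nextval() call
        self._pos = 0

    def nextval(self) -> int:
        if self._pos >= len(self._block):
            self._block = self._server.allocate_block(self._block_size)
            self._pos = 0
        value = self._block[self._pos]
        self._pos += 1
        return value
```

With a block size of 3, two segments that each call nextval() once receive 1 and 4. If neither asks again, the values 2, 3, 5, and 6 are never used, which is the gap the text mentions.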

[Citus]

  • Citus MX
    • Serial columns must have type bigserial. Globally in the cluster the sequence values will not be monotonically increasing because the sixteen most significant bits hold the worker node id.
    • When performing writes on a hash-distributed table with a bigserial column via the data URL, sequence numbers are no longer monotonic, but instead have the form <16-bit unique node ID><48-bit local sequence number> to ensure uniqueness.
    • The coordinator node always has node ID 0, meaning it will generate sequence numbers as normal.
    • Serial types smaller than bigserial cannot be used in distributed tables.
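
The <16-bit unique node ID><48-bit local sequence number> layout described above can be illustrated with simple bit arithmetic. The function names are hypothetical, and the sketch treats the value as an unsigned 64-bit integer, ignoring the sign bit of bigint.

```python
NODE_ID_BITS = 16
LOCAL_SEQ_BITS = 48
LOCAL_SEQ_MASK = (1 << LOCAL_SEQ_BITS) - 1


def compose_sequence_value(node_id: int, local_seq: int) -> int:
    """Pack a node ID into the top 16 bits and a local counter into the low 48."""
    if not 0 <= node_id < (1 << NODE_ID_BITS):
        raise ValueError("node_id does not fit in 16 bits")
    if not 0 <= local_seq <= LOCAL_SEQ_MASK:
        raise ValueError("local_seq does not fit in 48 bits")
    return (node_id << LOCAL_SEQ_BITS) | local_seq


def decompose_sequence_value(value: int) -> tuple:
    """Return (node_id, local_seq) for a composed value."""
    return value >> LOCAL_SEQ_BITS, value & LOCAL_SEQ_MASK
```

With node ID 0, as on the coordinator, the composed value equals the local sequence number. Any value from node 2 is larger than any value from node 1, whatever the order of generation, so the values are unique but not monotonic in time across the cluster.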

View

<Questions>

  • Q1: Where are materialized view data stored?

[Oracle]

  • Materialized views created on sharded tables remain empty on the catalog database, while the corresponding materialized views on shards contain data from each of the individual shards.

[Citus]

  • Materialized view
    • Stores materialized views as local tables on the coordinator node.
    • Using them in distributed queries after materialization requires wrapping them in a subquery.

[Postgres-XL]

  • TEMPORARY views are not yet supported.
  • Materialized view
    • Just like ordinary views, a materialized view is created at the coordinator level, not the datanode level, and is replicated among all the coordinators. When a materialized view is created, the originating coordinator collects all the rows and replicates them.
    • When a materialized view is refreshed, the originating coordinator collects all the rows, drops all the existing rows, and then replicates the new ones.
    • If new row data comes from the originating coordinator, the data is handled using the COPY protocol, not by running queries.

Trigger

[Oracle]

  • Triggers are not supported.

[Postgres-XL]

  • Triggers are not supported.
  • Event triggers are not supported.

Index

[Spanner]

  • Unique index
    • Constraint is enforced by Cloud Spanner at transaction commit time. Specifically, any transaction that would cause multiple index entries for the same key to exist will fail to commit.
    • If a table contains non-UNIQUE data in it to begin with, attempting to create a UNIQUE index on it will fail.
  • Cloud Spanner stores the following data in each secondary index:
    • All key columns from the base table
    • All columns that are included in the index
    • All columns specified in the optional STORING clause of the index definition
  • You can add a new secondary index to an existing table while the database continues to serve traffic. Like any other schema changes in Cloud Spanner, adding an index to an existing database does not require taking the database offline and does not lock entire columns or tables.

[YugabyteDB]

  • CREATE INDEX CONCURRENTLY is not supported yet.

[Citus]

  • Supports CREATE INDEX CONCURRENTLY.

[Postgres-XL]

  • Unique indexes on distributed tables must contain the distribution column. For replicated tables, there's no such restriction.
  • Does not support CREATE INDEX CONCURRENTLY.

System columns

<Questions>

  • Q1: Which system columns are valid?

oid, tableoid [Postgres-XL]

  • Does not enforce OID integrity among the cluster. OIDs are assigned locally in each Coordinator and Datanode. You can use this in expressions but you should not expect OID values are the same throughout the XL cluster.
  • ctid is local to Coordinators and Datanodes. It is not good practice to use this value in SQL statements and can be very dangerous when using it to update data.

Foreign data

<Questions>

  • Q1: How can we fully utilize cluster nodes to access foreign data in parallel?

[Greenplum]

  • External tables access external files from within the database as if they are regular database tables.
  • External tables defined with the gpfdist/gpfdists, pxf, and s3 protocols utilize Greenplum parallelism by using the resources of all Greenplum Database segments to load or unload data.
    • The pxf protocol leverages the parallel architecture of the Hadoop Distributed File System to access files on that system.
    • The s3 protocol utilizes the Amazon Web Services (AWS) capabilities.
  • You can query external table data directly and in parallel using SQL commands such as SELECT, JOIN, or SORT EXTERNAL TABLE DATA, and you can create views for external tables.
  • Reading external table data in parallel from multiple Greenplum database segment instances, to optimize large load operations
    • PostgreSQL foreign-data wrappers connect only through the Greenplum Database master and do not access the Greenplum Database segment instances directly.

[Postgres-XL]

  • Not supported.

Query and data manipulation

[Oracle]

  • The coordinator only supports aggregate functions COUNT, SUM, MIN, MAX, and AVG.

[Citus]

  • Citus has 100% SQL support for queries which access a single node in the database cluster.
  • The following queries work in single-shard queries only:
    • SELECT FOR UPDATE: This feature is supported for hash distributed and reference tables only, and only those that have a replication_factor of 1.
    • TABLESAMPLE
    • Recursive CTEs
    • Grouping sets
  • Correlated subqueries are supported only when the correlation is on the Distribution Column and the subqueries conform to subquery pushdown rules (e.g., grouping by the distribution column, with no LIMIT or LIMIT OFFSET clause).
  • Aggregate functions
    • Supports and parallelizes most aggregate functions supported by PostgreSQL, including custom user-defined aggregates. Aggregates execute using one of three methods, in this order of preference:
      1. When the aggregate is grouped by a table’s distribution column, Citus can push down execution of the entire query to each worker. All aggregates are supported in this situation and execute in parallel on the worker nodes. (Any custom aggregates being used must be installed on the workers.)
      2. When the aggregate is not grouped by a table’s distribution column, Citus can still optimize on a case-by-case basis. Citus has internal rules for certain aggregates like sum(), avg(), and count(distinct) that allow it to rewrite queries for partial aggregation on workers. For instance, to calculate an average, Citus obtains a sum and a count from each worker, and then the coordinator node computes the final average.
      3. Last resort: pull all rows from the workers and perform the aggregation on the coordinator node. When the aggregate is not grouped on a distribution column, and is not one of the predefined special cases, then Citus falls back to this approach. It causes network overhead, and can exhaust the coordinator’s resources if the data set to be aggregated is too large. To avoid accidentally pulling data to the coordinator, you can set a GUC:
        • SET citus.coordinator_aggregation_strategy TO 'disabled';
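
The partial aggregation of avg() described in the second method above can be sketched as follows: each worker returns a sum and a count, and the coordinator combines them. The function names are hypothetical, and the sketch models only the arithmetic, not how Citus rewrites queries.

```python
from typing import Iterable, List, Optional, Tuple


def worker_partial_avg(rows: Iterable[float]) -> Tuple[float, int]:
    """Runs on each worker: return (sum, count) for the local rows."""
    values = list(rows)
    return sum(values), len(values)


def coordinator_final_avg(partials: Iterable[Tuple[float, int]]) -> Optional[float]:
    """Runs on the coordinator: combine the partial results into one average."""
    total = 0.0
    count = 0
    for partial_sum, partial_count in partials:
        total += partial_sum
        count += partial_count
    return total / count if count else None


shards: List[List[float]] = [[1, 2, 3], [10], []]
result = coordinator_final_avg(worker_partial_avg(rows) for rows in shards)
```

Here result is 4.0, the average of all four rows. Averaging the per-worker averages (2.0 and 10.0) would give 6.0, which is why the workers ship sums and counts rather than finished averages.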

[Postgres-XL]

  • The following are not allowed:
    • Modifying distribution column values
  • Aggregate functions
    • The aggregation works in two different modes.
      1. Two phased aggregation - is used when the entire aggregation takes place on the Coordinator node. In first phase called transition phase, Postgres-XL creates a temporary variable of data type stype to hold the current internal state of the aggregate. At each input row, the aggregate argument value(s) are calculated and the state transition function is invoked with the current state value and the new argument value(s) to calculate a new internal state value. After all the rows have been processed, in the second phase or finalization phase the final function is invoked once to calculate the aggregate's return value. If there is no final function then the ending state value is returned as-is.
      2. Three phased aggregation - is used when the process of aggregation is divided between Coordinator and Datanodes. In this mode, each Postgres-XL Datanode involved in the query carries out the first phase named transition phase. This phase is similar to the first phase in the two phased aggregation mode discussed above, except that, every Datanode applies this phase on the rows available at the Datanode. The result of transition phase is then transferred to the Coordinator node. Second phase called collection phase takes place on the Coordinator. Postgres-XL Coordinator node creates a temporary variable of data type stype to hold the current internal state of the collection phase. For every input from the Datanode (result of transition phase on that node), the collection function is invoked with the current collection state value and the new transition value (obtained from the Datanode) to calculate a new internal collection state value.
    • After all the transition values from data nodes have been processed, in the third or finalization phase the final function is invoked once to calculate the aggregate's return value. If there is no final function then the ending collection state value is returned as-is.
    • Postgres-XL planner chooses the cheapest feasible mode of the above two, during planning.
  • SELECT INTO distributes data of the newly-created table on all the nodes respecting the default distribution which is HASH on the first column having a type that can be distributed. If no columns are found, distribution is done by ROUNDROBIN.
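
The two aggregation modes described above can be compared with a toy aggregate. The sketch below uses a hypothetical "spread" aggregate (max minus min) whose state is a (min, max) pair. The transition, collection, and final functions play the roles named in the text, but the code is an illustration and not Postgres-XL's executor.

```python
from functools import reduce
from typing import Iterable, Optional, Tuple

State = Optional[Tuple[float, float]]  # (min, max), or None before any row


def transition(state: State, value: float) -> State:
    """Transition phase: fold one input row into the state."""
    if state is None:
        return (value, value)
    return (min(state[0], value), max(state[1], value))


def collect(coll_state: State, trans_state: State) -> State:
    """Collection phase: merge one Datanode's transition result."""
    if trans_state is None:
        return coll_state
    if coll_state is None:
        return trans_state
    return (min(coll_state[0], trans_state[0]), max(coll_state[1], trans_state[1]))


def final(state: State) -> Optional[float]:
    """Finalization phase: compute the aggregate's return value."""
    return None if state is None else state[1] - state[0]


def two_phase(rows: Iterable[float]) -> Optional[float]:
    # Everything runs on the Coordinator.
    return final(reduce(transition, rows, None))


def three_phase(rows_per_datanode: Iterable[Iterable[float]]) -> Optional[float]:
    # Each Datanode runs the transition phase; the Coordinator collects.
    coll_state: State = None
    for rows in rows_per_datanode:
        coll_state = collect(coll_state, reduce(transition, rows, None))
    return final(coll_state)
```

Both modes return the same result for the same rows. The three-phase mode ships one small state per Datanode to the Coordinator instead of every row.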

Cursor

[Postgres-XL]

  • WHERE CURRENT OF clause is not supported.
  • LAST and BACKWARD are not supported.

Transaction

<Questions>

[Oracle]

  • The shard coordinator iterates over all primary shard databases, and invokes remote execution of the UPDATE statement. The coordinator starts a distributed transaction and performs two phase commit to guarantee the consistency of the distributed transaction.
  • In the case of an in-doubt transaction, you must recover it manually.
  • Oracle extends read consistency to multi-shard operations as well using global consistent read.
  • A multi-shard query must maintain global read consistency (CR) by issuing the query at the highest common SCN across all the shards.
  • You can use the initialization parameter MULTISHARD_QUERY_DATA_CONSISTENCY to set different consistency levels when executing multi-shard queries across shards.
    • For example, you might want some queries to avoid the cost of SCN synchronization across shards, and these shards could be globally distributed.
    • Another use case is when you use standbys for replication and slightly stale data is acceptable for multi-shard queries, as the results could be fetched from the primary and its standbys.
    • This parameter can be set either at the system level or at the session level.
    • MULTISHARD_QUERY_DATA_CONSISTENCY = { STRONG | SHARD_LOCAL | DELAYED_STANDBY_ALLOWED }
      • STRONG: With this setting, SCN synchronization is performed across all shards, and data is consistent across all shards. This setting provides global consistent read capability. This is the default value.
      • SHARD_LOCAL: With this setting, SCN synchronization is not performed across all shards. Data is consistent within each shard. This setting provides the most current data.
      • DELAYED_STANDBY_ALLOWED: With this setting, SCN synchronization is not performed across all shards. Data is consistent within each shard. This setting allows data to be fetched from Data Guard standby databases when possible (for example, depending on load balancing), and may return stale data from standby databases.

[MySQL Cluster]

  • Supports only the READ COMMITTED transaction isolation level.
  • NDB implements READ COMMITTED on a per-row basis; when a read request arrives at the data node storing the row, what is returned is the last committed version of the row at that time.
  • Uncommitted data is never returned, but when a transaction modifying a number of rows commits concurrently with a transaction reading the same rows, the transaction performing the read can observe the before values, the after values, or both, for different rows among these, due to the fact that a given row read request can be processed either before or after the commit of the other transaction.
  • Not MVCC
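
The per-row READ COMMITTED behavior described above can be illustrated with a toy in-memory model. This is a deliberately simplified sketch and not NDB code: the committed dictionary and the two helper functions are hypothetical, and the interleaving is fixed by hand rather than produced by real concurrency.

```python
from typing import Dict

# Last committed version of each row; there are no older versions (no MVCC).
committed: Dict[str, int] = {"a": 0, "b": 0}


def read_committed(key: str) -> int:
    """Return the last committed value of one row at the time of the request."""
    return committed[key]


def commit(writes: Dict[str, int]) -> None:
    """Make all writes of a transaction visible."""
    committed.update(writes)


# The writer keeps a == b. The reader's two row reads straddle the commit.
seen_a = read_committed("a")   # processed before the writer commits
commit({"a": 1, "b": 1})       # writer commits both rows
seen_b = read_committed("b")   # processed after the writer commits
```

The reader never sees uncommitted data, yet it observes a = 0 and b = 1, a combination that no committed state ever contained. A snapshot-based (MVCC) read would have returned both values from the same point in time.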

[Spanner]

  • Spanner uses two-phase commit (2PC) and strict two-phase locking to ensure isolation and strong consistency.
  • Uses Paxos for replication
    • 2PC has been called the "anti-availability" protocol because all members must be up for it to work. Spanner mitigates this by having each member be a Paxos group, thus ensuring each 2PC member is highly available even if some of its Paxos participants are down.
    • Data is divided into groups that form the basic unit of placement and replication.
    • Transactions in Spanner will work as long as all of the touched groups have a quorum-elected leader and are on one side of the partition. This means that some transactions work perfectly and some will time out, but they are always consistent.
    • An implementation property of Spanner is that any reads that return are consistent, even if the transaction later aborts (for any reason, including time outs).
  • Uses 2PC
    • For a given transaction, Spanner assigns it the timestamp that Paxos assigns to the Paxos write that represents the transaction commit.
    • Transactions can be assigned timestamps at any time when all locks have been acquired, but before any locks have been released.
    • When a client has completed all reads and buffered all writes, it begins two-phase commit. The client chooses a coordinator group and sends a commit message to each participant leader with the identity of the coordinator and any buffered writes. Having the client drive two-phase commit avoids sending data twice across wide-area links.
    • A non-coordinator-participant leader first acquires write locks. It then chooses a prepare timestamp that must be larger than any timestamps it has assigned to previous transactions (to preserve monotonicity), and logs a prepare record through Paxos. Each participant then notifies the coordinator of its prepare timestamp.
    • The coordinator leader also first acquires write locks, but skips the prepare phase. It chooses a timestamp for the entire transaction after hearing from all other participant leaders. The commit timestamp s must be greater than or equal to all prepare timestamps, greater than TT.now().latest at the time the coordinator received its commit message, and greater than any timestamps the leader has assigned to previous transactions (again, to preserve monotonicity).
    • The coordinator leader then logs a commit record through Paxos (or an abort if it timed out while waiting on the other participants).
    • Before allowing any coordinator replica to apply the commit record, the coordinator leader waits until TT.after(s), so as to obey the commit-wait rule. This wait is typically overlapped with Paxos communication.
    • After commit wait, the coordinator sends the commit timestamp to the client and all other participant leaders.
    • Each participant leader logs the transaction outcome through Paxos.
    • All participants apply at the same timestamp and then release locks.
  • In addition to normal transactions, Spanner supports snapshot reads, which are read at a particular time in the past.
    • Spanner maintains multiple versions over time, each with a timestamp, and thus can precisely answer snapshot reads with the correct version.
    • In particular, each replica knows the time for which it is caught up (for sure), and any replica can unilaterally answer a read before that time (unless it is way too old and has been garbage collected).
    • Similarly, it is easy to read (asynchronously) at the same time across many groups.
    • Snapshot reads do not need locks at all.
  • Read-only transactions are implemented as a snapshot read at the current time (at any up-to-date replica).
  • A read-write transaction in Cloud Spanner executes a set of reads and writes atomically at a single logical point in time. Furthermore, the timestamp at which read-write transactions execute matches wall clock time, and the serialization order matches the timestamp order.
  • Here are the isolation properties you get for a read-write transaction that contains a series of reads and writes:
    • All reads within that transaction return data from the same timestamp.
    • If a transaction successfully commits, then no other writer modified the data that was read in the transaction after it was read.
    • These properties hold even for reads that returned no rows, and the gaps between rows returned by range reads: row non-existence counts as data.
    • All writes within that transaction are committed at the same timestamp.
    • All writes within that transaction are only visible after the transaction commits.
  • Cloud Spanner provides serializability, which means that all transactions appear as if they executed in a serial order. Cloud Spanner assigns commit timestamps that reflect the order of committed transactions to implement this property.
  • Cloud Spanner offers a stronger guarantee than serializability called external consistency:
    • transactions commit in an order that is reflected in their commit timestamps, and these commit timestamps reflect real time so you can compare them to your watch.
    • Reads in a transaction see everything that has been committed before the transaction commits, and writes are seen by everything that starts after the transaction is committed.
  • Read
    • Strong reads are guaranteed to see the effects of all transactions that have committed before the start of the read.
    • Furthermore, all rows yielded by a single read are consistent with each other - if any part of the read observes a transaction, all parts of the read see the transaction.
    • Cloud Spanner provides a bound type for bounded staleness. Bounded staleness modes allow Cloud Spanner to pick the read timestamp, subject to a user-provided staleness bound. Cloud Spanner chooses the newest timestamp within the staleness bound that allows execution of the reads at the closest available replica without blocking.
    • Boundedly stale reads are not repeatable: two stale reads, even if they use the same staleness bound, can execute at different timestamps and thus return inconsistent results.
    • Bounded staleness reads are usually a little slower than comparable exact staleness reads.
    • Cloud Spanner provides a bound type for exact staleness. These timestamp bounds execute reads at a user-specified timestamp. Reads at a timestamp are guaranteed to see a consistent prefix of the global transaction history: they observe modifications done by all transactions with a commit timestamp less than or equal to the read timestamp, and observe none of the modifications done by transactions with a larger commit timestamp. They will block until all conflicting transactions that may be assigned commit timestamps less than or equal to the read timestamp have finished.
    • The timestamp can either be expressed as an absolute Cloud Spanner commit timestamp or a staleness relative to the current time.
    • Stale reads do not provide any latency benefits in regional configurations, so you should almost always use strong reads when your instance does not have a multi-region configuration.
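
The commit timestamp rule and the commit wait from the 2PC bullets above can be sketched as follows. This is only an illustration under simplifying assumptions, not Spanner's implementation: timestamps are plain integers, TrueTime is modelled as an interval, and the names TTInterval, choose_commit_timestamp, and commit_wait_done are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TTInterval:
    """Hypothetical model of a TrueTime reading: true time lies in [earliest, latest]."""
    earliest: int
    latest: int


def choose_commit_timestamp(prepare_timestamps, tt_now_at_commit_msg, last_assigned):
    """Return the smallest integer timestamp s that satisfies the three rules:

    - s >= every participant's prepare timestamp
    - s >  TT.now().latest when the coordinator received the commit message
    - s >  any timestamp the coordinator leader assigned before (monotonicity)
    """
    lower_bounds = [tt_now_at_commit_msg.latest + 1, last_assigned + 1]
    lower_bounds.extend(prepare_timestamps)
    return max(lower_bounds)


def commit_wait_done(s, tt_now):
    """Model of TT.after(s): true only once s is guaranteed to be in the past."""
    return tt_now.earliest > s


s = choose_commit_timestamp(
    prepare_timestamps=[105, 112],
    tt_now_at_commit_msg=TTInterval(earliest=100, latest=110),
    last_assigned=90,
)
# s is 112: the largest prepare timestamp dominates the other two bounds.
still_waiting = not commit_wait_done(s, TTInterval(earliest=108, latest=118))
may_apply = commit_wait_done(s, TTInterval(earliest=113, latest=123))
```

The coordinator may let replicas apply the commit record only once commit_wait_done returns true, which is why the wait is normally overlapped with the Paxos round trip.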

[CockroachDB]

  • Transactions implement the strongest ANSI isolation level: serializable. This means that transactions will never result in anomalies.
  • It does so by combining the Raft consensus algorithm for writes and a custom time-based synchronization algorithm for reads.
    • Stored data is versioned with MVCC, so reads simply limit their scope to the data visible at the time the read transaction started.
    • Writes are serviced using the Raft consensus algorithm, a popular alternative to Paxos. A consensus algorithm guarantees that any majority of replicas together always agree on whether an update was committed successfully. Updates (writes) must reach a majority of replicas (2 out of 3 by default) before they are considered committed.
  • Timestamp cache
    • To ensure that a write transaction does not interfere with read transactions that start after it, CockroachDB also uses a timestamp cache which remembers when data was last read by ongoing transactions. The timestamp cache tracks the highest timestamp (i.e., most recent) for any read operation that a given range has served.
    • Each write operation in a BatchRequest checks its own timestamp versus the timestamp cache to ensure that the write operation has a higher timestamp; this guarantees that history is never rewritten and you can trust that reads always served the most recent data. It's one of the crucial mechanisms CockroachDB uses to ensure serializability. If a write operation fails this check, it must be restarted at a timestamp higher than the timestamp cache's value.
  • Transaction retry
    • Automatically retries individual statements (implicit transactions) and transactions sent from the client as a single batch, as long as the size of the results being produced for the client, including protocol overhead, is less than 16KiB by default. Once that buffer overflows, CockroachDB starts streaming results back to the client, at which point automatic retries cannot be performed any more.
  • Parallel commits
  • Uses HLC for transaction timestamps
    • While it's possible to rely entirely on Raft consensus to maintain serializability, it would be inefficient for reading data. To optimize performance of reads, CockroachDB implements hybrid-logical clocks (HLC) which are composed of a physical component (always close to local wall time) and a logical component (used to distinguish between events with the same physical component). This means that HLC time is always greater than or equal to the wall time.
    • In terms of transactions, the gateway node picks a timestamp for the transaction using HLC time. Whenever a transaction's timestamp is mentioned, it's an HLC value. This timestamp is used to both track versions of values (through multi-version concurrency control), as well as provide our transactional isolation guarantees.
    • When nodes send requests to other nodes, they include the timestamp generated by their local HLCs (which includes both physical and logical components). When nodes receive requests, they inform their local HLC of the timestamp supplied with the event by the sender. This is useful in guaranteeing that all data read/written on a node is at a timestamp less than the next HLC time.
    • This then lets the node primarily responsible for the range (i.e., the leaseholder) serve reads for data it stores by ensuring the transaction reading the data is at an HLC time greater than the MVCC value it's reading (i.e., the read always happens "after" the write).
  • Needs clock synchronization to preserve data consistency
    • When a node detects that its clock is out of sync with at least half of the other nodes in the cluster by 80% of the maximum offset allowed (500ms by default), it crashes immediately.
    • While serializable consistency is maintained regardless of clock skew, skew outside the configured clock offset bounds can result in violations of single-key linearizability between causally dependent transactions. It's therefore important to prevent clocks from drifting too far by running NTP or other clock synchronization software on each node.
    • The one rare case to note is when a node's clock suddenly jumps beyond the maximum offset before the node detects it. Although extremely unlikely, this could occur, for example, when running CockroachDB inside a VM and the VM hypervisor decides to migrate the VM to different hardware with a different time. In this case, there can be a small window of time between when the node's clock becomes unsynchronized and when the node spontaneously shuts down. During this window, it would be possible for a client to read stale data and write data derived from stale reads. To protect against this, we recommend using the server.clock.forward_jump_check_enabled and server.clock.persist_upper_bound_interval cluster settings.
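
The hybrid-logical clock behaviour described above (a physical component plus a logical component, updated whenever a message arrives from another node) can be sketched as below. This follows the generic HLC algorithm and is not CockroachDB's code. The class name, method names, and the integer wall clocks are assumptions made for illustration.

```python
class HybridLogicalClock:
    """Minimal HLC sketch: timestamps are (wall, logical) tuples."""

    def __init__(self, physical_clock):
        self._physical_clock = physical_clock  # callable returning integer wall time
        self.wall = 0
        self.logical = 0

    def now(self):
        """Timestamp for a local event or an outgoing request."""
        pt = self._physical_clock()
        if pt > self.wall:
            self.wall, self.logical = pt, 0
        else:
            # The wall clock did not advance: order events with the logical part.
            self.logical += 1
        return (self.wall, self.logical)

    def update(self, remote):
        """Merge the timestamp carried by an incoming request."""
        remote_wall, remote_logical = remote
        pt = self._physical_clock()
        if pt > self.wall and pt > remote_wall:
            self.wall, self.logical = pt, 0
        elif remote_wall > self.wall:
            self.wall, self.logical = remote_wall, remote_logical + 1
        elif self.wall > remote_wall:
            self.logical += 1
        else:
            self.logical = max(self.logical, remote_logical) + 1
        return (self.wall, self.logical)


# Node A's wall clock reads 100 while node B's lags behind at 90.
node_a = HybridLogicalClock(lambda: 100)
node_b = HybridLogicalClock(lambda: 90)

t_send = node_a.now()           # (100, 0)
t_recv = node_b.update(t_send)  # (100, 1): B adopts A's larger wall component
t_next = node_b.now()           # (100, 2): still ordered after the receive
```

Even though node B's physical clock is behind, every timestamp it hands out after receiving the request is larger than the sender's timestamp, which is the property the bullets above rely on.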

[YugabyteDB]

  • The transactions design is based on the Google Spanner architecture.
  • Strong consistency of writes is achieved by using Raft consensus for replication and cluster-wide distributed ACID transactions using hybrid logical clocks.
  • Reads (queries) have strong consistency by default, but can be tuned dynamically to read from followers and read-replicas.
  • Supports two transaction isolation levels - SERIALIZABLE (which maps to the SQL isolation level of the same name) and SNAPSHOT (which is mapped to the SQL isolation level REPEATABLE READ). Even READ COMMITTED and READ UNCOMMITTED isolation levels are mapped to snapshot isolation.
  • In order to support these two isolation levels, the lock manager internally supports three types of locks:
    1. Snapshot isolation write lock: This type of a lock is taken by a snapshot isolation transaction on values that it modifies.
    2. Serializable read lock: This type of a lock is taken by serializable read-modify-write transactions on values that they read in order to guarantee they are not modified until the transaction commits.
    3. Serializable write lock: This type of a lock is taken by serializable transactions on values they write, as well as by pure-write snapshot isolation transactions. Multiple snapshot-isolation transactions writing the same item can thus proceed in parallel.
  • Just as YugabyteDB stores values written by single-shard ACID transactions into DocDB, it needs to store uncommitted values written by distributed transactions in a similar persistent data structure. However, we cannot just write them to DocDB as regular values, because they would then become visible at different times to clients reading through different tablet servers, allowing a client to see a partially applied transaction and thus breaking atomicity. YugabyteDB therefore writes provisional records to all tablets responsible for the keys the transaction is trying to modify. We call them "provisional" as opposed to "regular" ("permanent") records, because they are invisible to readers until the transaction commits.
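
One way to read the three lock types above is as a conflict matrix. The sketch below encodes a matrix inferred from the descriptions in this list. It is an assumption, not a copy of YugabyteDB's lock manager, and the names LockType and conflicts are hypothetical.

```python
from enum import Enum


class LockType(Enum):
    SNAPSHOT_WRITE = "snapshot isolation write lock"
    SERIALIZABLE_READ = "serializable read lock"
    SERIALIZABLE_WRITE = "serializable write lock"


# Assumed conflicting pairs (unordered). A one-element set means the lock
# type conflicts with another lock of the same type.
_CONFLICTS = {
    frozenset({LockType.SNAPSHOT_WRITE}),
    frozenset({LockType.SNAPSHOT_WRITE, LockType.SERIALIZABLE_READ}),
    frozenset({LockType.SNAPSHOT_WRITE, LockType.SERIALIZABLE_WRITE}),
    frozenset({LockType.SERIALIZABLE_READ, LockType.SERIALIZABLE_WRITE}),
}


def conflicts(held, requested):
    """Return True if `requested` cannot be granted while `held` is in place."""
    return frozenset({held, requested}) in _CONFLICTS


# Two pure-write transactions on the same item can proceed in parallel.
parallel_writers = not conflicts(LockType.SERIALIZABLE_WRITE, LockType.SERIALIZABLE_WRITE)
# A serializable read blocks a later write to the value that was read.
read_blocks_write = conflicts(LockType.SERIALIZABLE_READ, LockType.SERIALIZABLE_WRITE)
```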

[Azure Synapse]

  • No distributed transactions
  • No nested transactions permitted
  • The default transaction isolation level is READ UNCOMMITTED.
  • You can change it to READ COMMITTED SNAPSHOT ISOLATION by turning ON the READ_COMMITTED_SNAPSHOT database option for a user database when connected to the master database.

[Greenplum]

  • Serializable Snapshot Isolation (SSI) is not available.
  • READ UNCOMMITTED and READ COMMITTED behave like the standard READ COMMITTED.
  • REPEATABLE READ and SERIALIZABLE behave like REPEATABLE READ.
  • XID is a property of the database. Each segment database has its own XID sequence that cannot be compared to the XIDs of other segment databases.
  • The master coordinates distributed transactions with the segments using a cluster-wide session ID number, called gp_session_id. The segments maintain a mapping of distributed transaction IDs with their local XIDs.
  • The master coordinates distributed transactions across all of the segments with the two-phase commit protocol.

[Citus]

  • There is no notion of snapshot isolation across shards, which means that a multi-shard SELECT that runs concurrently with a COPY might see it committed on some shards, but not on others.
  • When updates/deletes affect multiple shards, Citus defaults to using a one-phase commit protocol. For greater safety you can enable two-phase commits by setting
    • SET citus.multi_shard_commit_protocol = '2pc';
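
The difference between the two commit protocols can be illustrated with a toy model. This is a sketch of the general technique, not of Citus internals: Shard, one_phase_commit, and two_phase_commit are hypothetical names, and a shard failure is simulated with a flag.

```python
class Shard:
    """Hypothetical in-memory stand-in for a worker that holds one shard."""

    def __init__(self, name, healthy=True):
        self.name = name
        self.healthy = healthy
        self.state = "active"

    def _check(self):
        if not self.healthy:
            raise RuntimeError(f"{self.name} is unreachable")

    def prepare(self):
        self._check()
        self.state = "prepared"

    def commit(self):
        self._check()
        self.state = "committed"

    def rollback(self):
        # A shard that already committed cannot be rolled back.
        if self.state != "committed":
            self.state = "aborted"


def one_phase_commit(shards):
    """Commit shard by shard; a failure midway leaves earlier shards committed."""
    try:
        for shard in shards:
            shard.commit()
        return True
    except RuntimeError:
        for shard in shards:
            shard.rollback()
        return False


def two_phase_commit(shards):
    """Prepare everywhere first; commit only if every shard prepared."""
    try:
        for shard in shards:
            shard.prepare()
    except RuntimeError:
        for shard in shards:
            shard.rollback()
        return False
    # A real coordinator would retry this phase after a crash, because
    # prepared transactions survive restarts.
    for shard in shards:
        shard.commit()
    return True


one_phase = [Shard("shard1"), Shard("shard2", healthy=False)]
one_phase_ok = one_phase_commit(one_phase)
one_phase_states = [s.state for s in one_phase]   # partially committed

two_phase = [Shard("shard1"), Shard("shard2", healthy=False)]
two_phase_ok = two_phase_commit(two_phase)
two_phase_states = [s.state for s in two_phase]   # nothing committed
```

With one-phase commit the failed transaction leaves shard1 committed and shard2 aborted, while two-phase commit aborts both, at the cost of an extra round trip to every worker.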

[Postgres-XL]

  • Isolation level SERIALIZABLE is silently converted to REPEATABLE READ internally.

<References>

Sub-transactions and savepoints

[MySQL Cluster]

  • Savepoints and rollbacks to savepoints are ignored.
  • There are no partial transactions, and no partial rollbacks of transactions. A duplicate key or similar error causes the entire transaction to be rolled back.

[CockroachDB]

  • Supports nested transactions using SAVEPOINT.

[YugabyteDB]

  • Savepoint is not supported.

[Azure Synapse]

  • No save points allowed.

[Postgres-XL]

  • SAVEPOINT is not supported.

Lock

  • We want to avoid a central node that carries a heavy lock management load.

[MySQL Cluster]

  • No distributed table locks.
    • A LOCK TABLES works only for the SQL node on which the lock is issued; no other SQL node in the cluster recognizes this lock.
    • This is also true for a lock issued by any statement that locks tables as part of its operations.
  • ALTER TABLE is not fully locking when running multiple MySQL servers (SQL nodes).

[Greenplum]

  • By default Greenplum Database acquires the more restrictive EXCLUSIVE lock (rather than ROW EXCLUSIVE in PostgreSQL) for UPDATE, DELETE, and SELECT...FOR UPDATE operations on heap tables. When the Global Deadlock Detector is enabled the lock mode for UPDATE and DELETE operations on heap tables is ROW EXCLUSIVE.
  • Greenplum always holds a table-level lock for SELECT...FOR UPDATE statements.
  • The Greenplum Database Global Deadlock Detector background worker process collects lock information on all segments and uses a directed algorithm to detect the existence of local and global deadlocks. This algorithm allows Greenplum Database to relax concurrent update and delete restrictions on heap tables.
  • By default, the Global Deadlock Detector is disabled and Greenplum Database executes the concurrent update and delete operations on a heap table serially. You can enable these concurrent updates and have the Global Deadlock Detector determine when a deadlock exists by setting the server configuration parameter gp_enable_global_deadlock_detector.
  • When the Global Deadlock Detector is enabled, the background worker process is automatically started on the master host when you start Greenplum Database. You configure the interval at which the Global Deadlock Detector collects and analyzes lock waiting data via the gp_global_deadlock_detector_period server configuration parameter.
  • If the Global Deadlock Detector determines that deadlock exists, it breaks the deadlock by cancelling one or more backend processes associated with the youngest transaction(s) involved.
  • To view lock waiting information for all segments, run the gp_dist_wait_status() user-defined function. You can use the output of this function to determine which transactions are waiting on locks, which transactions are holding locks, the lock types and mode, the waiter and holder session identifiers, and which segments are executing the transactions.

[Spanner]

  • Cloud Spanner uses the standard "wound-wait" algorithm to handle deadlock detection.
    • Under the hood, Cloud Spanner keeps track of the age of each transaction that requests conflicting locks.
    • It also allows older transactions to abort younger transactions (where "older" means that the transaction's earliest read, query, or commit happened sooner).
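
The wound-wait rule reduces to one comparison of transaction ages. The sketch below assumes each transaction carries a start timestamp, with a smaller value meaning older. The function name and return values are hypothetical.

```python
def wound_wait(requester_start, holder_start):
    """Decide what happens when a requester asks for a lock that a holder owns.

    Smaller start timestamp means an older transaction.
    """
    if requester_start < holder_start:
        # The older requester wounds (aborts) the younger holder.
        return "wound holder"
    # A younger requester waits for the older holder to finish.
    return "wait"


older_asks_younger = wound_wait(requester_start=1, holder_start=5)
younger_asks_older = wound_wait(requester_start=5, holder_start=1)
```

Because a transaction only ever waits for an older one, the wait-for relation cannot form a cycle, so no separate deadlock detector is needed.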

[Citus]

  • To handle deadlocks that span across nodes, Citus adds a new distributed deadlock detector. This deadlock detector runs as a background worker process within the Postgres extension framework.
  • When a client sends a transaction to Citus, the coordinator sends the transaction to related worker nodes. Before sending the transaction however, the coordinator also calls the function SELECT assign_distributed_transaction_id(); This call ensures that the local transaction on the worker node is associated with the distributed transaction on the coordinator.
  • To detect a distributed deadlock, Citus needs to continuously monitor all nodes for processes that are waiting for locks for a non-negligible amount of time (e.g. 1 second). When this occurs, we collect the lock tables from all nodes and construct a directed graph of the processes that are waiting for each other across all the nodes. If there is a cycle in this graph, then there is a distributed deadlock. To end the deadlock, we need to proactively kill processes or cancel transactions until the cycle is gone.
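
The cycle check on the combined wait-for graph can be sketched with a depth-first search. This is a generic illustration, not the Citus implementation. The function name and the (waiter, holder) edge format are assumptions.

```python
def find_deadlock_cycle(wait_edges):
    """Return a list of transactions that form a cycle, or None.

    wait_edges: iterable of (waiter, holder) pairs merged from the lock
    tables of all nodes.
    """
    graph = {}
    for waiter, holder in wait_edges:
        graph.setdefault(waiter, set()).add(holder)
        graph.setdefault(holder, set())

    WHITE, GREY, BLACK = 0, 1, 2   # unvisited, on the current path, finished
    color = {node: WHITE for node in graph}
    path = []

    def visit(node):
        color[node] = GREY
        path.append(node)
        for nxt in sorted(graph[node]):
            if color[nxt] == GREY:
                # Back edge: everything from nxt onwards on the path is a cycle.
                return path[path.index(nxt):]
            if color[nxt] == WHITE:
                cycle = visit(nxt)
                if cycle:
                    return cycle
        path.pop()
        color[node] = BLACK
        return None

    for node in sorted(graph):
        if color[node] == WHITE:
            cycle = visit(node)
            if cycle:
                return cycle
    return None


# On worker 1, txA waits for txB; on worker 2, txB waits for txA.
# Neither node sees a cycle locally, but the merged graph has one.
deadlock = find_deadlock_cycle([("txA", "txB"), ("txB", "txA")])
no_deadlock = find_deadlock_cycle([("txA", "txB"), ("txB", "txC")])
```

Once a cycle is returned, the detector can cancel one of the listed transactions and rerun the check until no cycle remains.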

[Postgres-XL]

  • Does not detect global deadlocks where multiple nodes (Coordinators and/or Datanodes) are involved. To get around this it is recommended to set statement_timeout to cause those statements to fail in a normal processing environment.
  • Advisory locks are local to each Coordinator or Datanode. If you wish to acquire an advisory lock on a different Coordinator, you should do it manually using EXECUTE DIRECT statement.

Procedures and functions

<Questions>

  • Q1: What should we care about in procedure and function execution? For example, whether the execution is pushed down to remote nodes?

[Spanner]

  • Does not allow users to create functions or procedures.

[CockroachDB]

  • Does not allow users to create functions or procedures.

[YugabyteDB]

  • SQL, PL/pgSQL and C functions and procedures are supported by default.

[Greenplum]

  • Limited Use of VOLATILE and STABLE Functions: To prevent data from becoming out-of-sync across the segments in Greenplum Database, any function classified as STABLE or VOLATILE cannot be executed at the segment level if it contains SQL or modifies the database in any way. For example, functions such as random() or timeofday() are not allowed to execute on distributed data in Greenplum Database because they could potentially cause inconsistent data between the segment instances.
  • To ensure data consistency, VOLATILE and STABLE functions can safely be used in statements that are evaluated on and execute from the master. In cases where a statement has a FROM clause containing a distributed table and the function used in the FROM clause simply returns a set of rows, execution may be allowed on the segments.
  • You cannot return a refcursor from any kind of function.
  • A function with the EXECUTE ON MASTER attribute is executed only on the master segment and a function with the EXECUTE ON ALL SEGMENTS attribute is executed on all primary segment instances (not the master).
  • A function with the EXECUTE ON ANY (the default) indicates that the function can be executed on the master, or any segment instance, and it returns the same result regardless of where it is executed. Greenplum Database determines where the function executes.
  • Functions And Replicated Tables: A user-defined function that executes only SELECT commands on replicated tables can run on segments. It is safe for a function to read them on the segments, but updates to replicated tables must execute on the master instance.
  • The shared library files for user-created functions must reside in the same library path location on every host in the Greenplum Database array (masters, segments, and mirrors).

[Postgres-XL]

  • Only one SQL statement can be handled in a PL/pgSQL function.
  • In PL/pgSQL, SELECT INTO command is not supported yet.
  • It is highly recommended not to use VARIADIC in SQL functions. It is not well reviewed and may return wrong result.
  • Postgres-XL function usage currently requires a lot of care, otherwise unexpected results may occur, and you may bring your data to an inconsistent state. This behavior may change in a future version to make it safer.
    • A call such as SELECT my_function(1,2);, without any FROM clause, will execute on a local Coordinator, and may involve other Datanodes and behave as expected, being driven from a Coordinator.
    • A call such as SELECT col1, my_table_function(col2) FROM mytable will be pushed down to the datanodes involved. If my_table_function happens to do a SELECT, it will only be from data local to that node.
    • Similarly, if it executes an UPDATE, it will only update data local to that node. If the UPDATE writes to a replicated table for example, it would mean that the tables would be out of sync.
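
The out-of-sync hazard described above can be reproduced with a toy model. The sketch assumes a distributed table mytable and a replicated table totals. The node names, the table contents, and the behaviour of my_table_function are hypothetical.

```python
# Hypothetical cluster state: `mytable` rows are spread over the Datanodes,
# while `totals` is a replicated table that should be identical everywhere.
datanodes = {
    "dn1": {"mytable": [10, 20], "totals": 0},
    "dn2": {"mytable": [5], "totals": 0},
}


def my_table_function(node, col2):
    """Stand-in for a function whose body runs UPDATE totals SET n = n + col2."""
    node["totals"] += col2   # only the copy local to this Datanode changes
    return col2


def pushed_down_select(nodes):
    """Model of SELECT my_table_function(col2) FROM mytable after pushdown."""
    results = []
    for node in nodes.values():
        for col2 in node["mytable"]:
            results.append(my_table_function(node, col2))
    return results


rows = pushed_down_select(datanodes)
replica_values = {name: node["totals"] for name, node in datanodes.items()}
# dn1 ends with 30 and dn2 with 5, although a replicated table should hold
# the same value (35) on every node.
```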

Various limitations

[MySQL Cluster]

  • MySQL Cluster does not support:
    • FULLTEXT indexing
    • Temporary tables

[Postgres-XL]

  • DELETE: with_query is not supported.
  • Large objects are not supported by Postgres-XL. Postgres-XL does not provide a consistent way to handle the large object as OIDs are inconsistent among cluster nodes.
  • Does not support NOTIFY, LISTEN and UNLISTEN commands yet.


Database administration

Prerequisite

<Questions>

  • Q1: Do we require or recommend the preparation of SSH to enable cluster-wide operations such as starting and stopping the cluster?
  • Q2: What do we say about time synchronization among cluster nodes by using NTP?

[CockroachDB]

  • It's important to prevent clocks from drifting too far by running NTP or other clock synchronization software on each node.
  • All nodes in the cluster must be synced to the same time source, or to different sources that implement leap second smearing in the same way.

[YugabyteDB]

  • Use NTP to synchronize time among the machines.
  • The maximum clock drift on any node should be bounded to no more than 500 PPM (or parts per million). This means that the clock on any node should drift by no more than 0.5 ms per second. Note that 0.5 ms per second is the standard assumption of clock drift in Linux.
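
The conversion from PPM to milliseconds is simple arithmetic, shown here as a small helper. The function name is hypothetical, and the one-hour figure assumes that no resynchronization happens in between.

```python
def max_drift_ms(ppm, elapsed_seconds):
    """Worst-case drift in milliseconds for a clock bounded by `ppm`."""
    # ppm parts per million of elapsed time, converted from seconds to ms.
    return ppm * elapsed_seconds * 1000 / 1_000_000


per_second = max_drift_ms(500, 1)      # 0.5 ms, matching the figure above
per_hour = max_drift_ms(500, 3600)     # 1800.0 ms if never corrected
```

The per-hour value shows why a drift bound alone is not sufficient and periodic synchronization is still required.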

[Greenplum]

  • Passwordless SSH: The gpadmin user on each Greenplum host must be able to SSH from any host in the cluster to any other host in the cluster without entering a password or passphrase.

Deployment

<Questions>

  • Q1: What do we say about node autonomy? Do we need to allow the nodes with existing data to form a new cluster? Can a node that left the cluster continue to run standalone?
  • Q2: What do we say about the processing power of cluster nodes? Should we recommend that all nodes have equal capacity?
  • Q3: Do all the cluster nodes have the same database character set and locale settings? How about other settings like WAL segment size and block size?
  • Q4: Can we allow the mix of different DBMS versions in the cluster so that users can perform online rolling upgrade?
  • Q5: Do we allow or recommend shared storage to store data for all cluster nodes?
  • All nodes in the cluster have to:
    • have the same computer architecture (64-bit, endianness): this may allow the fast shard movement between nodes by simply moving files.
    • use the same OS version: this is necessary to prevent inconsistent query results caused by different glibc or ICU versions.

[Oracle]

  • All nodes must have the same operating system and the same version of Oracle.
  • They do not have to be of the same capacity. This saves on capital expenditures as this allows customers to buy servers with latest hardware configurations and use it alongside existing servers.
  • The database character set and national character set must be the same, because it is used for all of the shard databases. This means that the character set chosen must contain all possible characters that will be inserted into the shard catalog or any of the shards.

[MySQL Cluster]

  • Can mix different kinds of hardware and operating systems in one NDB Cluster, as long as all machines and operating systems have the same endianness (all big-endian or all little-endian).
  • It is also possible to use software from different NDB Cluster releases on different nodes. However, we support such use only as part of a rolling upgrade procedure.
  • NDB Cluster is currently designed with the intention that data nodes are homogeneous in terms of processor power, memory space, and bandwidth.
  • It is also worth noting that all data nodes should have the same amount of RAM, since no data node in a cluster can use more memory than the least amount available to any individual data node. For example, if there are four computers hosting Cluster data nodes, and three of these have 3GB of RAM available to store Cluster data while the remaining data node has only 1GB RAM, then each data node can devote at most 1GB to NDB Cluster data and indexes.
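
The memory rule in the last bullet amounts to taking the minimum across data nodes. A small sketch using the figures from the example (the function name is hypothetical):

```python
def usable_data_memory_gb(ram_per_node_gb):
    """Each data node can devote at most as much memory as the smallest node."""
    return min(ram_per_node_gb)


nodes_gb = [3, 3, 3, 1]
per_node_gb = usable_data_memory_gb(nodes_gb)             # 1
unused_gb = sum(ram - per_node_gb for ram in nodes_gb)    # 6 GB cannot be used
```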

[Greenplum]

  • Greenplum Database's performance will be as fast as the slowest segment server in the array. Therefore, it is advised that all segment hosts in a Greenplum Database array have identical hardware resources and configurations.

Initialize cluster

<Questions>

  • Q1: What's the procedure for creating the cluster that is ready to start?
  • Q2: How are the initial cluster nodes specified, a command line option, configuration file, or SQL statement?

[Oracle]

  • The sharded database administrator defines the topology (regions, shard hosts, replication technology) and invokes the DEPLOY command with a declarative specification using the GDSCTL command-line interface.
    1. Create a database that will become the shard catalog along with any desired replicas for disaster recovery (DR) and high availability (HA).
    2. Create databases that will become the shards in the configuration including any standby databases needed for DR and HA.
    3. Specify the sharding topology using some or all the following commands from the GDSCTL command line utility: CREATE SHARDCATALOG, ADD GSM, START GSM, ADD SHARDSPACE, ADD SHARDGROUP, ADD SHARD, ADD INVITEDNODE
    4. Run DEPLOY to deploy the sharding topology configuration.
    5. Add the global services needed to access any shard in the sharded database.
  • The procedure is complex. For details, see Sharded Database Deployment.
  • Provides Terraform, Kubernetes, and Ansible scripts to automate and simplify the sharded database deployment operations.

[MySQL Cluster]

  1. On each data node and SQL node, create a my.cnf file that contains a connection string that tells the node where to find the management node.
  2. On the management node, create a config.ini file telling it how many replicas to maintain, how much memory to allocate for data and indexes on each data node, where to find the data nodes, where to save data to disk on each data node, and where to find any SQL nodes.
  3. Start the cluster.
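
The two configuration files from steps 1 and 2 could be generated as sketched below. The section and parameter names (NoOfReplicas, DataMemory, DataDir, HostName, ndbcluster, ndb-connectstring) are standard NDB Cluster settings. The helper names, the host addresses, the memory size, and the data directory are illustrative assumptions.

```python
def render_config_ini(mgm_host, data_hosts, sql_hosts, replicas=2,
                      data_memory="512M", data_dir="/usr/local/mysql/data"):
    """Build config.ini text for the management node."""
    lines = [
        "[ndbd default]",
        f"NoOfReplicas={replicas}",     # how many replicas to maintain
        f"DataMemory={data_memory}",    # memory for data and indexes per data node
        f"DataDir={data_dir}",          # where each data node saves data to disk
        "",
        "[ndb_mgmd]",
        f"HostName={mgm_host}",
    ]
    for host in data_hosts:             # where to find the data nodes
        lines += ["", "[ndbd]", f"HostName={host}"]
    for host in sql_hosts:              # where to find the SQL nodes
        lines += ["", "[mysqld]", f"HostName={host}"]
    return "\n".join(lines) + "\n"


def render_my_cnf(mgm_host):
    """Build my.cnf text for a data node or SQL node."""
    return "\n".join([
        "[mysqld]",
        "ndbcluster",
        f"ndb-connectstring={mgm_host}",
        "",
        "[mysql_cluster]",
        f"ndb-connectstring={mgm_host}",
    ]) + "\n"


# Placeholder addresses from the documentation range 198.51.100.0/24.
config_ini = render_config_ini(
    mgm_host="198.51.100.10",
    data_hosts=["198.51.100.30", "198.51.100.40"],
    sql_hosts=["198.51.100.20"],
)
my_cnf = render_my_cnf("198.51.100.10")
```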

[CockroachDB]

  • Run cockroach start to start the initial nodes, and then run cockroach init to perform a one-time initialization of the cluster.

$ cockroach start \
  --certs-dir=certs \
  --advertise-addr=<node1 address> \
  --join=<node1 address>,<node2 address>,<node3 address> \
  --cache=.25 \
  --max-sql-memory=.25 \
  --background
$ cockroach init \
  --certs-dir=certs \
  --host=<address of any node>

[YugabyteDB]

  • There's no initialization. Starting the YB-Masters initializes the cluster.

[Greenplum]

    1. Create the data directory on the master host and segment hosts.
    2. Run gpinitsystem. gpinitsystem creates the primary master instance, standby master instance, and primary and mirror segment instances based on the user-supplied cluster configuration file. It uses SSH and initdb internally. Each segment instance is set up in parallel.
      • $ gpinitsystem -c gpconfigs/gpinitsystem_config -h gpconfigs/hostfile_gpinitsystem -s standby_master_hostname -S
    3. As a best practice, configure Greenplum Database and the host systems to use a known, supported timezone. Setting the Greenplum Database timezone prevents Greenplum Database from selecting a timezone each time the cluster is restarted and sets the timezone for the Greenplum Database master and segment instances.
      • $ gpconfig -c TimeZone -v 'US/Pacific'

[Citus]

  1. On every coordinator and worker node, do:
    1. Run initdb.
    2. Add 'citus' to shared_preload_libraries parameter.
    3. Start the instance and run "CREATE EXTENSION citus;".
  2. On coordinator, run the UDF to add the worker node information to the pg_dist_node catalog table.
    • SELECT * from master_add_node('worker-101', 5432);

[Postgres-XL]

  • The manual initialization steps are as follows. pgxc_ctl command eases these tasks by ssh'ing to remote nodes and performing necessary steps.
  1. Create the database cluster on every Coordinator and Datanode.
    • $ initdb -D /usr/local/pgsql/data --nodename foo
  2. Create the GTM master and GTM slave on different nodes.
    • $ initgtm -Z gtm -D /usr/local/pgsql/data_gtm
  3. Create the GTM-Proxy on every Coordinator and Datanode.
    • $ initgtm -Z gtm_proxy -D /usr/local/pgsql/data_gtm_proxy
  4. Set the following parameters in postgresql.conf in the Datanode's data directory.
    • max_connections, max_prepared_transactions
    • pgxc_node_name
    • gtm_host, gtm_port
  5. Set the following parameters in postgresql.conf in the Coordinator's data directory.
    • max_prepared_transactions
    • pgxc_node_name
    • gtm_host, gtm_port
    • pooler_port, max_pool_size, min_pool_size
    • max_coordinators, max_datanodes

Start/stop database instances

<Questions>

  • Q1: What can we use on Windows instead of SSH?
  • Provide a single command that starts or shuts down the entire cluster. It uses SSH internally to run commands on remote nodes.
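
A cluster-wide start/stop command of this kind could build one SSH invocation per node, as in the sketch below. Everything in it is an assumption for discussion: the function name, the node list format, the use of pg_ctl on each node, and the choice to stop nodes in the reverse of the start order.

```python
import shlex


def build_cluster_commands(nodes, action):
    """Return one argv list per node for the given action.

    nodes: list of (host, data_dir) tuples in start order.
    action: "start" or "stop"; stop walks the nodes in reverse order.
    """
    if action not in ("start", "stop"):
        raise ValueError(f"unsupported action: {action}")
    ordered = nodes if action == "start" else list(reversed(nodes))
    commands = []
    for host, data_dir in ordered:
        # Quote the directory because the remote shell re-parses the string.
        remote = f"pg_ctl {action} -D {shlex.quote(data_dir)}"
        commands.append(["ssh", host, remote])
    return commands


cluster = [
    ("node1", "/var/lib/pgsql/data"),
    ("node2", "/var/lib/pgsql/data"),
    ("node3", "/var/lib/pgsql/data"),
]
start_commands = build_cluster_commands(cluster, "start")
stop_commands = build_cluster_commands(cluster, "stop")
```

Each returned list can be passed to subprocess.run. The sketch only builds the commands, so it can be reviewed or logged before anything is run on remote hosts.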

[Oracle]

  • The startup sequence is:
    1. Start the shard catalog database and local listener.
    2. Start the shard directors (GSMs).
    3. Start up the shard databases and local listeners.
    4. Start the global services.
    5. Start the connection pools and clients.
  • The shutdown sequence is the reverse.

[MySQL Cluster]

  • To start the cluster:
    1. On the management host, run "ndb_mgmd -f /var/lib/mysql-cluster/config.ini" to start the management node.
    2. On each data node host, run "ndbd" to start the data node.
    3. On each SQL host, run "mysqld_safe &" to start the MySQL server.
  • To stop the cluster:
    1. On the management host, run "ndb_mgm -e shutdown" to terminate the management nodes and data nodes gracefully.
    2. SQL nodes can be terminated using mysqladmin shutdown or other means.

[CockroachDB]

  • Start each node by running cockroach start with a --join flag that lists all of the initial nodes in the cluster, so the process knows all of the other machines it can communicate with.

$ cockroach start \
  --certs-dir=certs \
  --advertise-addr=<node1 address> \
  --join=<node1 address>,<node2 address>,<node3 address> \
  --cache=.25 \
  --max-sql-memory=.25 \
  --background

To stop the node, send SIGTERM to the server process.

[YugabyteDB]

  • Start the YB-Masters on as many nodes as the replication factor (3 here). When starting each yb-master, specify the addresses of all YB-Masters and its own listen addresses.

$ ./bin/yb-master \
  --master_addresses 172.151.17.130:7100,172.151.17.220:7100,172.151.17.140:7100 \
  --rpc_bind_addresses 172.151.17.130:7100 \
  --fs_data_dirs "/home/centos/disk1,/home/centos/disk2" \
  --placement_cloud aws \
  --placement_region us-west \
  --placement_zone us-west-2a

  • Start the YB-TServers on more nodes than the replication factor. When starting each yb-tserver, specify the addresses of all YB-Masters, its own listen addresses for interconnect traffic, and the listen addresses for PostgreSQL clients.

$ ./bin/yb-tserver \
  --tserver_master_addrs 172.151.17.130:7100,172.151.17.220:7100,172.151.17.140:7100 \
  --rpc_bind_addresses 172.151.17.130:9100 \
  --start_pgsql_proxy \
  --pgsql_proxy_bind_address 172.151.17.130:5433 \
  --fs_data_dirs "/home/centos/disk1,/home/centos/disk2" \
  --placement_cloud aws \
  --placement_region us-west \
  --placement_zone us-west-2a

[Greenplum]

  • The gpstart utility is used on the master instance to start the master and all of the segment instances in parallel.
  • gpstop stops all postgres processes in the system, including the master and all segment instances. The gpstop utility uses a default of up to 64 parallel worker threads to bring down the Postgres instances that make up the Greenplum Database cluster.

[Citus]

  • Start and stop the coordinator and each worker using pg_ctl, just as with vanilla PostgreSQL.

[Postgres-XL]

Start the cluster in the order of GTM, GTM-Proxy, Datanode, and Coordinator.

  1. $ gtm_ctl -Z gtm start -D /usr/local/pgsql/data_gtm
  2. $ gtm_ctl start -Z gtm_proxy -D /usr/local/pgsql/data_gtm_proxy
  3. $ postgres --datanode -D /usr/local/pgsql/data
  4. $ postgres --coordinator -D /usr/local/pgsql/coorddata

Coordinator and Datanode can be started with "pg_ctl start -Z {coordinator | datanode}" instead.

Stop the cluster in the order of Coordinator, Datanode, GTM-Proxy, and GTM.

  1. $ pg_ctl stop -Z coordinator -D /usr/local/pgsql/coorddata
  2. $ pg_ctl stop -Z datanode -D /usr/local/pgsql/data
  3. $ gtm_ctl stop -Z gtm_proxy -D /usr/local/pgsql/data_gtm_proxy
  4. $ gtm_ctl -Z gtm stop -D /usr/local/pgsql/data_gtm

The pgxc_ctl command can start and stop all components at once. It uses ssh to run the necessary commands on remote nodes.
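
The ordering rule above (start from GTM to Coordinator, stop in reverse) could be captured in a small helper that a cluster-wide start/stop command might use. The following Python sketch only builds the command lists and executes nothing. The function names and the tuple layout are illustrative assumptions, not part of pgxc_ctl, and the pg_ctl start forms with -D are assumed from the "pg_ctl start -Z" variant mentioned above.

```python
# Components in start order; stopping uses the reverse order.
# Each entry is (component, start command, stop command). Paths are the
# example paths used above.
COMPONENTS = [
    ("gtm",
     "gtm_ctl -Z gtm start -D /usr/local/pgsql/data_gtm",
     "gtm_ctl -Z gtm stop -D /usr/local/pgsql/data_gtm"),
    ("gtm_proxy",
     "gtm_ctl start -Z gtm_proxy -D /usr/local/pgsql/data_gtm_proxy",
     "gtm_ctl stop -Z gtm_proxy -D /usr/local/pgsql/data_gtm_proxy"),
    ("datanode",
     "pg_ctl start -Z datanode -D /usr/local/pgsql/data",
     "pg_ctl stop -Z datanode -D /usr/local/pgsql/data"),
    ("coordinator",
     "pg_ctl start -Z coordinator -D /usr/local/pgsql/coorddata",
     "pg_ctl stop -Z coordinator -D /usr/local/pgsql/coorddata"),
]


def start_plan():
    """Commands in the order GTM, GTM-Proxy, Datanode, Coordinator."""
    return [start for _, start, _ in COMPONENTS]


def stop_plan():
    """Commands in the reverse order: Coordinator first, GTM last."""
    return [stop for _, _, stop in reversed(COMPONENTS)]


for command in start_plan():
    print(command)
```

Keeping the order in one table means the stop sequence can never drift out of sync with the start sequence.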

Server configuration

<Questions>

  • Q1: What parameters should be set to the same value on all database instances? How can we assure it?
  • Q2: Should we have a mechanism that manages parameter settings centrally and distributes them to the cluster members?
    • The management server stores a configuration file and passes its content to each cluster member when the member joins the cluster, or
    • A command and/or ALTER SYSTEM modifies the configuration files on all nodes.
  • Q3: How can we modify the parameter setting online on all nodes? (the cluster-wide version of pg_ctl reload and pg_reload_conf())
  • SHOW and/or pg_settings should display the parameter values of a particular database instance or of all database instances.
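
As one possible way to approach Q1, a checker could compare the settings collected from every instance and report the parameters that must match but do not. The Python sketch below assumes the settings have already been gathered into a dictionary per node. The function name, the node names, and the choice of parameters are hypothetical.

```python
def find_divergent_settings(settings_by_node, must_match):
    """Return {parameter: {node: value}} for parameters whose values differ.

    settings_by_node maps a node name to its {parameter: value} dictionary.
    A parameter missing on a node is reported as None for that node.
    """
    divergent = {}
    for name in must_match:
        values = {node: conf.get(name) for node, conf in settings_by_node.items()}
        if len(set(values.values())) > 1:
            divergent[name] = values
    return divergent


# Hypothetical cluster: dn2 disagrees on max_prepared_transactions.
cluster = {
    "coord1": {"max_prepared_transactions": "100", "work_mem": "4MB"},
    "dn1": {"max_prepared_transactions": "100", "work_mem": "64MB"},
    "dn2": {"max_prepared_transactions": "0", "work_mem": "64MB"},
}
print(find_divergent_settings(cluster, ["max_prepared_transactions"]))
```

work_mem differs between nodes in this example but is not reported, because only the parameters listed in must_match are required to agree.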

[Oracle]

  • When you configure system parameter settings at the shard catalog, they are automatically propagated to all shards of the sharded database. Propagation happens only if the ALTER SYSTEM statement is run on the shard catalog with ENABLE SHARD DDL in effect and includes SHARD=ALL.

alter session enable shard ddl;
alter system set enable_ddl_logging=true shard=all;

[MySQL Cluster]

  • The management server manages the cluster configuration file. Each node in the cluster retrieves the configuration data from the management server, and so requires a way to determine where the management server resides.

[CockroachDB]

  • Cluster settings apply to all nodes of a CockroachDB cluster and control, for example, whether or not to share diagnostic details with Cockroach Labs as well as advanced options for debugging and cluster tuning. They can be updated anytime after a cluster has been started, but only by a member of the admin role, to which the root user belongs by default.
    • SET CLUSTER SETTING sql.defaults.distsql = 1;
  • Changing a cluster setting is not instantaneous, as the change must be propagated to other nodes in the cluster.
  • Cluster-wide settings can be displayed.
    • SHOW CLUSTER SETTING <setting>;
    • SHOW CLUSTER SETTINGS;

[YugabyteDB]

  • The value of a parameter can be set, displayed, and reset, but the setting does not yet take effect. The default settings and behaviors are used for the moment.

[Greenplum]

  • The master and each segment instance have their own postgresql.conf file. Some parameters are local: each segment instance examines its postgresql.conf file to get the value of that parameter. Set local parameters on the master and on each segment instance.
  • Other parameters are master parameters that you set on the master instance. The value is passed down to (or in some cases ignored by) the segment instances at query run time.
  • To change a local configuration parameter across multiple segments, update the parameter in the postgresql.conf file of each targeted segment, both primary and mirror. Use the gpconfig utility to set a parameter in all Greenplum postgresql.conf files.
    • $ gpconfig -c gp_vmem_protect_limit -v 4096
  • SHOW lists the settings for the master instance only. To see the value of a particular parameter across the entire system (master and all segments), use the gpconfig utility.
    • $ gpconfig --show max_connections

[Postgres-XL]

  • Only the local settings of the node to which the client has connected are shown.

Scaling cluster

<Questions>

  • Q1: How are the cluster nodes specified, a command line option, configuration file, or SQL statement?
  • Q2: Should adding or deleting a node always entail data redistribution, or should the two be separate operations so that a new node can be left empty?
  • Q3: How can we realize automatic scaling? What triggers automatic scaling (amount of data, server load, ...)?
  • Nodes should be able to be added and deleted online.
  • Adding and deleting a node should not affect applications that access other nodes.

[Oracle]

  • When you add a shard to a sharded database, if the environment is sharded by consistent hash, then chunks from existing shards are automatically moved to the new shard to rebalance the sharded environment. Automatic resharding is a feature of the system-managed sharding method that provides elastic scalability of a sharded database.
  • When using user-defined sharding, populating a new shard with data may require manually moving chunks from existing shards to the new shard using the GDSCTL split chunk and move chunk commands.
  • When you add a shard to the environment, verify that the standby server is ready, and after the new shard is in place take backups of any shards that have been involved in a move chunk operation.
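
To make the idea of automatic resharding concrete, the following Python sketch moves chunks from the fullest existing shards to a newly added shard until the chunk counts are even. It is a simplified illustration, not Oracle's actual algorithm. The function name and the data layout are assumptions.

```python
def rebalance_chunks(chunks_by_shard, new_shard):
    """Move chunks to new_shard until it holds an even share.

    chunks_by_shard maps shard name -> list of chunk ids and is updated in
    place. Returns the list of (chunk, source_shard) moves that were made.
    """
    chunks_by_shard[new_shard] = []
    total = sum(len(chunks) for chunks in chunks_by_shard.values())
    target = total // len(chunks_by_shard)  # share of the new shard
    moves = []
    while len(chunks_by_shard[new_shard]) < target:
        # Take one chunk from whichever existing shard holds the most.
        source = max(
            (s for s in chunks_by_shard if s != new_shard),
            key=lambda s: len(chunks_by_shard[s]),
        )
        chunk = chunks_by_shard[source].pop()
        chunks_by_shard[new_shard].append(chunk)
        moves.append((chunk, source))
    return moves


shards = {"sh1": [1, 2, 3, 4, 5, 6], "sh2": [7, 8, 9, 10, 11, 12]}
moves = rebalance_chunks(shards, "sh3")
print(moves)
print({name: len(chunks) for name, chunks in shards.items()})
```

Only the chunks listed in moves change location; all other chunks stay where they are, which is the property that keeps adding a shard cheap.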

[MySQL Cluster]

  • Online scaling is possible. (adding nodes for capacity and performance)
  • The redistribution for NDBCLUSTER tables already existing before the new data nodes were added is not automatic, but can be accomplished using simple SQL statements in mysql or another MySQL client application. However, all data and indexes added to tables created after a new node group has been added are distributed automatically among all cluster data nodes, including those added as part of the new node group.
  • Currently, you must add new data nodes to an NDB Cluster as part of a new node group. In addition, it is not possible to change the number of replicas (or the number of nodes per node group) online.
  • It is possible to add a new node group without all of the new data nodes being started. It is also possible to add a new node group to a degraded cluster, that is, a cluster that is only partially started, or where one or more data nodes are not running. In the latter case, the cluster must have enough nodes running to be viable before the new node group can be added.
  • Normal DML operations using NDB Cluster data are not prevented by the creation or addition of a new node group, or by table reorganization.
  • It is not possible to perform DDL concurrently with table reorganization. That is, no other DDL statements can be issued while an ALTER TABLE ... REORGANIZE PARTITION statement is executing.
  • During the execution of ALTER TABLE ... REORGANIZE PARTITION (or the execution of any other DDL statement), it is not possible to restart cluster data nodes.
  • Adding data nodes online requires the following steps:
    1. Edit the cluster configuration config.ini file, adding new [ndbd] sections corresponding to the nodes to be added. In the case where the cluster uses multiple management servers, these changes need to be made to all config.ini files used by the management servers.
    2. Perform a rolling restart of all NDB Cluster management servers.
    3. Perform a rolling restart of all existing NDB Cluster data nodes.
    4. Perform a rolling restart of any SQL or API nodes connected to the NDB Cluster.
    5. Start the new data nodes. The new data nodes may be started in any order. They can also be started concurrently, as long as they are started after the rolling restarts of all existing data nodes have been completed, and before proceeding to the next step.
    6. Execute one or more CREATE NODEGROUP commands in the NDB Cluster management client to create the new node group or node groups to which the new data nodes will belong.
      • ndb_mgm> CREATE NODEGROUP 3,4

[Spanner]

  • Cloud Spanner optimizes performance by automatically sharding the data based on request load and size of the data.
    • Cloud Spanner splits data based on load: it adds split boundaries automatically when it detects high read or write load spread among many keys in a split. You have some control over how your data is split because Cloud Spanner can only draw split boundaries between rows of tables that are at the root of a hierarchy (that is, tables that are not interleaved in a parent table).
    • Rows of an interleaved table cannot be split from their corresponding row in their parent table because the rows of the interleaved table are stored in sorted primary key order together with the row from their parent table that shares the same primary key prefix.
    • Suppose your database contains a table with 10 rows that are read more frequently than all of the other rows in the table. As long as that table is at the root of the database hierarchy (in other words, it's not an interleaved table), Cloud Spanner can add split boundaries between each of those 10 rows so that they're each handled by a different server, rather than allowing all the reads of those rows to consume the resources of a single server.

[CockroachDB]

  • Adding capacity is as easy as pointing a new node at the running cluster. CockroachDB scales horizontally without reconfiguration or need for a massive architectural overhaul. Simply add a new node to the cluster and CockroachDB takes care of the underlying complexity.
    • Scale by simply adding new nodes to a CockroachDB cluster
    • Automate balancing and distribution of ranges, not shards
    • Optimize server utilization evenly across all nodes
  • When your cluster spans multiple nodes (physical machines, virtual machines, or containers), newly split ranges are automatically rebalanced to nodes with more capacity. CockroachDB communicates opportunities for rebalancing using a peer-to-peer gossip protocol by which nodes exchange network addresses, store capacity, and other information.
  • A node is considered to be decommissioned when it meets two criteria:
    1. The node has completed the decommissioning process.
    2. The node has been stopped and has not updated its liveness record for the duration configured via server.time_until_store_dead, which defaults to 5 minutes.
  • The decommissioning process transfers all range replicas on the node to other nodes. During and after this process, the node is considered "decommissioning" and continues to accept new SQL connections. Even without replicas, the node can still function as a gateway to route connections to relevant data.
  • Before decommissioning a node, make sure other nodes are available to take over the range replicas from the node. If no other nodes are available, the decommissioning process will hang indefinitely.
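
The two criteria above can be written as a single predicate. The Python sketch below is only a model of the rule as described here. The NodeStatus fields and the function name are hypothetical and do not correspond to CockroachDB internals.

```python
from dataclasses import dataclass

# Default of server.time_until_store_dead quoted above, in seconds.
TIME_UNTIL_STORE_DEAD = 5 * 60


@dataclass
class NodeStatus:
    decommission_complete: bool  # criterion 1: decommissioning has finished
    stopped: bool                # criterion 2: the node process was stopped
    last_liveness_update: float  # criterion 2: last liveness update (seconds)


def is_decommissioned(node, now, timeout=TIME_UNTIL_STORE_DEAD):
    """True only when both criteria hold at time `now` (seconds)."""
    liveness_expired = (now - node.last_liveness_update) >= timeout
    return node.decommission_complete and node.stopped and liveness_expired


node = NodeStatus(decommission_complete=True, stopped=True, last_liveness_update=0.0)
print(is_decommissioned(node, now=299.0))
print(is_decommissioned(node, now=300.0))
```

A node that has finished decommissioning but is still running keeps updating its liveness record, so it stays in the "decommissioning" state and can continue to act as a gateway.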

[YugabyteDB]

  • Just start a new YB-TServer and point it to the YB-Masters.
  • Existing data is automatically redistributed.

[Azure Synapse]

  • To perform a scale operation, SQL pool first kills all incoming queries and then rolls back transactions to ensure a consistent state. Scaling only occurs once the transaction rollback is complete. For a scale operation, the system detaches the storage layer from the compute nodes, adds compute nodes, and then reattaches the storage layer to the compute layer. Each SQL pool is stored as 60 distributions, which are evenly distributed to the compute nodes.
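
Because the number of distributions is fixed at 60, scaling changes only how many distributions each compute node serves. The Python sketch below illustrates this with a round-robin mapping. The round-robin assignment is an assumption made for illustration; the description above only says that the distributions are evenly distributed.

```python
DISTRIBUTIONS = 60  # fixed number of distributions per SQL pool, as stated above


def assign_distributions(compute_nodes):
    """Map each compute node index to the distributions it serves."""
    assignment = {node: [] for node in range(compute_nodes)}
    for distribution in range(DISTRIBUTIONS):
        assignment[distribution % compute_nodes].append(distribution)
    return assignment


for nodes in (1, 6, 10, 60):
    per_node = len(assign_distributions(nodes)[0])
    print(f"{nodes} compute nodes: {per_node} distributions on node 0")
```

Since the data itself stays in the storage layer, a scale operation in this model only has to recompute the mapping, which matches the detach, add, and reattach sequence described above.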

[Greenplum]

  • Expanding Greenplum Database can be performed when the system is up and available.
  • The time required depends on the number of schema objects in the Greenplum system and other factors related to hardware performance. In most environments, the initialization of new segments requires less than thirty minutes offline.
  • The gpexpand utility performs system expansion in two phases: segment instance initialization and then table data redistribution.
    • $ gpexpand -f /home/gpadmin/new_hosts_file
    • In the initialization phase, gpexpand runs with an input file that specifies data directories, dbid values, and other characteristics of the new segment instances. It creates an expansion schema named gpexpand in the postgres database to store the status of the expansion operation, including detailed status for tables.
    • In the table data redistribution phase, gpexpand redistributes table data to rebalance the data across the old and new segment instances.

[Citus]

  • Add nodes by calling the master_add_node UDF with the hostname (or IP address) and port number of the new node.
    • SELECT * from master_add_node('node_name', 5432);
  • The new node is available for shards of new distributed tables. Existing shards will stay where they are unless redistributed, so adding a new worker may not help performance without further steps.
  • It also copies reference tables to the new node.
  • To rebalance existing shards from the older nodes to the new node, Citus provides a shard rebalancer utility. For maximum control, the choice of when to run the shard rebalancer is left to the database administrator. Citus does not automatically rebalance on node creation.
  • Remove nodes by calling the master_remove_node UDF with the hostname (or IP address) and port number of the node. The shards have to be deleted or moved to other nodes beforehand.
    • SELECT * from master_remove_node('node_name', 5432);

[Postgres-XL]

  • Add a data node online using pgxc_ctl's "add datanode" command.
    • PGXC$ add datanode master dn3 localhost 40003 40013 $dataDirRoot/dn_master.3 none none none
    • During cluster reconfiguration, all outstanding transactions are aborted and sessions are reset. So you would typically see errors like these on open sessions:
      • ERROR: canceling statement due to user request <==== pgxc_pool_reload() resets all sessions and aborts all open transactions
    • Existing data is not moved to the new data node automatically. To redistribute data, you need to run ALTER TABLE ... ADD NODE for each table.
  • Remove a data node online using pgxc_ctl's "remove datanode" command.
    • The command does not perform any additional checks to ascertain whether the datanode being dropped holds data from replicated or distributed tables. It is the responsibility of the user to ensure that it is safe to remove the datanode.
    • ALTER TABLE disttab DELETE NODE (dn3);
    • PGXC$ remove datanode master dn3 clean
  • Coordinators are added/removed by pgxc_ctl's add/remove coordinator command. It also cancels all transactions and resets sessions, and causes a "canceling statement due to user request" error.

Data redistribution

<Questions>

  • Q1: What's the unit of data migration?
  • Q2: How can users identify which shards need to be moved to other nodes? Which shards are hot spots?
  • Q3: How do we relocate data online between nodes quickly with minimal impact on application performance?
    • Can we copy files instead of records? Can we just rename files when the relevant nodes use shared storage?
    • Can we eliminate WAL emission per data record?
  • Q4: Can we notify applications of the data movement and new destination so that they can automatically connect to the best node?
  • Allow automatic and manual data movement.

[Oracle]

  • The unit of data migration between shards is a chunk. A chunk is a set of tablespaces that store corresponding partitions of all tables in a table family. A chunk contains a single partition from each table of a set of related tables. This guarantees that related data from different sharded tables can be moved together. The number of chunks within each shard is specified when the SDB is created.
  • Oracle Enterprise Manager Cloud Control can be used to help identify chunks that would be good candidates to move, or split and move to the new shard.
  • Sharding MOVE CHUNK commands use Oracle Data Pump internally to move transportable tablespaces from one shard to another.
  • A particular chunk can also be moved from one shard to another, when data or workload skew occurs, without any change in the number of shards. In this case, chunk migration can be initiated by the database administrator to eliminate the hot spot.
  • RMAN Incremental Backup, Transportable Tablespace, and Oracle Notification Service technologies are used to minimize impact of chunk migration on application availability. A chunk is kept online during chunk migration. There is a short period of time (a few seconds) when data stored in the chunk is available for read-only access only.
  • FAN-enabled clients receive a notification when a chunk is about to become read-only in the source shard, and again when the chunk is fully available in the destination shard on completion of chunk migration. When clients receive the chunk read-only event, they can either repeat connection attempts until the chunk migration is completed, or access the read-only chunk in the source shard. In the latter case, an attempt to write to the chunk will result in a run-time error.
  • Running multi-shard queries while a sharded database is resharding can result in errors, so it is recommended that you do not deploy new shards during multi-shard workloads.
  • If the shard is only temporarily removed, keep track of the chunks moved to each shard so that they can be easily identified and moved back once the maintenance is complete.
  • Any time a chunk is moved from one shard to another, you should make a full backup of the databases involved in the operation (both the source of the chunk move, and the target of the chunk move.)
  • Oracle Sharding supports the online split of a chunk. Theoretically it is possible to have a single chunk for each shard and split it every time data migration is required. However, even though a chunk split does not affect data availability, the split is a time-consuming and CPU-intensive operation because it scans all of the rows of the partition being split, and then inserts them one by one into the new partitions. For composite sharding, it is time consuming and may require downtime to redefine new values for the shard key or super shard key. Therefore, it is recommended that you pre-create multiple chunks on each shard and split them either when the number of chunks is not big enough for balanced redistribution of data during re-sharding, or a particular chunk has become a hot spot.

[MySQL Cluster]

  • Redistribute the cluster's data among all data nodes as follows:
    1. Issue an "ALTER TABLE ... ALGORITHM=INPLACE, REORGANIZE PARTITION" statement in the mysql client for each table.
    2. "ALTER TABLE ... ALGORITHM=INPLACE, REORGANIZE PARTITION" reorganizes partitions but does not reclaim the space freed on the old nodes. You can reclaim it by issuing an OPTIMIZE TABLE statement in the mysql client for each table.
  • The redistribution with "ALTER TABLE ... REORGANIZE PARTITION" does not currently include unique indexes (only ordered indexes are redistributed).

[Spanner]

  • Spanner automatically reshards data across machines as the amount of data or the number of servers changes, and it automatically migrates data across machines (even across datacenters) to balance load and in response to failures.
  • Spanner will shard a directory into multiple fragments if it grows too large. Fragments may be served from different Paxos groups (and therefore different servers). Movedir actually moves fragments, and not whole directories, between groups.
  • When data is moved between Paxos groups, it is moved directory by directory.
  • Spanner might move a directory to shed load from a Paxos group; to put directories that are frequently accessed together into the same group; or to move a directory into a group that is closer to its accessors.
  • Directories can be moved while client operations are ongoing.
  • Movedir is not implemented as a single transaction, so as to avoid blocking ongoing reads and writes on a bulky data move. Instead, movedir registers the fact that it is starting to move data and moves the data in the background. When it has moved all but a nominal amount of the data, it uses a transaction to atomically move that nominal amount and update the metadata for the two Paxos groups.
  • One could expect that a 50MB directory can be moved in a few seconds.
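
The two-phase structure of movedir can be sketched as follows: bulk data is copied in the background while the source stays authoritative, and only a nominal remainder plus the metadata switch happen in a short critical section that stands in for the final transaction. This Python model is an assumption-laden illustration. The function and parameter names are hypothetical, lists stand in for Paxos groups, and concurrent client writes are not modeled.

```python
import threading

_switch_lock = threading.Lock()  # stands in for the final transaction


def move_directory(source, target, metadata, directory, target_group,
                   nominal=2, batch=3):
    """Move all rows of `directory` from source to target.

    Returns the number of rows copied in the background phase.
    """
    # Register that a move has started; clients keep using the source.
    metadata[directory]["moving"] = True
    copied = 0
    # Background phase: copy in batches until only `nominal` rows remain.
    while len(source) - copied > nominal:
        n = min(batch, len(source) - copied - nominal)
        target.extend(source[copied:copied + n])
        copied += n
    # Final phase: move the remainder and switch ownership atomically.
    with _switch_lock:
        target.extend(source[copied:])
        metadata[directory] = {"group": target_group, "moving": False}
        source.clear()
    return copied


rows_a = list(range(10))
rows_b = []
meta = {"dir1": {"group": "A", "moving": False}}
background = move_directory(rows_a, rows_b, meta, "dir1", "B")
print(background, rows_b, meta)
```

The point of the design is that the lock is held only for the nominal remainder, so the blocking time does not grow with the size of the directory.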

[CockroachDB]

  • Distributed transactions proceed without downtime or additional latency while rebalancing is underway. You can even move tables - or entire databases - between data centers or cloud infrastructure providers while the cluster is under load.

[YugabyteDB]

  • The existing tablets are automatically redistributed among all YB-TServers.

[Greenplum]

  • Data redistribution should be performed during low-use hours. Redistribution can be divided into batches over an extended period.
  • To begin the redistribution phase, run gpexpand with either the -d (duration) or -e (end time) options, or with no options. If you specify an end time or duration, then the utility redistributes tables in the expansion schema until the specified end time or duration is reached. If you specify no options, then the utility redistribution phase continues until all tables in the expansion schema are reorganized. Each table is reorganized using ALTER TABLE commands to rebalance the tables across new segments.
  • When planning the redistribution phase, consider the impact of the ACCESS EXCLUSIVE lock taken on each table, and the table data redistribution method. User activity on a table can delay its redistribution, and tables are unavailable for user activity while they are being redistributed.
  • There are two methods of redistributing data when performing a Greenplum Database expansion.
    1. rebuild - Create a new table, copy all the data from the old to the new table, and replace the old table. This is the default. The rebuild method is similar to creating a new table with a CREATE TABLE AS SELECT command. During data redistribution, an ACCESS EXCLUSIVE lock is acquired on the table.
    2. move - Scan all the data and perform an UPDATE operation to move rows as needed to different segment instances. During data redistribution, an ACCESS EXCLUSIVE lock is acquired on the table. In general, this method requires less disk space; however, it creates obsolete table rows and might require a VACUUM operation on the table after the data redistribution. Also, this method updates indexes one row at a time, which can be much slower than rebuilding the index with the CREATE INDEX command.
  • Because the gpexpand utility must re-index each indexed table after redistribution, a high level of indexing has a large performance impact.
  • After creating an expansion schema, you can redistribute tables across the entire system with gpexpand. Plan to run this during low-use hours when the utility's CPU usage and table locks have minimal impact on operations. Rank tables to redistribute the largest or most critical tables first.
  • For large systems, you can control the table redistribution order. Adjust tables' rank values in the expansion schema to prioritize heavily-used tables and minimize performance impact.
  • ALTER TABLE provides options to change a table's distribution policy. When the table distribution options change, the table data may be redistributed on disk, which can be resource intensive. You can also redistribute table data using the existing distribution policy.
    • ALTER TABLE sales SET DISTRIBUTED BY (customer_id);

[Citus]

  • To move existing shards to a newly added worker, connect to the cluster coordinator node and run:
    • SELECT rebalance_table_shards('distributed_table_name');
    • If the table name is omitted, all shards are balanced.
  • The rebalance_table_shards function rebalances all tables in the colocation group of the table named in its argument. Thus you do not have to call it for every single table, just call it on a representative table from each colocation group.
  • Applications do not need to undergo downtime during shard rebalancing.
    • On PostgreSQL 10 or later, Citus shard rebalancing uses PostgreSQL logical replication to move data from the old shard (called the "publisher" in replication terms) to the new one (the "subscriber"). Logical replication allows application reads and writes to continue uninterrupted while copying shard data. Citus puts a brief write-lock on a shard only during the time it takes to update metadata to promote the subscriber shard as active.
    • On versions before PostgreSQL 10, where logical replication is not supported, Citus falls back to a less efficient solution: locking a shard for writes while copying it to its new location. Unlike logical replication, this approach introduces downtime for write statements (although read queries continue unaffected).
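
Because rebalance_table_shards covers the whole colocation group of the named table, a script only needs one call per group. The Python sketch below picks one representative table per group and builds the statements. The input mapping from table name to colocation group id is a hypothetical structure that would have to be filled from Citus metadata, and the function name is an assumption.

```python
def rebalance_statements(colocation_of):
    """Return one rebalance_table_shards call per colocation group.

    colocation_of maps table name -> colocation group id. The alphabetically
    first table of each group is used as its representative.
    """
    representative = {}
    for table in sorted(colocation_of):
        representative.setdefault(colocation_of[table], table)
    return [
        f"SELECT rebalance_table_shards('{table}');"
        for _, table in sorted(representative.items())
    ]


tables = {"orders": 1, "customers": 1, "events": 2}
for statement in rebalance_statements(tables):
    print(statement)
```

Here orders and customers share a colocation group, so only one of them appears in the generated statements.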

[Postgres-XL]

  • Use ALTER TABLE to add or delete a node as the target of a distributed table.
    • ALTER TABLE disttab ADD NODE (dn3);
    • ALTER TABLE disttab DELETE NODE (dn1);
  • It is done in 3 or 4 steps. An ACCESS EXCLUSIVE lock is held on the target table.
    1. Data is saved on the Coordinator by fetching all the data with the COPY TO command. At this point all the tuples are saved using a tuple store.
    2. The table is truncated on all the nodes.
    3. Catalogs are updated.
    4. Finally data inside the tuple store is redistributed using an internal COPY FROM mechanism. REINDEX is issued if necessary.
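
The steps above can be modeled in a few lines to show why the whole table passes through the Coordinator. This Python sketch is a simulation only. Dictionaries stand in for nodes, the modulo on an integer key stands in for the real distribution function (which differs in Postgres-XL), and the function name is hypothetical.

```python
def redistribute(table, new_nodes, key=lambda row: row[0]):
    """Simulate ALTER TABLE ... ADD/DELETE NODE for a hash-distributed table.

    table maps node name -> list of rows; new_nodes is the new target list.
    Returns the new node -> rows mapping. Keys are assumed to be integers.
    """
    # 1. Fetch every row to the Coordinator (stands in for COPY TO).
    tuple_store = [row for rows in table.values() for row in rows]
    # 2. Truncate the table on all nodes.
    for node in table:
        table[node] = []
    # 3. Update the catalog: the table is now distributed over new_nodes.
    result = {node: [] for node in new_nodes}
    # 4. Redistribute the tuple store (stands in for the internal COPY FROM).
    for row in tuple_store:
        result[new_nodes[key(row) % len(new_nodes)]].append(row)
    return result


old = {"dn1": [(1, "a"), (3, "c")], "dn2": [(2, "b"), (4, "d")]}
new = redistribute(old, ["dn1", "dn2", "dn3"])
print(new)
```

Every row is read and written once regardless of whether it ends up on the same node, which is why the operation is expensive and why the ACCESS EXCLUSIVE lock is held throughout.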

Data import and export

  • Parallelize COPY FROM to push data asynchronously to shards so that multiple cluster nodes can work in parallel.
  • Enable COPY FROM/TO to instruct every node in the cluster to read or write files on its local storage, thus parallelizing file input and output. Here, local storage is typically assumed to be locally mounted remote storage such as NFS, Ceph, or cloud storage.
  • Aim for automatically continuing the import and export when one cluster node fails over, because it's a disaster to have to start over.
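
The first goal above, pushing data to shards asynchronously, might look like the following Python sketch. Rows are split by shard and every shard is loaded concurrently. The load_shard callback is a hypothetical stand-in for running COPY FROM on one node, and the modulo on an integer key is an assumed distribution function.

```python
from concurrent.futures import ThreadPoolExecutor


def parallel_copy(rows, shards, load_shard, key=lambda row: row[0]):
    """Split rows by shard and load all shards concurrently.

    load_shard(shard, batch) is supplied by the caller and returns the number
    of rows it loaded. Returns {shard: rows_loaded}.
    """
    batches = {shard: [] for shard in shards}
    for row in rows:
        batches[shards[key(row) % len(shards)]].append(row)
    with ThreadPoolExecutor(max_workers=len(shards)) as pool:
        futures = {
            shard: pool.submit(load_shard, shard, batch)
            for shard, batch in batches.items()
        }
        # result() re-raises any exception from a failed shard load.
        return {shard: future.result() for shard, future in futures.items()}


def fake_load(shard, batch):
    """Stand-in loader that only counts the rows it was given."""
    return len(batch)


loaded = parallel_copy([(i,) for i in range(10)], ["s0", "s1", "s2"], fake_load)
print(loaded)
```

This sketch fails the whole import when one shard fails. Meeting the third goal above would additionally require recording per-shard progress so that a retry can resume instead of starting over.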

[Oracle]

  • Loading the data directly into the database shards in parallel by running Data Pump on each shard is recommended because it's much faster. Loading the data through the sharding coordinator is slower than loading the entire data set into a non-sharded table, because of the splitting logic running on the sharding coordinator (catalog) node and additional overhead of pushing the data to the shards.

[MySQL Cluster]

  • LOAD DATA is not transactional when used on NDB tables. When executing a LOAD DATA statement, the NDB engine performs commits at irregular intervals that enable better utilization of the communication network. It is not possible to know ahead of time when such commits take place.

[Cloud Spanner]

  • Uses Dataflow, a managed service for transforming and enriching data, to export an individual database to a set of Avro or CSV files in a Cloud Storage bucket and to import data from those files. Each table's data is split into multiple export files.

[CockroachDB]

  • IMPORT accepts multiple files in CSV, delimited, and Avro formats. To maximize import performance, it's recommended to specify as many input files in cloud storage as there are nodes so that all cluster nodes can import data in parallel.
  • The EXPORT statement exports tabular data or the results of arbitrary SELECT statements to CSV files. Using the CockroachDB distributed execution engine, EXPORT parallelizes CSV creation across all nodes in the cluster, making it possible to quickly get large sets of data out of CockroachDB in a format that can be ingested by downstream systems.
  • You can specify the base directory where you want to store the exported .csv files. CockroachDB will create the export file(s) in the specified directory with programmatically generated names (e.g., n1.1.csv, n1.2.csv, n2.1.csv, ...).

EXPORT INTO CSV
  'azure://acme-co/customer-export-data?AZURE_ACCOUNT_KEY=hash&AZURE_ACCOUNT_NAME=acme-co'
  WITH delimiter = '|' FROM TABLE bank.customers;

[YugabyteDB]

  • Provides PostgreSQL's COPY command, but has no parallel feature.

[Greenplum]

  • The COPY command transfers data between a text file on the master host, or multiple text files on segment hosts, and a table.
  • Readable external tables allow you to query data outside of the database directly and in parallel using SQL operations such as selecting, joining, or sorting external table data, and you can create views for external tables. External tables are often used to load external data into a regular database table using a command such as CREATE TABLE table AS SELECT * FROM ext_table.

[Citus]

  • A powerful feature of COPY for distributed tables is that it asynchronously copies data to the workers over many parallel connections, one for each shard placement. This means that data can be ingested using multiple workers and multiple cores in parallel. Especially when there are expensive indexes such as a GIN, this can lead to major performance boosts over ingesting into a regular PostgreSQL table.

Log management

<Questions>

  • Q1: How can the events on different nodes be associated? Should we add some marker to log_line_prefix that shows the operation, source session, or a combination of them?
  • Q2: Should we gather some events such as cluster reconfiguration, data movement, and DDL execution to the server log on a central node? Do we leave it up to external log management software such as syslog?
  • Q3: Should we provide a feature that gathers log files on all nodes?
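
To illustrate Q1, suppose every log line carried a marker identifying the originating session and command. Log records gathered from all nodes could then be grouped as in the Python sketch below. The record fields (origin_session, command, time) are hypothetical, since no such marker exists in log_line_prefix today.

```python
from collections import defaultdict


def correlate(log_records):
    """Group log records from all nodes by (origin session, command number).

    Each group is sorted by timestamp so that the flow of one operation
    across nodes can be read from top to bottom.
    """
    groups = defaultdict(list)
    for record in log_records:
        groups[(record["origin_session"], record["command"])].append(record)
    return {
        marker: sorted(records, key=lambda r: r["time"])
        for marker, records in groups.items()
    }


records = [
    {"node": "dn1", "time": 2, "origin_session": "c1:42", "command": 7, "msg": "scan"},
    {"node": "coord1", "time": 1, "origin_session": "c1:42", "command": 7, "msg": "dispatch"},
    {"node": "dn2", "time": 3, "origin_session": "c1:43", "command": 1, "msg": "insert"},
]
for marker, group in correlate(records).items():
    print(marker, [r["node"] for r in group])
```

Sorting by timestamp assumes reasonably synchronized clocks across nodes; without that, the order within a group is only approximate.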

[Oracle]

  • Master shard director (GSM) trace/alert files include status and errors on any and all asynchronous commands or background tasks (move chunk, split chunk, deploy, shard registration, Data Guard configuration, shard DDL execution, etc.)

[MySQL Cluster]

  • Event logs are of the two types listed here:
    • Cluster log: Keeps a record of all desired reportable events for the cluster as a whole. Under normal circumstances, it is necessary and sufficient to keep and examine only the cluster log.
    • Node log: A separate log which is also kept for each individual node. Node logs are intended to be used only during application development, or for debugging application code.
  • When interesting events occur in the data nodes, the nodes transfer information about these events to the management server, which then writes the information to the cluster log.

[CockroachDB]

  • The "cockroach debug zip" command collects log files from each active node into a single file (inactive nodes are not included).

[Greenplum]

  • You can identify related log entries of a particular query by the query's session identifier (gp_session_id) and command identifier (gp_command_count).

Monitoring

  • The cluster member nodes and node groups
  • Each node's in-memory view of the current cluster state
  • Cluster-wide total sizes of databases, tables, and indexes
  • Data distribution among nodes
  • Load distribution among nodes: transactions and reads/writes to find hot spots
  • Data redistribution/movement progress
  • Nodes accessed by session
  • Connections from other nodes
  • Inter-node communication, e.g. count and amount of send/recv per session/database
  • Wait events for remote operations
  • Distributed transaction count
  • Locks with node info
  • Logging of detected distributed deadlocks
  • Cluster-wide statistics views (pg_gstat_*) like Oracle's GV$ views

[Oracle]

  • The Shard Load Map shows how transactions are distributed among the shards.
  • The shard catalog database can be used as the entry point for centralized diagnostic operations using the SQL SHARDS() clause. The SHARDS() clause allows you to query the same Oracle supplied objects, such as V$, DBA/USER/ALL views and dictionary objects and tables, on all of the shards and return the aggregated results. A virtual column called SHARD_ID is automatically added to a SHARDS()-wrapped object during execution of a multi-shard query to indicate the source of every row in the result.

select ORA_SHARD_ID, INSTANCE_NAME from SHARDS(sys.v_$instance);
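
The idea of running one query on every node and tagging each row with its source could be sketched as follows. This is only a Python illustration of the concept behind a cluster-wide view; the function name, the node_id column, and the stand-in connections are assumptions, not an Oracle or PostgreSQL API.

```python
def query_all_nodes(nodes, run_query):
    """Run the same query on every node and tag each row with its source node.

    `nodes` maps a node id to a connection-like object; `run_query` is a
    hypothetical callable returning a list of dict rows for one node.
    """
    result = []
    for node_id, conn in sorted(nodes.items()):
        for row in run_query(conn):
            result.append({"node_id": node_id, **row})
    return result


# Stand-in data: each "connection" is just the rows that node would return.
fake_nodes = {
    2: [{"instance_name": "sh2"}],
    1: [{"instance_name": "sh1"}],
}
rows = query_all_nodes(fake_nodes, run_query=lambda conn: conn)
print(rows)
```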

[MySQL Cluster]

[CockroachDB]

  • Provides round-trip latencies between all nodes in the cluster.

[Greenplum]

  • Includes an optional system monitoring and management database, gpperfmon.
  • The gp_toolkit schema contains views that show the cluster-wide total sizes of databases, tables, and indexes.
  • Provides the system column gp_segment_id, which makes it possible to see how a table's rows are distributed (the number of rows on each segment).

SELECT gp_segment_id, count(*) 
FROM table_name GROUP BY gp_segment_id;
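
Per-segment row counts like those returned by the query above can be reduced to a single skew figure. The Python sketch below uses the ratio of the largest segment to the average, which is a simple metric chosen for illustration and not necessarily the one any particular product reports.

```python
def distribution_skew(rows_per_segment):
    """Return (max rows / average rows) across segments; 1.0 means perfectly even."""
    counts = list(rows_per_segment.values())
    if not counts or sum(counts) == 0:
        return 1.0  # nothing stored, so nothing is skewed
    average = sum(counts) / len(counts)
    return max(counts) / average


# Hypothetical counts keyed by segment id.
counts = {0: 1000, 1: 1000, 2: 4000}
print(distribution_skew(counts))
```

A value well above 1.0 suggests that one segment holds a disproportionate share of the rows and may become a hot spot.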

[Citus]

  • The pg_dist_node table contains information about the worker nodes in the cluster.
  • pg_total_relation_size() drastically under-reports the size of distributed tables because it returns only the size of the table on the coordinator node. The following functions are available instead.
    • citus_total_relation_size(relation_name)
    • citus_relation_size(relation_name)
    • citus_table_size(relation_name)
  • Provides the following views to watch queries and locks throughout the cluster, including shard-specific queries used internally to build results for distributed queries.
    • citus_dist_stat_activity: shows the distributed queries that are executing on all nodes.
    • citus_worker_stat_activity: shows queries on workers, including fragment queries against individual shards.
    • citus_lock_waits: Blocked queries throughout the cluster.
  • citus_stat_statements shows which queries work on a single node and which on multiple nodes.

SELECT sum(calls),
    partition_key IS NOT NULL AS single_tenant
FROM citus_stat_statements
GROUP BY 2;

Maintenance operations

<Questions>

  • Q1: Do we want to allow cluster-wide operations to be run from any node?
  • Q2: Does the autovacuum launcher on an idle node need to be sensitive to updates on other nodes to prevent XID wraparound? Can each node be independent in that regard?
  • Q3: Should CHECKPOINT have an option to do the work only on the local node? (Oracle RAC has such a LOCAL clause.)
  • The following commands work on all shards in parallel when they target a table or index as a whole rather than a particular shard.
    • VACUUM
    • ANALYZE
    • REINDEX
    • TRUNCATE
    • CLUSTER
  • CHECKPOINT runs the checkpoint/restartpoint processing on all nodes in parallel.
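
A minimal Python sketch of running one maintenance command on all shards in parallel is shown below. The execute callable and the shard names are hypothetical placeholders; the sketch only illustrates fanning out the command and collecting per-shard outcomes, not how PostgreSQL would implement it.

```python
from concurrent.futures import ThreadPoolExecutor


def run_on_all_shards(command, shards, execute):
    """Run `command` on every shard concurrently and collect per-shard outcomes.

    `execute(shard, command)` is a hypothetical callable that performs the
    command on one shard and raises on failure.
    """
    def run_one(shard):
        try:
            execute(shard, command)
            return shard, None
        except Exception as exc:  # report the failure instead of aborting the others
            return shard, str(exc)

    with ThreadPoolExecutor(max_workers=max(1, len(shards))) as pool:
        return dict(pool.map(run_one, shards))


def fake_execute(shard, command):
    # Simulate one unreachable shard.
    if shard == "shard_3":
        raise RuntimeError("node unreachable")


outcome = run_on_all_shards("VACUUM t", ["shard_1", "shard_2", "shard_3"], fake_execute)
print(outcome)
```

Whether a failure on one shard should fail the whole command is a design decision this sketch leaves open by reporting each shard separately.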

[YugabyteDB]

  • The ACCESS EXCLUSIVE locking option is not yet fully supported.

[Citus]

  • Propagates the ANALYZE command to all worker node placements.
  • Using vacuum against a distributed table will send a vacuum command to every one of that table’s placements (one connection per placement). This is done in parallel.
  • VACUUM commands without a table specified do not propagate to worker nodes.
  • VACUUM's VERBOSE option is not supported.

[Postgres-XL]

  • A manual VACUUM will be pushed down to all the Datanodes as well.
  • CLUSTER will be executed on all the Datanodes as well.
  • CHECKPOINT is performed at the local Coordinator and all of the underlying Datanodes.

Modifying a schema

<Questions>

  • Q1: Where can schema changes be performed and how are they propagated across the cluster?
  • Q2: What if some cluster nodes are down when a DDL statement is run?
  • Q3: Is there any schema modification to be prohibited?
  • Q4: How can we minimize the required downtime for schema updates and keep applications reading and writing data?

[Oracle]

  • Making changes to duplicated tables or sharded tables should be done from the shard catalog database.
  • If more granular control is needed, you can issue the command directly on each shard.
  • If you perform an operation that requires a lock on a table, such as adding a not null column, it is important to remember that each shard needs to obtain the lock on the table in order to perform the DDL operation.

[MySQL Cluster]

  • Online DROP COLUMN operations are not supported.
  • The table being altered is not locked with respect to API nodes other than the one on which an online ALTER TABLE ADD COLUMN, ADD INDEX, or DROP INDEX operation (or CREATE INDEX or DROP INDEX statement) is run. However, the table is locked against any other operations originating on the same API node while the online operation is being executed.
  • The tablespace used by the table cannot be changed online.

[Spanner]

  • Schema updates in Cloud Spanner do not require downtime. When you issue a batch of DDL statements to a Cloud Spanner database, you can continue writing and reading from the database without interruption while Cloud Spanner applies the update as a long-running operation.
  • Schema updates that do not require Cloud Spanner to validate existing data can happen in minutes. Schema updates that require validation can take longer, depending on the amount of existing data that needs to be validated, but data validation happens in the background at a lower priority than production traffic.
  • Some schema updates can change the behavior of requests to the database before the schema update completes. For example, if you're adding NOT NULL to a column, Cloud Spanner almost immediately begins rejecting writes for new requests that use NULL for the column. If the new schema update ultimately fails for data validation, there will have been a period of time when writes were blocked, even if they would have been accepted by the old schema.
  • Cloud Spanner uses schema versioning so that there is no downtime during a schema update to a large database. Cloud Spanner maintains the older schema version to support reads while the schema update is processed. Cloud Spanner then creates one or more new versions of the schema to process the schema update. Each version contains the result of a collection of statements in a single atomic change.
  • Schema versions can consume significant server and storage resources, and they persist for up to a week. Avoid more than 30 DDL statements that require validation or index backfill in a given 7-day period, because each statement creates multiple versions of the schema internally.
  • TrueTime enables Spanner to support atomic schema changes.
    • It would be infeasible to use a standard transaction, because the number of participants (the number of groups in a database) could be in the millions.
    • A Spanner schema-change transaction is a generally non-blocking variant of a standard transaction.
    • First, it is explicitly assigned a timestamp in the future, which is registered in the prepare phase. As a result, schema changes across thousands of servers can complete with minimal disruption to other concurrent activity.
    • Second, reads and writes, which implicitly depend on the schema, synchronize with any registered schema-change timestamp at time t: they may proceed if their timestamps precede t, but they must block behind the schema-change transaction if their timestamps are after t. Without TrueTime, defining the schema change to happen at t would be meaningless.
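
The synchronization rule in the last two items can be sketched in Python as follows. Plain numbers stand in for timestamps, and an operation whose timestamp equals t is treated as being after t, which is an assumption of this sketch rather than something the text states.

```python
def can_proceed(op_timestamp, schema_change_timestamp, schema_change_done):
    """Decide whether a read or write may run without waiting.

    Operations whose timestamp precedes the registered schema-change
    timestamp t proceed. Operations at or after t must wait until the
    schema change has been applied.
    """
    if schema_change_timestamp is None:
        return True  # no schema change is registered
    if op_timestamp < schema_change_timestamp:
        return True
    return schema_change_done


t = 100  # registered schema-change timestamp, chosen in the future
print(can_proceed(90, t, schema_change_done=False))   # before t
print(can_proceed(110, t, schema_change_done=False))  # after t, change still pending
print(can_proceed(110, t, schema_change_done=True))   # after t, change applied
```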

[CockroachDB]

  • CockroachDB's solution for maintaining a consistent distributed schema cache and consistent table data embraces the concurrent use of multiple versions of the schema, allowing the rollout of a new schema while the older version is still in use. It backfills (or deletes) the underlying table data without holding locks. This solution is derived from the work done by the F1 team at Google.
  • Reference: "How online schema changes are possible in CockroachDB"

[Citus]

  • Citus propagates schema changes from the coordinator node to the workers using a two-phase commit protocol.
  • Some DDL statements require manual propagation, and certain others are prohibited, such as those that would modify a distribution column.
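
A generic two-phase commit for a DDL statement can be sketched in Python as below. The Worker class and its methods are hypothetical stand-ins and are not a Citus API; the sketch only shows the prepare-everywhere, then commit-or-roll-back shape of the protocol.

```python
class Worker:
    """Hypothetical stand-in for a worker node."""

    def __init__(self, name, can_prepare=True):
        self.name = name
        self.can_prepare = can_prepare
        self.state = "idle"

    def prepare(self, ddl):
        if not self.can_prepare:
            raise RuntimeError(f"{self.name}: cannot prepare")
        self.state = "prepared"

    def commit(self):
        self.state = "committed"

    def rollback(self):
        self.state = "rolled back"


def propagate_ddl(ddl, workers):
    """Phase 1: prepare everywhere. Phase 2: commit only if every prepare succeeded."""
    prepared = []
    try:
        for w in workers:
            w.prepare(ddl)
            prepared.append(w)
    except RuntimeError:
        for w in prepared:
            w.rollback()  # undo on the workers that had already prepared
        return False
    for w in workers:
        w.commit()
    return True


ddl = "ALTER TABLE t ADD COLUMN c int"
healthy = [Worker("w1"), Worker("w2")]
ok = propagate_ddl(ddl, healthy)
failing = [Worker("w1"), Worker("w2", can_prepare=False)]
failed = propagate_ddl(ddl, failing)
print(ok, failed)
```

The second call corresponds to Q2 above: when one node cannot prepare, the change is applied nowhere.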

[Postgres-XL]

  • The following are not allowed:
    • Dropping distribution column

Terminating a session

<Questions>

  • Q1: Can a session always be forcibly terminated within a reasonable amount of time when control has passed to a remote node?
  • Q2: How are the associated inter-node connections returned to the connection pool when a session is forcibly terminated?

Canceling a query

<Questions>

  • Q1: Can a query always be canceled within a reasonable amount of time when control has passed to a remote node?
  • Q2: How are the associated inter-node connections returned to the connection pool when a query is canceled?

Software update, upgrade and downgrade

Want to enable:

  • rolling hardware/software maintenance: stop, patch, and restart nodes one by one
  • change the capacity (CPU, RAM) of a particular VM instance or container without affecting the whole cluster
  • upgrade software without shutting down the whole cluster

<Questions>

  • Q1: Do we allow a cluster configuration that includes different PostgreSQL major and minor versions?
  • Q2: Do we allow downgrade and/or reverting to earlier minor versions just in case the latest minor version has a critical bug?
  • Q3: How do we distinguish planned maintenance that takes somewhat longer than usual from a failure that requires creating new replicas on other nodes?

[Oracle]

  • Update
    • OPatchAuto in rolling mode automates patches and upgrades.
    • Most patches can be applied to a single shard at a time.
    • Some patches should be applied across all shards.
    • If a patch addresses an issue with multi-shard queries, replication, or the sharding infrastructure, it should be applied across all shards.
  • Upgrade
    • The shard catalog must be upgraded first, followed by the shard directors, and finally the shards.
    • The shard catalog and shards are upgraded in a rolling manner in their respective Data Guard configurations.
    • Stop, upgrade, and restart all shard director servers one at a time. To ensure zero downtime, at least one shard director server should always be running.
  • Oracle Sharding does not support downgrading.

[MySQL Cluster]

  • All management nodes must be upgraded before any data nodes are upgraded.
  • SQL nodes can be upgraded before or after upgrading the management nodes, data nodes, or both.
  • Features specific to the new version must not be used until all management nodes and data nodes have been upgraded.
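
The ordering and feature-gating rules above can be expressed as two small checks. This Python sketch uses hypothetical role names and version tuples; it is not part of any MySQL Cluster tooling.

```python
def can_upgrade_data_nodes(nodes, new_version):
    """Data nodes may be upgraded only after all management nodes are upgraded."""
    return all(v >= new_version for role, v in nodes if role == "management")


def new_features_allowed(nodes, new_version):
    """New-version features are usable only once every management and data node is upgraded.

    SQL nodes are ignored because the rules above constrain only management and data nodes.
    """
    return all(
        v >= new_version
        for role, v in nodes
        if role in ("management", "data")
    )


# Hypothetical cluster in the middle of an upgrade: (role, version) pairs.
cluster = [
    ("management", (8, 0)),
    ("data", (7, 6)),
    ("data", (7, 6)),
    ("sql", (7, 6)),
]
print(can_upgrade_data_nodes(cluster, (8, 0)))
print(new_features_allowed(cluster, (8, 0)))
```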

[CockroachDB]

  • Users need to upgrade only one node at a time, and wait at least one minute after a node rejoins the cluster to upgrade the next node. Simultaneously upgrading more than one node increases the risk that ranges will lose a majority of their replicas and cause cluster unavailability.

[YugabyteDB]

  • The basic flow is to upgrade each YB-Master and YB-TServer one at a time, verifying after each step from the yb-master Admin UI that the cluster is healthy and the upgraded process is back online.
  • Pause ~60 seconds between the upgrades of nodes.
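
The one-node-at-a-time flow described for CockroachDB and YugabyteDB can be sketched in Python as follows. The upgrade and is_healthy callables are hypothetical hooks that an operator would supply, and the sleep function is injectable so the example runs without actually waiting.

```python
import time


def rolling_upgrade(nodes, upgrade, is_healthy, pause_seconds=60, sleep=time.sleep):
    """Upgrade nodes one at a time, stopping at the first unhealthy result.

    Returns the list of nodes that were upgraded and verified healthy.
    """
    done = []
    for i, node in enumerate(nodes):
        upgrade(node)
        if not is_healthy(node):
            break  # leave the remaining nodes on the old version
        done.append(node)
        if i < len(nodes) - 1:
            sleep(pause_seconds)  # let the node rejoin before touching the next one
    return done


pauses = []  # records the requested pauses instead of sleeping
result = rolling_upgrade(
    ["n1", "n2", "n3"],
    upgrade=lambda node: None,
    is_healthy=lambda node: node != "n3",
    pause_seconds=60,
    sleep=pauses.append,
)
print(result, pauses)
```

Stopping at the first unhealthy node keeps at most one node out of service at any time, which is the point of the rolling approach.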

[Citus]

  • If upgrading both Citus and Postgres, always be sure to upgrade the Citus extension first, and the PostgreSQL version second. They must not be upgraded at the same time.
  • The basic upgrade flow is: back up Citus metadata in the old coordinator, create a new database cluster with the new version, stop the old server, run pg_upgrade, start the new server, and restore Citus metadata. For details, see Upgrading Citus.

[Postgres-XL]

  • pg_dumpall and psql can be used to upgrade Postgres-XL just like PostgreSQL.
  • Support for pg_upgrade is not tested in Postgres-XL.


Migration

<Questions>

  • Q1: How can we migrate a single-server database to a scale-out database and vice versa?
  • Q2: How can we migrate data from one scale-out database to another efficiently?

[Oracle]

  • The Sharding Advisor tool connects to the existing non-sharded database, analyzes its schema and query workload, and produces a set of alternative designs for the sharded database, including recommendations for an effective sharding key, which tables to shard, and which tables to duplicate on all shards.
  • Oracle Data Pump is sharding-aware and can do parallel data export and import when migrating data from a non-sharded Oracle database to a sharded Oracle database.

[MySQL Cluster]

  • The ndb_import program imports data in a CSV-formatted file directly into an NDB table using the NDB API without connecting to a MySQL Server. Although it is not described in the context of migration, it appears to be fast and could help with migration.

[Spanner]

  • Uses Dataflow, a managed service for transforming and enriching data, to export an individual database to a set of Avro or CSV files in a Cloud Storage bucket and to import data from those files. Each table's data is split into multiple export files.

[CockroachDB]

  • The user can use pg_dump to dump a database or an individual table into an SQL script file, and run the IMPORT statement to load the data. CockroachDB does not support type or function definitions, or loading data from non-public schemas, so the user has to remove or modify those in the SQL script.
  • IMPORT accepts multiple files in CSV, delimited, and Avro formats. To maximize import performance, it's recommended to provide as many input files in cloud storage as there are nodes so that all cluster nodes can import data in parallel.
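
Preparing one input file per node can be as simple as splitting the exported rows round-robin. The Python sketch below only shows the splitting step; writing the chunks to cloud storage and the IMPORT invocation itself are left out, and the function name is hypothetical.

```python
def split_for_import(rows, node_count):
    """Distribute rows round-robin into `node_count` chunks, one input file per node."""
    if node_count < 1:
        raise ValueError("node_count must be at least 1")
    chunks = [[] for _ in range(node_count)]
    for i, row in enumerate(rows):
        chunks[i % node_count].append(row)
    return chunks


chunks = split_for_import(["r1", "r2", "r3", "r4", "r5"], node_count=3)
print(chunks)
```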

[YugabyteDB]

  • The user can use pg_dump or ysql_dump, which is derived from pg_dump, to dump a database or an individual table into an SQL script file, and run ysqlsh to load the data.
  • COPY TO and COPY FROM can be used to export and import a table just like in vanilla PostgreSQL.

[Greenplum]

  • pg_dump, pg_dumpall, psql, and COPY can be used just like PostgreSQL.
  • COPY with the ON SEGMENT clause has each segment instance read or write its data to a separate file on the segment host. The example syntax is "COPY table [TO|FROM] '<SEG_DATA_DIR>/gpdumpname<SEGID>_suffix' ON SEGMENT;".
  • You can use the gpcopy utility to transfer data between databases in different Greenplum Database clusters. gpcopy is a high-performance utility that can copy metadata and data from one Greenplum database to another Greenplum database. You can migrate the entire contents of a database, or just selected tables. The clusters can have different Greenplum Database versions.

[Citus]

  • Since Citus is deployed as a PostgreSQL extension, PostgreSQL users can often start using Citus by simply installing the extension on their existing database. Once the extension is created, you can create and use distributed tables through standard PostgreSQL interfaces while maintaining compatibility with existing PostgreSQL tools.
  • If an existing PostgreSQL database is converted into the coordinator node for a Citus cluster, the data in its tables can be distributed efficiently and with minimal interruption to an application. The create_distributed_table function described earlier works on both empty and non-empty tables, and for the latter it automatically distributes table rows throughout the cluster. Writes on the table are blocked while the data is migrated, and pending writes are handled as distributed queries once the function commits. (If the function fails then the queries become local again.) Reads can continue as normal and will become distributed queries once the function commits.


Exploiting Postgres-XL code

<Questions>

  • Q1: What does Postgres-XL do other than the atomic commit and global visibility?
  • Q2: Which part of Postgres-XL code can we reuse?

<What could possibly be reused>

  • Node management
    • CREATE/ALTER/DROP NODE statements
    • CREATE/DROP NODE GROUP statements
  • DDL propagation across cluster nodes
  • Data sharding and placement
    • Distributed table
    • Replicated table
    • Data redistribution
    • CREATE/ALTER TABLE, CREATE TABLE AS statements
  • System catalog
    • pgxc_node: cluster nodes
    • pgxc_group: cluster node groups
    • pgxc_class: distribution or replication method and placement
  • Inter-node communication
    • Connection pooling
    • CLEAN CONNECTION statement
  • Sequence
    • Cluster-wide centralized sequence management (in GTM)
    • CREATE/ALTER/DROP SEQUENCE statements
    • Sequence manipulation: nextval, currval, setval, etc.
  • Transaction management
    • Two-phase commit across cluster nodes
  • Backup and recovery
    • CREATE BARRIER statement: create a consistent point on all cluster nodes
  • Query processing
    • Locator: determine the nodes over which a given table is replicated or distributed
    • FQS (Fast Query Shipping): decide whether the query can be executed on Datanodes directly without involving the Coordinator
    • Distributed query planning and execution
      • Pushdown of join, ORDER BY, LIMIT, GROUP BY
      • Aggregate across data nodes
    • Updating replicated tables
  • Database administration
    • initdb
    • pgxc_ctl
      • Initialize and configure the cluster based upon the configuration definition
      • Add and remove master and slave nodes in the cluster
      • Start and stop the cluster
      • Perform failover of each node
      • Monitor the health of the cluster nodes
    • pgxc_monitor
    • pg_dump, pg_dumpall
      • Including node definitions (pg_dumpall only)
      • Including table distribution parameters into table definitions
    • COPY statement: COPY TO collects data from datanodes and COPY FROM distributes data to datanodes
    • PAUSE/UNPAUSE CLUSTER statements: block and unblock new transactions for performing maintenance operations
    • EXPLAIN statement: output distributed query plan
    • Vacuum: trigger autovacuum even on idle nodes to prevent XID wraparound
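
To make the Locator and FQS items in the list above more concrete, here is a minimal Python sketch of hash-based node lookup and of a check for whether a query touches only one node. The function names are hypothetical, and CRC32 is used only because it is deterministic; it is not the hash function Postgres-XL uses.

```python
import zlib


def locate_node(distribution_key, node_names):
    """Map a distribution key to one node using a stable hash (CRC32, an assumption)."""
    digest = zlib.crc32(str(distribution_key).encode("utf-8"))
    return node_names[digest % len(node_names)]


def can_ship_directly(key_values, node_names):
    """Return the single target node if all keys live on one node, else None."""
    targets = {locate_node(k, node_names) for k in key_values}
    return targets.pop() if len(targets) == 1 else None


datanodes = ["dn1", "dn2", "dn3"]
print(locate_node(42, datanodes))
print(can_ship_directly([42], datanodes))            # single key: one target node
print(can_ship_directly(range(100), datanodes))      # many keys: spans several nodes
```

A query restricted to one distribution key value resolves to a single Datanode and is a candidate for direct shipping, while a query spanning many keys needs distributed planning.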

<What would not be reused>

  • Process management
    • GTM, GTM_proxy
    • Separation of coordinator and data node
  • Transaction management (in GTM)
    • Assigning XIDs
    • Providing snapshots
  • Utility
    • initgtm, gtm, gtm_proxy, gtm_ctl
    • pg_dump, pg_dumpall
      • Obtain sequence values not only from sequence relations but also from GTM

<References>