PostgreSQL并行查询

简介

当前计算机的单个CPU的运算能力没有显著增长,但一个CPU有越来越多的CPU核心,充分利用这些CPU核,才能有效的提高性能。另一方面,随着SSD的发展普及,数据库越来越多地采用该种存储,传统的基于磁盘的存储在查找数据时,存在较高的IO延迟,而SSD能够使IO延迟大大减小(尤其是随机读写)。因此,提升数据库性能的关键在于提升CPU的运算能力,即充分利用多CPU的运算能力。

PG的多会话任务虽然可以利用操作系统的调度策略,但是单个任务却只能最多使用一个CPU和一个I/O通道,并且,这种调度PG也无法控制,不够灵活。数据库要处理的单个会话任务的复杂程度急剧增加(处理更复杂的多表join,聚合、排序和大表扫描任务),单个任务的处理能力越来越成为了数据库任务处理的瓶颈。

从PG9.6开始,引入了并行的特征。PG的并行查询功能主要由PostgreSQL社区的核心开发者Robert Haas等人开发。PostgreSQL的并行查询在大数据量(中间结果在GB以上)的join、merge场合,效果比较明显。效果上,因为系统开销,投入的资源跟性能提升并不是线性关系,比如增加4个worker,性能可能提升2倍左右,而不是4倍。

并行查询的场景主要有以下3种:

1. 并行扫描

PG9.6 唯一一种被修改用于并行查询的扫描类型是顺序扫描。因此在并行计划中的驱动表总是被使用并行顺序扫描进行扫描。关系的块将被划分给合作进程。一次发放一个文件块,这样对于关系的访问仍然保持为顺序访问。在请求一个新页面之前,每一个进程将访问分配给它的页面上的每一个元组。

2. 并行连接

驱动表将被使用嵌套循环或者哈希连接到一个或多个其他表。在连接的外侧可以是任何一种被 planner 支持安全地并行 worker 中运行的非并行计划。例如,它可以是一个索引扫描,基于从内表取得的一列来茶轴值。每个 worker 都将会完整地执行外侧的计划。归并连接的外侧常常涉及到排序整个内表,即便使用索引,多次在内表上进行完全索引扫描也效率不高,这也是为什么这里不能支持归并连接的原因。

3. 并行聚集

将查询的聚集部分整个第并行执行是不可能。例如,如果一个查询涉及到选择 count(*) ,每个 worker 可以计算一个总和,但是这些总和需要被整合在一起产生最终的答案。如果一个计划涉及到 group by 子句,为每个组需要计算出一个单独的总和。即使聚集不能完全第并行执行,但是涉及聚集的查询常常是并行查询很好的候选,因为它们通常是读很多行但只返回少数几行给客户端。返回很多行给客户端的查询常常受制于客户端读取数据的速度,这种情况下并行查询帮助不大。

PG通过做两次聚集来支持并行聚集。第一次,每个参与查询计划并行执行部分的进程执行一个聚集步骤,为进程发现的每个分组产生一个部分结果。这在计划中反映为一个 partial aggreagate 节点。第二次,部分结果被通过 gather 节点传输给 leader。最后,leader 对所有工作者的部分结果进行重聚集以得到最终的结果,这在计划中反映为一个 finalize aggreaget 节点。

并行聚集并不能支持所有的情况。每个聚集对于并行机制一定要是安全的,并且必须有一个结合函数。如果聚集有一个internal 类型的状态转移,它必须有序列化和反序列化。对于有序聚集或者查询设计 grouping sets 时不支持并行聚集。只有当查询中涉及的所有连接也是计划中并行的一部分时,才能使用并行聚集。

章 15. 并行查询 
Chapter 15. Parallel Query  

并行查询设计思路

1. 执行框架的基础设施

基础设施包括容错机制等,主进程需要了解自己工作进程的执行状态,处理工作进程执行过程中发生的任何错误,还有动态工作进程,动态共享内存API等工作进程消息处理。

1)动态共享内存,提供了几种底层的实现选择(不同的OS选择不同),通过参数 dynamic_shared_memory_type 配置;

2)共享内存消息队列 shm_mq,用于通过共享内存传递数据和状态。通过核心函数 shm_mq_receive,可以看到无论是数据还是错误消息都通过改机制来同步。

3)主进程同步给工作进程相关的各种会话信息
动态库 RestoreLibraryState()
用户信息,用户登录的DB BackgroundWorkerIntilizeConnectionByOid()
当前会话中的GUC参数,并行SQL所在的事务信息和状态 RestoreGUCState()
快照信息 RestoreSnapshot()
ComboCID信息 RestoreComboCIDState()

为了让工作进程完成部分工作,需要装载主进程的很多上下文信息,这也意味着并行模式需要承担一定的代价。

2. 优化器-代价模型

在传统的代价模型基础上增加计算并行执行路径的代价数据,优化器能够输出并行执行计划,增加并行执行节点相关的path、plan,用于存放并行相关的代价信息。实现原理为:新增并行相关的节点的执行path,并填充它们准确的cost,让它们参与到动态规划或遗传算法的迭代计算中,最终如果并行相关path最优,则创建完整的执行计划交给执行器执行。

1)新增代价类型
parallel_setup_cost
并行计划启动代价,对应工作进程的创建和上下文信息的传递所需要的代价。它也说明只有需要一定工作量的复杂SQL才有必要使用并行方式执行。
parallel_tuple_cost 
主进程和工作进程间传递数据是需要消耗资源的,这取决于实现它的方式,目前消耗的资源多是内存拷贝和tuple的重组和解析。上述代价是并行执行模型需要考虑的,结合统计信息中表上的其他信息,能预估出对应表或join使用并行模型执行时的代价。

2)cost_seqscan 
顺序扫描采取了并行的执行方式,需要计算并行模式的代价。顺序扫描的代价分为3个部分:startup_cost + cpu_run_cost + disk_run_cost,并行模式下CPU和disk被分担到了多个工作进程中,每个工作进程处理整个表中的一部分数据,相应的代价被重新评估。
create_parallel_paths
适合并行的表创建并行path,并填充cost。
standard_planner 
并行模式并不适合所有查询,做逻辑优化阶段需要关掉并行计划的计算。
随着工作进程能承担的工作越多,更多的执行节点可以让工作进程完成,在优化器中需要做适当的节点下推(push down)。

3. 多进程数据交换-共享内存

进程间同步数据的机制,目前的实现是开辟共享内存,发送和接收数据的格式和形式也需要设计。这部分底层使用共享内存在一个OS中,在多个独立进程间同步数据。在实现上又抽象成了消息队列的形式,用于工作进程和主进程间同步数据。

表上的数据(tuple)和错误消息被封装成“消息”的形式发送给主进程,核心函数 shm_mq_sendv shm_mq_receive,底层实现是通过共享内存上用 memcpy 来做的。

4. 并行相关执行节点

动态启动多个工作进程,把查询计划中部分任务下发给它们执行。需要重组目前传统的执行器流程,也就是在目前执行器上面添加用户并行处理的执行节点:1)并行扫描节点;2)数据发送接收节点。

执行器的工作主要是改造传统的逐层迭代方式以支持并行执行方式,当然是在重用之前代码的基础上,几个关键的实现是:
1)ExecGather 添加用于接收工作进程发送数据的节点,内部调用了底层 shm_mq 模块中的 API;
2)在工作进程空间中,添加流程 ParallelQueryMain 用于工作进程完成工作并把数据通过 shm_mq 发送给主进程;
3)改造顺序扫描执行节点和下层的存取节点,支持按照 blocknum 为单位并行扫描同一个表。核心函数 heap_parallelscan_nextpage,它决定当前工作进程扫描任务是如何分配的。

该部分的工作重用了大量的旧的流程,但这和之前的执行器的工作模式有本质的区别,大量任务在独立的进程空间中由OS并行的调度执行,它们用 shm_mq 传递数据。

PgSQL · 答疑解惑 · PostgreSQL 9.6 并行查询实现分析​ 

并行计算相关参数

max_worker_processes (integer)
该参数决定了整个数据库允许启动多少个work process,如果设置为0,便是不允许并行。

Sets the maximum number of background processes that the system can support. This parameter can only be set at server start. The default is 8.

When running a standby server, you must set this parameter to the same or higher value than on the master server. Otherwise, queries will not be allowed in the standby server.

When changing this value, consider also adjusting max_parallel_workers and max_parallel_workers_per_gather.

max_parallel_workers_per_gather (integer) 
该参数决定每个Gather node最多允许启动多少个 work process。

Sets the maximum number of workers that can be started by a single Gather or Gather Merge node. Parallel workers are taken from the pool of processes established by max_worker_processes, limited by max_parallel_workers. Note that the requested number of workers may not actually be available at run time. If this occurs, the plan will run with fewer workers than expected, which may be inefficient. The default value is 2. Setting this value to 0 disables parallel query execution.

Note that parallel queries may consume very substantially more resources than non-parallel queries, because each worker process is a completely separate process which has roughly the same impact on the system as an additional user session. This should be taken into account when choosing a value for this setting, as well as when configuring other settings that control resource utilization, such as work_mem. Resource limits such as work_mem are applied individually to each worker, which means the total utilization may be much higher across all processes than it would normally be for any single process. For example, a parallel query using 4 workers may use up to 5 times as much CPU time, memory, I/O bandwidth, and so forth as a query which uses no workers at all.

parallel_setup_cost (floating point) 
表示启动worker process的启动成本,因为启动 worker 进程需要建立共享内容等操作,属于附带的额外成本。

Sets the planner's estimate of the cost of launching parallel worker processes. The default is 1000.

parallel_tuple_cost (floating point) 
worker进程处理完后的 tuple 要传输给上层node,即进程间的row 交换成本,按 node 评估的输出rows 来乘。

Sets the planner's estimate of the cost of transferring one tuple from a parallel worker process to another process. The default is 0.1.

costsize.c

double		parallel_tuple_cost = DEFAULT_PARALLEL_TUPLE_COST;
double		parallel_setup_cost = DEFAULT_PARALLEL_SETUP_COST;

/*
 * cost_gather
 *      Determines and returns the cost of gather path.
 *
 * 'rel' is the relation to be operated upon
 * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL
 * 'rows' may be used to point to a row estimate; if non-NULL, it overrides
 * both 'rel' and 'param_info'.  This is useful when the path doesn't exactly
 * correspond to any particular RelOptInfo.
 */
void
cost_gather(GatherPath *path, PlannerInfo *root,
            RelOptInfo *rel, ParamPathInfo *param_info,
            double *rows)
{
    Cost        startup_cost = 0;
    Cost        run_cost = 0;

    /* Mark the path with the correct row estimate */
    if (rows)
        path->path.rows = *rows;
    else if (param_info)
        path->path.rows = param_info->ppi_rows;
    else
        path->path.rows = rel->rows;

    startup_cost = path->subpath->startup_cost;

    run_cost = path->subpath->total_cost - path->subpath->startup_cost;

    /* Parallel setup and communication cost. */
    startup_cost += parallel_setup_cost;
    run_cost += parallel_tuple_cost * path->path.rows;

    path->path.startup_cost = startup_cost;
    path->path.total_cost = (startup_cost + run_cost);
}

min_parallel_table_scan_size(integer)
表的大小,也作为是否启用并行计算的条件,如果小于它,不启用并行计算。但还有其他条件决定是否启用并行,所以并不是小于它的表就一定不会启用并行。

Sets the minimum amount of table data that must be scanned in order for a parallel scan to be considered. For a parallel sequential scan, the amount of table data scanned is always equal to the size of the table, but when indexes are used the amount of table data scanned will normally be less. The default is 8 megabytes (8MB).

allpaths.c

/*
 * create_plain_partial_paths
 *	  Build partial access paths for parallel scan of a plain relation
 *	  构建并行扫描的局部访问路径
 */
static void
create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
{
	int			parallel_workers;

	parallel_workers = compute_parallel_worker(rel, rel->pages, -1);

	/* If any limit was set to zero, the user doesn't want a parallel scan. */
	if (parallel_workers <= 0)
		return;

	/* Add an unordered partial path based on a parallel sequential scan. */
	add_partial_path(rel, create_seqscan_path(root, rel, NULL, parallel_workers));
}

/*
 * Compute the number of parallel workers that should be used to scan a
 * relation.  We compute the parallel workers based on the size of the heap to
 * be scanned and the size of the index to be scanned, then choose a minimum
 * of those.
 *
 * "heap_pages" is the number of pages from the table that we expect to scan, or
 * -1 if we don't expect to scan any.
 *
 * "index_pages" is the number of pages from the index that we expect to scan, or
 * -1 if we don't expect to scan any.
 * 
 * 计算用于扫描关系的并行worker的数量,计算并行worker的数量是基于要扫描的heap表的
 * 大小和要扫描的index的大小,选取最小的一个
 */
int
compute_parallel_worker(RelOptInfo *rel, double heap_pages, double index_pages)
{
    int            parallel_workers = 0;

    /*
     * If the user has set the parallel_workers reloption, use that; otherwise
     * select a default number of workers.
     *
     * 如果设置了表级的parallel_workers参数,则直接使用这个作为并行度。 
     * 如果没有设置表级并行度参数,则使用表的大小计算出一个合适的并行度 
     */
    if (rel->rel_parallel_workers != -1)
        parallel_workers = rel->rel_parallel_workers;
    else
    {
        /*
         * If the number of pages being scanned is insufficient to justify a
         * parallel scan, just return zero ... unless it's an inheritance
         * child. In that case, we want to generate a parallel path here
         * anyway.  It might not be worthwhile just for this relation, but
         * when combined with all of its inheritance siblings it may well pay
         * off.
         *
         * 如果要扫描的 pages 小于 min_parallel_table_scan_size 或者 min_parallel_index_scan_size,
         * 不启用并行。
         */
        if (rel->reloptkind == RELOPT_BASEREL &&
            ((heap_pages >= 0 && heap_pages < min_parallel_table_scan_size) ||
             (index_pages >= 0 && index_pages < min_parallel_index_scan_size)))
            return 0;

        if (heap_pages >= 0)
        {
            int            heap_parallel_threshold;
            int            heap_parallel_workers = 1;

            /*
             * Select the number of workers based on the log of the size of
             * the relation.  This probably needs to be a good deal more
             * sophisticated, but we need something here for now.  Note that
             * the upper limit of the min_parallel_table_scan_size GUC is
             * chosen to prevent overflow here.
             *
             * 根据表的大小计算出需要开多大的并行。
             */
            heap_parallel_threshold = Max(min_parallel_table_scan_size, 1);
            while (heap_pages >= (BlockNumber) (heap_parallel_threshold * 3))
            {
                heap_parallel_workers++;
                heap_parallel_threshold *= 3;
                if (heap_parallel_threshold > INT_MAX / 3)
                    break;        /* avoid overflow */
            }

            parallel_workers = heap_parallel_workers;
        }

        if (index_pages >= 0)
        {
            int            index_parallel_workers = 1;
            int            index_parallel_threshold;

            /* same calculation as for heap_pages above */
            index_parallel_threshold = Max(min_parallel_index_scan_size, 1);
            while (index_pages >= (BlockNumber) (index_parallel_threshold * 3))
            {
                index_parallel_workers++;
                index_parallel_threshold *= 3;
                if (index_parallel_threshold > INT_MAX / 3)
                    break;        /* avoid overflow */
            }

            if (parallel_workers > 0)
                parallel_workers = Min(parallel_workers, index_parallel_workers);
            else
                parallel_workers = index_parallel_workers;
        }
    }

    /*
     * In no case use more than max_parallel_workers_per_gather workers.
     *
     * 根据计算出的并行度值,与max_parallel_workers_per_gather参数比较,取小的。就是需要开启的并行度。  
     */
    parallel_workers = Min(parallel_workers, max_parallel_workers_per_gather);

    return parallel_workers;
}

force_parallel_mode (enum)
强制开启并行

Allows the use of parallel queries for testing purposes even in cases where no performance benefit is expected. The allowed values of force_parallel_mode are off (use parallel mode only when it is expected to improve performance), on (force parallel query for all queries for which it is thought to be safe), and regress (like on, but with additional behavior changes as explained below).

More specifically, setting this value to on will add a Gather node to the top of any query plan for which this appears to be safe, so that the query runs inside of a parallel worker. Even when a parallel worker is not available or cannot be used, operations such as starting a subtransaction that would be prohibited in a parallel query context will be prohibited unless the planner believes that this will cause the query to fail. If failures or unexpected results occur when this option is set, some functions used by the query may need to be marked PARALLEL UNSAFE (or, possibly, PARALLEL RESTRICTED).

Setting this value to regress has all of the same effects as setting it to on plus some additional effects that are intended to facilitate automated regression testing. Normally, messages from a parallel worker include a context line indicating that, but a setting of regresssuppresses this line so that the output is the same as in non-parallel execution. Also, the Gather nodes added to plans by this setting are hidden in EXPLAIN output so that the output matches what would be obtained if this setting were turned off.

parallel_workers (integer) 
表级参数,可以在建表时设置,也可以后期设置。

-- 设置表级并行度
alter table test set (parallel_workers=0);

-- 关闭表的并行
alter table test set (parallel_workers=0);

-- 重置参数,那么在create_plain_partial_paths中会通过表的pages计算出一个合理的并行度
alter table test reset (parallel_workers);

19.4. Resource Consumption 
19.7. Query Planning 
PostgreSQL 9.6 并行计算 优化器算法浅析 

实验

PostgreSQL · 实现分析 · PostgreSQL 10.0 并行查询和外部表的结合 
PgSQL · 特性分析 · PostgreSQL 9.6 让多核并行起来 
PostgreSQL 如何让 列存(外部列存) 并行起来 
PostgreSQL并行查询介绍 
PostgreSQL Parallel Execution 

分享到: 更多