Greenplum Workload Management: Resource Queues

Greenplum workload management is built mainly on resource queues, a feature introduced in GP 4.0 that implements query and resource management.

Configuration

General resource queue settings

$ vi postgresql.conf
...
#---------------------------------------------------------------------------
# RESOURCE SCHEDULING
#---------------------------------------------------------------------------

max_resource_queues = 150       # no. of resource queues to create.
#max_resource_portals_per_transaction = 64  # no. of portals per backend.
#resource_select_only = on      # resource lock SELECT queries only.
#resource_cleanup_gangs_on_wait = on    # Cleanup idle reader gangs before
                                        # resource lockwait.
gp_resqueue_memory_policy = 'eager_free'    # memory request based queueing. 
                                    # eager_free, auto or none
...

max_resource_queues – Sets the maximum number of resource queues that can be created.
max_resource_portals_per_transaction – Sets the maximum number of simultaneously open cursors allowed per transaction. Note that an open cursor holds an active query slot in a resource queue.
resource_select_only – If set to on, only SELECT, SELECT INTO, CREATE TABLE AS SELECT, and DECLARE CURSOR commands are evaluated against resource queues. If set to off, INSERT, UPDATE, and DELETE commands are evaluated as well.
resource_cleanup_gangs_on_wait – Cleans up idle segment worker processes before taking a slot in the resource queue.
stats_queue_level – Enables statistics collection on resource queue usage, which can then be viewed by querying the pg_stat_resqueues system view (see the example below).
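
With stats_queue_level enabled, per-queue usage statistics can be read from the pg_stat_resqueues system view (a minimal sketch; the view only accumulates data while statistics collection is on):

testDB=# select * from pg_stat_resqueues;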

The following parameters are related to memory usage:

gp_resqueue_memory_policy – Enables Greenplum memory management features. In Greenplum Database 4.2 and later, the eager_free distribution algorithm takes advantage of the fact that not all operators execute at the same time: the query plan is divided into stages, memory allocated to a previous stage is eagerly freed at the end of that stage's execution, and the freed memory is then allocated to the next stage.
statement_mem and max_statement_mem – Used to allocate memory to a particular query at runtime, overriding the default allocation assigned by the resource queue (see the sketch after this list). max_statement_mem is set by database superusers to prevent regular database users from over-allocation.
gp_vmem_protect_limit – Sets the upper bound on memory that all query processes on a segment can consume; it should not exceed the physical memory of the segment host. When a segment host reaches this limit during query execution, the queries that caused the limit to be exceeded are cancelled.
gp_vmem_idle_resource_timeout and gp_vmem_protect_segworker_cache_limit – Used to free memory held by idle database processes on segment hosts. Administrators may want to adjust these settings on systems with a lot of concurrency.
shared_buffers – Sets the amount of memory a Greenplum server instance uses for shared memory buffers. This setting must be at least 128 kilobytes and at least 16 kilobytes times max_connections, and must not exceed the operating system's maximum shared memory allocation request size (shmmax on Linux).
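
As a sketch of the runtime override mentioned above, a session can check and raise its own per-statement memory, subject to the max_statement_mem ceiling ('125MB' is only an illustrative value):

testDB=# show statement_mem;
testDB=# set statement_mem='125MB';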

The following parameters are related to query prioritization:

These parameters must be set in the postgresql.conf file on the master and on all segment hosts:
gp_resqueue_priority – The query prioritization feature; enabled by default.
gp_resqueue_priority_sweeper_interval – Sets the interval at which CPU usage is recalculated for all active statements. The default value should be sufficient for typical database operations.
gp_resqueue_priority_cpucores_per_segment – Specifies the number of CPU cores allocated per segment instance. The default value is 4 for the master and segments. For Greenplum Data Computing Appliance Version 2, the default value is 4 for segments and 25 for the master.
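
The current settings can be verified from any session, for example:

testDB=# show gp_resqueue_priority;
testDB=# show gp_resqueue_priority_cpucores_per_segment;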

Source files related to these configuration options

src/backend/utils/misc/guc.c
src/backend/utils/misc/guc_gp.c

Using resource queues

Resource queues are a resource management mechanism whose main purpose is to reduce resource contention between concurrent queries. The supported limits are:
active_statements – the number of queries the queue may run concurrently;
max_cost          – the maximum combined cost of all queries currently running in the queue;
cost_overcommit   – whether queries in the queue may exceed max_cost in total when the system is idle;
min_cost          – queries whose cost is below this value are not counted against the queue's cost and do not wait in the queue, but run immediately;
mem_limit         – the maximum amount of memory a single statement in the queue may use per segment;
priority          – balances CPU usage across queues. There are five levels (MIN, LOW, MEDIUM, HIGH, MAX), each with a corresponding weight; CPU usage is sampled at a fixed interval, and queries in a queue that exceeds its weight are throttled with pg_usleep.

Creating resource queues

testDB=# \h create resource queue
Command:     CREATE RESOURCE QUEUE
Description: create a new resource queue for workload management
Syntax:
CREATE RESOURCE QUEUE name WITH (queue_attribute=value [, ... ]) 
where queue_attribute is:
   ACTIVE_STATEMENTS=integer
        [ MAX_COST=float [COST_OVERCOMMIT={TRUE|FALSE}] ]
        [ MIN_COST=float ]
        [ PRIORITY={MIN|LOW|MEDIUM|HIGH|MAX} ]
        [ MEMORY_LIMIT='memory_units' ]
|  MAX_COST=float [ COST_OVERCOMMIT={TRUE|FALSE} ] 
        [ ACTIVE_STATEMENTS=integer ]
        [ MIN_COST=float ]
        [ PRIORITY={MIN|LOW|MEDIUM|HIGH|MAX} ]
        [ MEMORY_LIMIT='memory_units' ]

-- Create a queue that only limits the maximum number of active statements
testDB=# create resource queue myqueue with (ACTIVE_STATEMENTS=20);

-- Create a queue with a memory limit; when both attributes below are set, each statement is allowed MEMORY_LIMIT/ACTIVE_STATEMENTS = 10MB of memory
testDB=# create resource queue myqueue1 with (ACTIVE_STATEMENTS=20,MEMORY_LIMIT='200MB');

-- Limit the maximum total query cost
testDB=# create resource queue myqueue2 with (MAX_COST=3000);

-- Limit the maximum total query cost to 3000, disallow overcommit, and let queries cheaper than 500 bypass the queue
testDB=# create resource queue myqueue3 with (MAX_COST=3000,COST_OVERCOMMIT=FALSE,MIN_COST=500);
-- When COST_OVERCOMMIT is FALSE, a query whose cost would push the queue past MAX_COST is rejected with an error; when TRUE, such a query is still allowed to run if no other statements are running in the queue.

-- Limit both the number of active statements and the total query cost
testDB=# create resource queue myqueue4 with (ACTIVE_STATEMENTS=30,MAX_COST=5000);

-- Limit the number of active statements and set the priority
testDB=# create resource queue myqueue5 with (ACTIVE_STATEMENTS=5,PRIORITY=MAX);
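
-- A resource queue only takes effect for the roles assigned to it; a sketch (the role name is illustrative)
testDB=# create role report_user login resource queue myqueue5;
testDB=# alter role report_user resource queue myqueue1;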

-- Drop a resource queue
testDB=# drop resource queue myqueue;

testDB=# \h alter resource queue
Command:     ALTER RESOURCE QUEUE
Description: change attributes of a resource queue
Syntax:
ALTER RESOURCE QUEUE name WITH ( queue_attribute=value [, ... ] ) 
where queue_attribute is:
   ACTIVE_STATEMENTS=integer
   MEMORY_LIMIT='memory_units'
   MAX_COST=float
   COST_OVERCOMMIT={TRUE|FALSE}
   MIN_COST=float
   PRIORITY={MIN|LOW|MEDIUM|HIGH|MAX}

ALTER RESOURCE QUEUE name WITHOUT ( queue_attribute=value [, ... ] ) 
where queue_attribute is:
   ACTIVE_STATEMENTS
   MEMORY_LIMIT
   MAX_COST
   COST_OVERCOMMIT
   MIN_COST

   Note: A resource queue must have either an ACTIVE_STATEMENTS 
   or a MAX_COST value. Do not remove both these queue_attributes 
   from a resource queue.
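
For example, an existing queue's limits can be changed or removed (illustrative values, not part of the session above):

testDB=# alter resource queue myqueue4 with (ACTIVE_STATEMENTS=40);
testDB=# alter resource queue myqueue1 without (MEMORY_LIMIT);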

Viewing resource queues

-- View resource queue configuration; each attribute is shown as one row
testDB=# select * from pg_resqueue_attributes;
  rsqname   |      resname      | ressetting | restypid 
------------+-------------------+------------+----------
 myqueue4   | active_statements | 30         |        1
 myqueue4   | max_cost          | 5000       |        2
 myqueue4   | min_cost          | 0          |        3
 myqueue4   | cost_overcommit   | 0          |        4
 myqueue4   | priority          | medium     |        5
 myqueue4   | memory_limit      | -1         |        6
 myqueue    | active_statements | 20         |        1
 myqueue    | max_cost          | -1         |        2
 myqueue    | min_cost          | 0          |        3
 myqueue    | cost_overcommit   | 0          |        4
 myqueue    | priority          | medium     |        5
 myqueue    | memory_limit      | -1         |        6
 pg_default | active_statements | 20         |        1
 pg_default | max_cost          | -1         |        2
 pg_default | min_cost          | 0          |        3
 pg_default | cost_overcommit   | 0          |        4
 pg_default | priority          | medium     |        5
 pg_default | memory_limit      | -1         |        6

-- View current resource queue usage, via the pg_resqueue_status system view
testDB=# select * from pg_resqueue_status;
  rsqname   | rsqcountlimit | rsqcountvalue | rsqcostlimit | rsqcostvalue | rsqwaiters | rsqholders 
------------+---------------+---------------+--------------+--------------+------------+------------
 myqueue4   |            30 |             0 |         5000 |            0 |          0 |          0
 myqueue    |            20 |             0 |           -1 |              |          0 |          0
 myqueue1   |            20 |             0 |           -1 |              |          0 |          0
 myqueue2   |            -1 |               |         3000 |            0 |          0 |          0
 myqueue5   |             5 |             0 |           -1 |              |          0 |          0
 pg_default |            20 |             0 |           -1 |              |          0 |          0
 myqueue3   |            -1 |               |         3000 |            0 |          0 |          0

-- View resource queue status, via the gp_toolkit schema
testDB=# select * from gp_toolkit.gp_resqueue_status;
 queueid |  rsqname   | rsqcountlimit | rsqcountvalue | rsqcostlimit | rsqcostvalue | rsqmemorylimit | rsqmemoryvalue | rsqwaiters | rsqholders 
---------+------------+---------------+---------------+--------------+--------------+----------------+----------------+------------+------------
   85864 | myqueue34  |            20 |             0 |           -1 |            0 |             -1 |              0 |          0 |          0
   86023 | myqueue91  |            20 |             0 |           -1 |            0 |             -1 |              0 |          0 |          0
   85843 | myqueue28  |            20 |             0 |           -1 |            0 |             -1 |              0 |          0 |          0

-- Resource queue related views in gp_toolkit
testDB=# \dv gp_toolkit.gp_resq*
                         List of relations
   Schema   |            Name            | Type |  Owner  | Storage 
------------+----------------------------+------+---------+---------
 gp_toolkit | gp_resq_activity           | view | gpadmin | none
 gp_toolkit | gp_resq_activity_by_queue  | view | gpadmin | none
 gp_toolkit | gp_resq_priority_backend   | view | gpadmin | none
 gp_toolkit | gp_resq_priority_statement | view | gpadmin | none
 gp_toolkit | gp_resq_role               | view | gpadmin | none
 gp_toolkit | gp_resqueue_status         | view | gpadmin | none

-- View priorities
testDB=# SELECT * FROM gp_toolkit.gp_resq_priority_backend;

Resource queue processing

(Figure: resource queue processing model)

The resource queue processing model is shown in the figure above. When a role submits a query, the query's cost is evaluated against the limits of the role's resource queue. If the query does not exceed the queue's limits, it is executed immediately. If it does exceed them (for example, the ACTIVE_STATEMENTS limit has been reached), the query waits until the queue has free resources; waiting queries are placed in a first-in, first-out queue. If query prioritization is enabled, the resources used by running processes are recalculated periodically. A role with the SUPERUSER attribute is exempt from resource queues: superuser queries are always executed immediately.
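
While statements are queued, the per-queue holder and waiter counts can be watched with the gp_resqueue_status view shown earlier, for example:

testDB=# select rsqname, rsqcountlimit, rsqcountvalue, rsqwaiters, rsqholders from gp_toolkit.gp_resqueue_status;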

Dynamically adjusting priority

A superuser can adjust the priority of a running query with the function gp_adjust_priority(session_id, statement_count, priority); with it, a superuser can raise or lower the priority of any query.

testDB=# SELECT gp_adjust_priority(752, 24905, 'HIGH')

To obtain the session_id and statement_count arguments, a superuser can use the gp_toolkit view gp_resq_priority_statement, which exposes both values.

testDB=# select * from gp_toolkit.gp_resq_priority_statement;
 rqpdatname | rqpusename | rqpsession | rqpcommand | rqppriority | rqpweight |                       rqpquery                       
------------+------------+------------+------------+-------------+-----------+------------------------------------------------------
 testDB     | tenant1    |      11218 |          1 | MEDIUM      |       500 | select fQ2_20()
 testDB     | tenant3    |      11088 |          4 | MEDIUM      |       500 | select fQ14_20()
 testDB     | tenant1    |      11380 |          1 | MEDIUM      |       500 | select fQ2_20()
 testDB     | tenant1    |      11317 |          1 | MEDIUM      |       500 | select fQ15_20()
 testDB     | tenant5    |      11330 |          1 | MAX         |   1000000 | select fQ14_20()
 testDB     | tenant1    |      11219 |          1 | MEDIUM      |       500 | select fQ3_20()

rqpsession corresponds to the session_id argument;
rqpcommand corresponds to the statement_count argument;
rqppriority is the current priority; it can be set to MAX, HIGH, MEDIUM, or LOW.

Example

-- Enable debug output
testDB=# set gp_debug_resqueue_priority=on;  
testDB=# set client_min_messages ='debug'; 

-- Run a test query (in a separate session)
testDB=# select pg_sleep(1000000);

-- List the queries currently running
testDB=# select * from gp_toolkit.gp_resq_priority_statement;
DEBUG2:  HandleClientWaitTimeout
DEBUG1:  Message type Q received by from libpq, len = 53
LOG:  statement: select * from gp_toolkit.gp_resq_priority_statement;
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=5)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=5)
 rqpdatname | rqpusename | rqpsession | rqpcommand | rqppriority | rqpweight |                       rqpquery                       
------------+------------+------------+------------+-------------+-----------+------------------------------------------------------
 testDB     | gpadmin    |      11976 |          4 | MAX         |   1000000 | select * from gp_toolkit.gp_resq_priority_statement;
 testDB     | gpadmin    |      11398 |          6 | MAX         |   1000000 | select pg_sleep(1000000);
(2 rows)

-- Adjust the priority; either a named level (MAX, HIGH, MEDIUM, LOW, MIN) or a numeric weight can be used
testDB=# select gp_adjust_priority(11398,6,'MIN'); 
DEBUG1:  Message type Q received by from libpq, len = 42
LOG:  statement: select gp_adjust_priority(11398,6,'MIN');
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)
LOG:  changing weight of (11398:6) from 1000000 to 100
DEBUG1:  Message type M received by from libpq, len = 212  (seg0 192.168.100.79:40000 pid=14477)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg0 192.168.100.79:40000 pid=14477)
DEBUG1:  Message type M received by from libpq, len = 212  (seg1 192.168.100.79:40001 pid=14479)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg1 192.168.100.79:40001 pid=14479)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg1 192.168.100.79:40001 pid=14479)
DEBUG1:  Message type M received by from libpq, len = 212  (seg2 192.168.100.79:40002 pid=14478)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg2 192.168.100.79:40002 pid=14478)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg2 192.168.100.79:40002 pid=14478)
DEBUG1:  Message type M received by from libpq, len = 212  (seg3 192.168.100.79:40003 pid=14480)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg3 192.168.100.79:40003 pid=14480)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg3 192.168.100.79:40003 pid=14480)
DEBUG1:  Message type M received by from libpq, len = 212  (seg4 192.168.100.80:40000 pid=14496)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg4 192.168.100.80:40000 pid=14496)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg4 192.168.100.80:40000 pid=14496)
DEBUG1:  Message type M received by from libpq, len = 212  (seg5 192.168.100.80:40001 pid=14498)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg5 192.168.100.80:40001 pid=14498)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg5 192.168.100.80:40001 pid=14498)
DEBUG1:  Message type M received by from libpq, len = 212  (seg6 192.168.100.80:40002 pid=14497)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg6 192.168.100.80:40002 pid=14497)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg6 192.168.100.80:40002 pid=14497)
DEBUG1:  Message type M received by from libpq, len = 212  (seg7 192.168.100.80:40003 pid=14499)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg7 192.168.100.80:40003 pid=14499)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg7 192.168.100.80:40003 pid=14499)
DEBUG1:  Message type M received by from libpq, len = 212  (seg8 192.168.100.81:40000 pid=14561)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg8 192.168.100.81:40000 pid=14561)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg8 192.168.100.81:40000 pid=14561)
DEBUG1:  Message type M received by from libpq, len = 212  (seg9 192.168.100.81:40001 pid=14559)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg9 192.168.100.81:40001 pid=14559)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg9 192.168.100.81:40001 pid=14559)
DEBUG1:  Message type M received by from libpq, len = 212  (seg11 192.168.100.81:40003 pid=14560)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg11 192.168.100.81:40003 pid=14560)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg11 192.168.100.81:40003 pid=14560)
DEBUG1:  Message type M received by from libpq, len = 212  (seg12 192.168.100.82:40000 pid=14494)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg12 192.168.100.82:40000 pid=14494)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg12 192.168.100.82:40000 pid=14494)
DEBUG1:  Message type M received by from libpq, len = 212  (seg13 192.168.100.82:40001 pid=14496)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg13 192.168.100.82:40001 pid=14496)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg13 192.168.100.82:40001 pid=14496)
DEBUG1:  Message type M received by from libpq, len = 212  (seg14 192.168.100.82:40002 pid=14497)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg14 192.168.100.82:40002 pid=14497)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg14 192.168.100.82:40002 pid=14497)
DEBUG1:  Message type M received by from libpq, len = 212  (seg15 192.168.100.82:40003 pid=14495)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg15 192.168.100.82:40003 pid=14495)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg15 192.168.100.82:40003 pid=14495)
DEBUG1:  Message type M received by from libpq, len = 212  (seg16 192.168.100.83:40000 pid=14514)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg16 192.168.100.83:40000 pid=14514)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg16 192.168.100.83:40000 pid=14514)
DEBUG1:  Message type M received by from libpq, len = 212  (seg17 192.168.100.83:40001 pid=14513)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg17 192.168.100.83:40001 pid=14513)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg17 192.168.100.83:40001 pid=14513)
DEBUG1:  Message type M received by from libpq, len = 212  (seg18 192.168.100.83:40002 pid=14516)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg18 192.168.100.83:40002 pid=14516)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg18 192.168.100.83:40002 pid=14516)
DEBUG1:  Message type M received by from libpq, len = 212  (seg19 192.168.100.83:40003 pid=14515)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg19 192.168.100.83:40003 pid=14515)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg19 192.168.100.83:40003 pid=14515)
DEBUG1:  Message type M received by from libpq, len = 212  (seg20 192.168.100.84:40000 pid=14523)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg20 192.168.100.84:40000 pid=14523)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg20 192.168.100.84:40000 pid=14523)
DEBUG1:  Message type M received by from libpq, len = 212  (seg21 192.168.100.84:40001 pid=14524)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg21 192.168.100.84:40001 pid=14524)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg21 192.168.100.84:40001 pid=14524)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg0 192.168.100.79:40000 pid=14477)
DEBUG1:  Message type M received by from libpq, len = 212  (seg10 192.168.100.81:40002 pid=14562)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg10 192.168.100.81:40002 pid=14562)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg10 192.168.100.81:40002 pid=14562)
DEBUG1:  Message type M received by from libpq, len = 212  (seg22 192.168.100.84:40002 pid=14526)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg22 192.168.100.84:40002 pid=14526)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg22 192.168.100.84:40002 pid=14526)
DEBUG1:  Message type M received by from libpq, len = 212  (seg23 192.168.100.84:40003 pid=14525)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=9)  (seg23 192.168.100.84:40003 pid=14525)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)  (seg23 192.168.100.84:40003 pid=14525)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=9)
 gp_adjust_priority 
--------------------
                  1
(1 row)

testDB=# select * from gp_toolkit.gp_resq_priority_statement;
DEBUG2:  HandleClientWaitTimeout
DEBUG1:  Message type Q received by from libpq, len = 53
LOG:  statement: select * from gp_toolkit.gp_resq_priority_statement;
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=11)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=11)
 rqpdatname | rqpusename | rqpsession | rqpcommand | rqppriority | rqpweight |                       rqpquery                       
------------+------------+------------+------------+-------------+-----------+------------------------------------------------------
 testDB     | gpadmin    |      11976 |         10 | MAX         |   1000000 | select * from gp_toolkit.gp_resq_priority_statement;
 testDB     | gpadmin    |      11398 |          6 | MIN         |       100 | select pg_sleep(1000000);
(2 rows)
-- The priority of the test query has been changed to MIN

-- Test changing the priority to a numeric weight
testDB=# select gp_adjust_priority(11398,6,800);             
DEBUG1:  Message type Q received by from libpq, len = 40
LOG:  statement: select gp_adjust_priority(11398,6,800);
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)
LOG:  changing weight of (11398:6) from 100 to 800
DEBUG1:  Message type M received by from libpq, len = 216  (seg0 192.168.100.79:40000 pid=14558)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg0 192.168.100.79:40000 pid=14558)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg0 192.168.100.79:40000 pid=14558)
DEBUG1:  Message type M received by from libpq, len = 216  (seg1 192.168.100.79:40001 pid=14557)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg1 192.168.100.79:40001 pid=14557)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg1 192.168.100.79:40001 pid=14557)
DEBUG1:  Message type M received by from libpq, len = 216  (seg2 192.168.100.79:40002 pid=14559)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg2 192.168.100.79:40002 pid=14559)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg2 192.168.100.79:40002 pid=14559)
DEBUG1:  Message type M received by from libpq, len = 216  (seg3 192.168.100.79:40003 pid=14560)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg3 192.168.100.79:40003 pid=14560)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg3 192.168.100.79:40003 pid=14560)
DEBUG1:  Message type M received by from libpq, len = 216  (seg4 192.168.100.80:40000 pid=14576)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg4 192.168.100.80:40000 pid=14576)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg4 192.168.100.80:40000 pid=14576)
DEBUG1:  Message type M received by from libpq, len = 216  (seg5 192.168.100.80:40001 pid=14579)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg5 192.168.100.80:40001 pid=14579)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg5 192.168.100.80:40001 pid=14579)
DEBUG1:  Message type M received by from libpq, len = 216  (seg6 192.168.100.80:40002 pid=14578)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg6 192.168.100.80:40002 pid=14578)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg6 192.168.100.80:40002 pid=14578)
DEBUG1:  Message type M received by from libpq, len = 216  (seg7 192.168.100.80:40003 pid=14577)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg7 192.168.100.80:40003 pid=14577)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg7 192.168.100.80:40003 pid=14577)
DEBUG1:  Message type M received by from libpq, len = 216  (seg8 192.168.100.81:40000 pid=14641)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg8 192.168.100.81:40000 pid=14641)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg8 192.168.100.81:40000 pid=14641)
DEBUG1:  Message type M received by from libpq, len = 216  (seg9 192.168.100.81:40001 pid=14640)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg9 192.168.100.81:40001 pid=14640)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg9 192.168.100.81:40001 pid=14640)
DEBUG1:  Message type M received by from libpq, len = 216  (seg10 192.168.100.81:40002 pid=14642)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg10 192.168.100.81:40002 pid=14642)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg10 192.168.100.81:40002 pid=14642)
DEBUG1:  Message type M received by from libpq, len = 216  (seg11 192.168.100.81:40003 pid=14643)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg11 192.168.100.81:40003 pid=14643)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg11 192.168.100.81:40003 pid=14643)
DEBUG1:  Message type M received by from libpq, len = 216  (seg12 192.168.100.82:40000 pid=14575)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg12 192.168.100.82:40000 pid=14575)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg12 192.168.100.82:40000 pid=14575)
DEBUG1:  Message type M received by from libpq, len = 216  (seg13 192.168.100.82:40001 pid=14578)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg13 192.168.100.82:40001 pid=14578)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg13 192.168.100.82:40001 pid=14578)
DEBUG1:  Message type M received by from libpq, len = 216  (seg16 192.168.100.83:40000 pid=14593)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg16 192.168.100.83:40000 pid=14593)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg16 192.168.100.83:40000 pid=14593)
DEBUG1:  Message type M received by from libpq, len = 216  (seg17 192.168.100.83:40001 pid=14594)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg17 192.168.100.83:40001 pid=14594)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg17 192.168.100.83:40001 pid=14594)
DEBUG1:  Message type M received by from libpq, len = 216  (seg18 192.168.100.83:40002 pid=14595)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg18 192.168.100.83:40002 pid=14595)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg18 192.168.100.83:40002 pid=14595)
DEBUG1:  Message type M received by from libpq, len = 216  (seg19 192.168.100.83:40003 pid=14596)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg19 192.168.100.83:40003 pid=14596)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg19 192.168.100.83:40003 pid=14596)
DEBUG1:  Message type M received by from libpq, len = 216  (seg20 192.168.100.84:40000 pid=14606)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg20 192.168.100.84:40000 pid=14606)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg20 192.168.100.84:40000 pid=14606)
DEBUG1:  Message type M received by from libpq, len = 216  (seg21 192.168.100.84:40001 pid=14604)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg21 192.168.100.84:40001 pid=14604)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg21 192.168.100.84:40001 pid=14604)
DEBUG1:  Message type M received by from libpq, len = 216  (seg22 192.168.100.84:40002 pid=14605)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg22 192.168.100.84:40002 pid=14605)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg22 192.168.100.84:40002 pid=14605)
DEBUG1:  Message type M received by from libpq, len = 216  (seg14 192.168.100.82:40002 pid=14576)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg14 192.168.100.82:40002 pid=14576)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg14 192.168.100.82:40002 pid=14576)
DEBUG1:  Message type M received by from libpq, len = 216  (seg15 192.168.100.82:40003 pid=14577)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg15 192.168.100.82:40003 pid=14577)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg15 192.168.100.82:40003 pid=14577)
DEBUG1:  Message type M received by from libpq, len = 216  (seg23 192.168.100.84:40003 pid=14607)
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=13)  (seg23 192.168.100.84:40003 pid=14607)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)  (seg23 192.168.100.84:40003 pid=14607)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=13)
 gp_adjust_priority 
--------------------
                  1
(1 row)

testDB=# select * from gp_toolkit.gp_resq_priority_statement;
DEBUG1:  Message type Q received by from libpq, len = 53
LOG:  statement: select * from gp_toolkit.gp_resq_priority_statement;
DEBUG1:  Inserted entry for query (sessionid=11976, commandcnt=15)
DEBUG1:  Deleted entry for query (sessionid=11976, commandcnt=15)
 rqpdatname | rqpusename | rqpsession | rqpcommand | rqppriority  | rqpweight |                       rqpquery                       
------------+------------+------------+------------+--------------+-----------+------------------------------------------------------
 testDB     | gpadmin    |      11976 |         14 | MAX          |   1000000 | select * from gp_toolkit.gp_resq_priority_statement;
 testDB     | gpadmin    |      11398 |          6 | NON-STANDARD |       800 | select pg_sleep(1000000);
(2 rows)
-- The weight has been changed to 800; because it is not a standard level, the priority is shown as NON-STANDARD.

Queries

Resource queue status

-- Check for queries waiting in a resource queue
testDB=# SELECT * FROM gp_toolkit.gp_locks_on_resqueue WHERE lorwaiting='true';
 lorusename | lorrsqname | lorlocktype | lorobjid | lortransaction | lorpid | lormode | lorgranted | lorwaiting 
------------+------------+-------------+----------+----------------+--------+---------+------------+------------
(0 rows)

-- View active statements; gp_resq_priority_statement contains the priority, session, and other information for every executing statement.
testDB=# select * from gp_toolkit.gp_resq_priority_statement ;
 rqpdatname | rqpusename | rqpsession | rqpcommand | rqppriority | rqpweight |                       rqpquery                        
------------+------------+------------+------------+-------------+-----------+-------------------------------------------------------
 testDB     | gpadmin    |       3575 |         14 | MAX         |   1000000 | select * from gp_toolkit.gp_resq_priority_statement ;
 testDB     | tenant1    |       4521 |          1 | MEDIUM      |       500 | select fQ2_50()
 testDB     | tenant1    |       4523 |          1 | MEDIUM      |       500 | select fQ15_50()

-- Find statements that are currently active or waiting
testDB=# SELECT rolname, rsqname, pid, granted, current_query, datname
FROM pg_roles, gp_toolkit.gp_resqueue_status, pg_locks, pg_stat_activity
WHERE pg_roles.rolresqueue=pg_locks.objid
AND pg_locks.objid=gp_toolkit.gp_resqueue_status.queueid
AND pg_stat_activity.procpid=pg_locks.pid;

Cancelling queries

-- Find the SQL statements currently running
testDB=# select * from pg_stat_activity where current_query <> '<IDLE> in transaction' order by xact_start;
-- Cancel a query
testDB=# select pg_cancel_backend(1234);
-- Terminate a backend
testDB=# select pg_terminate_backend(1234);

Source code analysis

src/backend/postmaster/backoff.c

Query Prioritization

src/backend/command/resqueue.c

POSTGRES internals code for resource queues and locks.

// Remove ourselves from the wait queue and wake up any processes that can now run
/*
 * ResProcLockRemoveSelfAndWakeup -- awaken any processses waiting on a resource lock.
 *
 * Notes:
 *	It always remove itself from the waitlist.
 *	Need to only awaken enough as many waiters as the resource controlled by
 *	the the lock should allow!
 */
void
ResProcLockRemoveSelfAndWakeup(LOCK *lock)
{
    PROC_QUEUE *waitQueue = &(lock->waitProcs);
    int            queue_size = waitQueue->size;
    PGPROC       *proc;
    uint32        hashcode;
    LWLockId    partitionLock;

    int            status;

    Assert(LWLockHeldExclusiveByMe(ResQueueLock));

    /*
     * XXX: This code is ugly and hard to read -- it should be a lot simpler,
     * especially when there are some odd cases (process sitting on its own
     * wait-queue).
     */

    Assert(queue_size >= 0);
    if (queue_size == 0)
    {
        return;
    }

    proc = (PGPROC *) MAKE_PTR(waitQueue->links.next);

    while (queue_size-- > 0)
    {
        /*
         * Get the portal we are waiting on, and then its set of increments.
         */
        ResPortalTag portalTag;
        ResPortalIncrement *incrementSet;

        /* Our own process may be on our wait-queue! */
        if (proc->pid == MyProc->pid)
        {
            PGPROC       *nextproc;

            nextproc = (PGPROC *) MAKE_PTR(proc->links.next);

            SHMQueueDelete(&(proc->links));
            (proc->waitLock->waitProcs.size)--;

            proc = nextproc;

            continue;
        }

        MemSet(&portalTag, 0, sizeof(ResPortalTag));
        portalTag.pid = proc->pid;
        portalTag.portalId = proc->waitPortalId;

        incrementSet = ResIncrementFind(&portalTag);
        if (!incrementSet)
        {
            hashcode = LockTagHashCode(&(lock->tag));
            partitionLock = LockHashPartitionLock(hashcode);

            LWLockRelease(partitionLock);
            elog(ERROR, "no increment data for  portal id %u and pid %d", proc->waitPortalId, proc->pid);
        }

        /*
         * See if it is ok to wake this guy. (note that the wakeup writes to
         * the wait list, and gives back a *new* next proc).
         */
        status = ResLockCheckLimit(lock, (PROCLOCK *) proc->waitProcLock, incrementSet, true);
        if (status == STATUS_OK)
        {
            ResGrantLock(lock, (PROCLOCK *) proc->waitProcLock);
            ResLockUpdateLimit(lock, (PROCLOCK *) proc->waitProcLock, incrementSet, true, false);

            proc = ResProcWakeup(proc, STATUS_OK);
        }
        else
        {
            /* Otherwise move on to the next guy. */
            proc = (PGPROC *) MAKE_PTR(proc->links.next);
        }
    }

    Assert(waitQueue->size >= 0);

    return;
}

// Wake up a single sleeping process
/*
 * ResProcWakeup -- wake a sleeping process.
 *
 * (could we just use ProcWakeup here?)
 *
 */
PGPROC *
ResProcWakeup(PGPROC *proc, int waitStatus)
{
	PGPROC	   *retProc;

	/* Proc should be sleeping ... */
	if (proc->links.prev == INVALID_OFFSET ||
		proc->links.next == INVALID_OFFSET)
		return NULL;

	/* Save next process before we zap the list link */
	retProc = (PGPROC *) MAKE_PTR(proc->links.next);

	/* Remove process from wait queue */
	SHMQueueDelete(&(proc->links));
	(proc->waitLock->waitProcs.size)--;

	/* Clean up process' state and pass it the ok/fail signal */
	proc->waitLock = NULL;
	proc->waitProcLock = NULL;
	proc->waitStatus = waitStatus;

	/* And awaken it */
	PGSemaphoreUnlock(&proc->sem);

	return retProc;
}

// Return a new (empty) queue object for initialization
/*
 * ResQueuehashNew -- return a new (empty) queue object to initialize.
 *
 * Notes
 *	The resource queue lightweight lock (ResQueueLock) *must* be held for
 *	this operation.
 *
 */
ResQueue
ResQueueHashNew(Oid queueid)
{
	bool		found;
	ResQueueData *queue;

	Assert(LWLockHeldExclusiveByMe(ResQueueLock));

	queue = (ResQueueData *)
		hash_search(ResQueueHash, (void *) &queueid, HASH_ENTER_NULL, &found);

	/* caller should test that the queue does not exist already */
	Assert(!found);

	if (!queue)
		return NULL;

	return (ResQueue) queue;
}

src/include/storage/proc.h

/*
 * Each backend has a PGPROC struct in shared memory.  There is also a list of
 * currently-unused PGPROC structs that will be reallocated to new backends.
 *
 * links: list link for any list the PGPROC is in.	When waiting for a lock,
 * the PGPROC is linked into that lock's waitProcs queue.  A recycled PGPROC
 * is linked into ProcGlobal's freeProcs list.
 *
 * Note: twophase.c also sets up a dummy PGPROC struct for each currently
 * prepared transaction.  These PGPROCs appear in the ProcArray data structure
 * so that the prepared transactions appear to be still running and are
 * correctly shown as holding locks.  A prepared transaction PGPROC can be
 * distinguished from a real one at need by the fact that it has pid == 0.
 * The semaphore and lock-activity fields in a prepared-xact PGPROC are unused,
 * but its myProcLocks[] lists are valid.
 *
 * Each backend has one PGPROC entry in shared memory.
 */
struct PGPROC
{
	/* proc->links MUST BE FIRST IN STRUCT (see ProcSleep,ProcWakeup,etc) */
	SHM_QUEUE	links;			/* list link if process is in a list */

	PGSemaphoreData sem;		/* ONE semaphore to sleep on */
	int			waitStatus;		/* STATUS_WAITING, STATUS_OK or STATUS_ERROR */

	Latch		procLatch;		/* generic latch for process */

	LocalTransactionId lxid;	/* local id of top-level transaction currently
								 * being executed by this proc, if running;
								 * else InvalidLocalTransactionId */

	TransactionId xid;			/* id of top-level transaction currently being
								 * executed by this proc, if running and XID
								 * is assigned; else InvalidTransactionId */

	TransactionId xmin;			/* minimal running XID as it was when we were
								 * starting our xact, excluding LAZY VACUUM:
								 * vacuum must not remove tuples deleted by
								 * xid >= xmin ! */

	/*
	 * Distributed transaction information. This is only maintained on QE's
	 * and accessed by the backend itself, so this doesn't need to be
	 * protected by any lock. On QD currentGXact provides this info, hence
	 * redundant info is not maintained here for QD. In fact, it could be just
	 * a global variable in backend-private memory, but it seems useful to
	 * have this information available for debugging purposes.
	 */
	LocalDistribXactData localDistribXactData;

	int			pid;			/* This backend's process id, or 0 */
	BackendId	backendId;		/* This backend's backend ID (if assigned) */
	Oid			databaseId;		/* OID of database this backend is using */
	Oid			roleId;			/* OID of role using this backend */
    int         mppSessionId;   /* serial num of the qDisp process */
    int         mppLocalProcessSerial;  /* this backend's PGPROC serial num */
    bool		mppIsWriter;	/* The writer gang member, holder of locks */

	bool		inCommit;		/* true if within commit critical section */

	uint8		vacuumFlags;	/* vacuum-related flags, see above */

	/* Info about LWLock the process is currently waiting for, if any. */
	bool		lwWaiting;		/* true if waiting for an LW lock */
	bool		lwExclusive;	/* true if waiting for exclusive access */
	struct PGPROC *lwWaitLink;	/* next waiter for same LW lock */

	/* Info about lock the process is currently waiting for, if any. */
	/* waitLock and waitProcLock are NULL if not currently waiting. */
	LOCK	   *waitLock;		/* Lock object we're sleeping on ... */
	PROCLOCK   *waitProcLock;	/* Per-holder info for awaited lock */
	LOCKMODE	waitLockMode;	/* type of lock we're waiting for */
	LOCKMASK	heldLocks;		/* bitmask for lock types already held on this
								 * lock object by this backend */

	/*
	 * Info to allow us to wait for synchronous replication, if needed.
	 * waitLSN is InvalidXLogRecPtr if not waiting; set only by user backend.
	 * syncRepState must not be touched except by owning process or WALSender.
	 * syncRepLinks used only while holding SyncRepLock.
	 */
	XLogRecPtr	waitLSN;		/* waiting for this LSN or higher */
	int			syncRepState;	/* wait state for sync rep */
	SHM_QUEUE	syncRepLinks;	/* list link if process is in syncrep queue */

	/*
	 * All PROCLOCK objects for locks held or awaited by this backend are
	 * linked into one of these lists, according to the partition number of
	 * their lock.
	 */
	SHM_QUEUE	myProcLocks[NUM_LOCK_PARTITIONS];

	struct XidCache subxids;	/* cache for subtransaction XIDs */

	/*
	 * Info for Resource Scheduling, what portal (i.e statement) we might
	 * be waiting on.
	 */
	uint32		waitPortalId;	/* portal id we are waiting on */

	/*
	 * Information for our combocid-map (populated in writer/dispatcher backends only)
	 */
	uint32		combocid_map_count; /* how many entries in the map ? */

	int queryCommandId; /* command_id for the running query */

	bool serializableIsoLevel; /* true if proc has serializable isolation level set */

	bool inDropTransaction; /* true if proc is in vacuum drop transaction */

	/*
	 * Information for resource group
	 */
	bool		resWaiting;		/* true if waiting for an Resource Group lock */
};

src/include/storage/lock.h

/*
 * Per-locked-object lock information:
 *
 * tag -- uniquely identifies the object being locked
 * grantMask -- bitmask for all lock types currently granted on this object.
 * waitMask -- bitmask for all lock types currently awaited on this object.
 * procLocks -- list of PROCLOCK objects for this lock.
 * waitProcs -- queue of processes waiting for this lock.
 * requested -- count of each lock type currently requested on the lock
 *		(includes requests already granted!!).
 * nRequested -- total requested locks of all types.
 * granted -- count of each lock type currently granted on the lock.
 * nGranted -- total granted locks of all types.
 *
 * Note: these counts count 1 for each backend.  Internally to a backend,
 * there may be multiple grabs on a particular lock, but this is not reflected
 * into shared memory.
 */
typedef struct LOCK
{
	/* hash key */
	LOCKTAG		tag;			/* unique identifier of lockable object */

	/* data */
	LOCKMASK	grantMask;		/* bitmask for lock types already granted */
	LOCKMASK	waitMask;		/* bitmask for lock types awaited */
	SHM_QUEUE	procLocks;		/* list of PROCLOCK objects assoc. with lock */
	PROC_QUEUE	waitProcs;		/* list of PGPROC objects waiting on lock */
	int			requested[MAX_LOCKMODES];		/* counts of requested locks */
	int			nRequested;		/* total of requested[] array */
	int			granted[MAX_LOCKMODES]; /* counts of granted locks */
	int			nGranted;		/* total of granted[] array */
} LOCK;

src/backend/utils/resscheduler/resscheduler.c

POSTGRES resource scheduling management code.

// Initialize the elements of a resource queue
/*
 * ResCreateQueue -- initialize the elements for a resource queue.
 *
 * Notes:
 *	It is expected that the appropriate lightweight lock is held before
 *	calling this - unless we are the startup process.
 *
 */
bool
ResCreateQueue(Oid queueid, Cost limits[NUM_RES_LIMIT_TYPES], bool overcommit,
			   float4 ignorelimit)
{

	ResQueue		queue;
	int				i;

	Assert(LWLockHeldExclusiveByMe(ResQueueLock));
	
	/* If the new queue pointer is NULL, then we are out of queueus. */
	if (ResScheduler->num_queues >= MaxResourceQueues)
		return false;

	/**
	 * Has an entry for this 
	 */
	
	queue = ResQueueHashNew(queueid);
	Assert(queue != NULL);
	
	/* Set queue oid and offset in the schedular array */
	queue->queueid = queueid;

	/* Set the number of limits 0 initially. */
	queue->num_limits = 0;

	/* Set overcommit.*/
	queue->overcommit = overcommit;

	/* Set ignore cost limit. */
	queue->ignorecostlimit = ignorelimit;

	/* Now run through all the possible limit types.*/
	for (i = 0 ; i < NUM_RES_LIMIT_TYPES; i++)
	{
		/*
		 * Specific initializations for the limit types (the current two are
		 * in fact the same).
		 */
		switch (i)
		{
			case RES_COUNT_LIMIT:
			case RES_COST_LIMIT:
			case RES_MEMORY_LIMIT:
				{
					queue->num_limits++;
					queue->limits[i].type = i;
					queue->limits[i].threshold_value = limits[i];
					queue->limits[i].current_value = 0;
					queue->limits[i].threshold_is_max = true;
					break;
				}
			default:
				elog(ERROR, "unknown resource limit type %d", i);
			break;
		}


	}
	ResScheduler->num_queues++;
	return true;
}

src/backend/storage/ipc/shmqueue.c

shared memory linked lists

// Remove an element from a shared-memory queue
/*
 * SHMQueueDelete -- remove an element from the queue and
 *		close the links
 *
 */
void
SHMQueueDelete(SHM_QUEUE *queue)
{
	SHM_QUEUE  *nextElem = (SHM_QUEUE *) MAKE_PTR((queue)->next);
	SHM_QUEUE  *prevElem = (SHM_QUEUE *) MAKE_PTR((queue)->prev);

	Assert(SHM_PTR_VALID(queue));
	Assert(SHM_PTR_VALID(nextElem));
	Assert(SHM_PTR_VALID(prevElem));

#ifdef SHMQUEUE_DEBUG
	dumpQ(queue, "in SHMQueueDelete: begin");
#endif

	prevElem->next = (queue)->next;
	nextElem->prev = (queue)->prev;

	(queue)->prev = (queue)->next = INVALID_OFFSET;
}

src/include/utils/hsearch.h

exported definitions for utils/hash/dynahash.c; see notes therein
The three structures used for shared-memory hash tables: HASHCTL, HASHHDR, HTAB

/* Parameter data structure for hash_create */
/* Only those fields indicated by hash_flags need be set */
typedef struct HASHCTL
{
	long		num_partitions; /* # partitions (must be power of 2) */
	long		ssize;			/* segment size */
	long		dsize;			/* (initial) directory size */
	long		max_dsize;		/* limit to dsize if dir size is limited */
	long		ffactor;		/* fill factor */
	Size		keysize;		/* hash key length in bytes */
	Size		entrysize;		/* total user element size in bytes */
	HashValueFunc hash;			/* hash function */
	HashCompareFunc match;		/* key comparison function */
	HashCopyFunc keycopy;		/* key copying function */
	HashAllocFunc alloc;		/* memory allocator */
	MemoryContext hcxt;			/* memory context to use for allocations */
	HASHHDR    *hctl;			/* location of header in shared mem */
} HASHCTL;

src/backend/utils/hash/dynahash.c

dynamic hash tables

/*
 * Header structure for a hash table --- contains all changeable info
 *
 * In a shared-memory hash table, the HASHHDR is in shared memory, while
 * each backend has a local HTAB struct.  For a non-shared table, there isn't
 * any functional difference between HASHHDR and HTAB, but we separate them
 * anyway to share code between shared and non-shared tables.
 * 
 * Defines the header (directory) of the hash structure
 */
struct HASHHDR
{
	/* In a partitioned table, take this lock to touch nentries or freeList */
	slock_t		mutex;			/* unused if not partitioned table */

	/* These fields change during entry addition/deletion */
	long		nentries;		/* number of entries in hash table */
	HASHELEMENT *freeList;		/* linked list of free elements */

	/* These fields can change, but not in a partitioned table */
	/* Also, dsize can't change in a shared table, even if unpartitioned */
	long		dsize;			/* directory size */
	long		nsegs;			/* number of allocated segments (<= dsize) */
	uint32		max_bucket;		/* ID of maximum bucket in use */
	uint32		high_mask;		/* mask to modulo into entire table */
	uint32		low_mask;		/* mask to modulo into lower half of table */

	/* These fields are fixed at hashtable creation */
	Size		keysize;		/* hash key length in bytes */
	Size		entrysize;		/* total user element size in bytes */
	long		num_partitions; /* # partitions (must be power of 2), or 0 */
	long		ffactor;		/* target fill factor */
	long		max_dsize;		/* 'dsize' limit if directory is fixed size */
	long		ssize;			/* segment size --- must be power of 2 */
	int			sshift;			/* segment shift = log2(ssize) */
	int			nelem_alloc;	/* number of entries to allocate at once */

#ifdef HASH_STATISTICS

	/*
	 * Count statistics here.  NB: stats code doesn't bother with mutex, so
	 * counts could be corrupted a bit in a partitioned table.
	 */
	long		accesses;
	long		collisions;
#endif
};

/*
 * Top control structure for a hashtable --- in a shared table, each backend
 * has its own copy (OK since no fields change at runtime)
 * 
 * Defines the hash table (dynamic and static attributes)
 */
struct HTAB
{
	HASHHDR    *hctl;			/* => shared control information */
	HASHSEGMENT *dir;			/* directory of segment starts */
	HashValueFunc hash;			/* hash function */
	HashCompareFunc match;		/* key comparison function */
	HashCopyFunc keycopy;		/* key copying function */
	HashAllocFunc alloc;		/* memory allocator */
	MemoryContext hcxt;			/* memory context if default allocator used */
	char	   *tabname;		/* table name (for error messages) */
	bool		isshared;		/* true if table is in shared memory */

	/* freezing a shared table isn't allowed, so we can keep state here */
	bool		frozen;			/* true = no more inserts allowed */

	/* We keep local copies of these fixed values to reduce contention */
	Size		keysize;		/* hash key length in bytes */
	long		ssize;			/* segment size --- must be power of 2 */
	int			sshift;			/* segment shift = log2(ssize) */
};

Enabling debugging in the source code

$ vi src/backend/postmaster/backoff.c
...
#define BACKOFF_DEBUG
...

$ vi src/backend/storage/ipc/shmqueue.c
...
#define SHMQUEUE_DEBUG
...

$ vi src/include/pg_config_manual.h 
...

/*
 * Added for src/backend/utils/resscheduler/resscheduler.c
 */
#ifndef RESLOCK_DEBUG
#define RESLOCK_DEBUG
#endif
...

Testing

Create tables and load data

$ psql -d testDB -f photoobjall.sql
$ psql -d testDB -f runtime.sql 
$ gpload -f photoobjall_50.yaml

Session setup

testDB=# set client_min_messages="debug1";

# Grant privileges
testDB=# grant select on photoobjall to tenant1_1;
testDB=# grant select on photoobjall to tenant1_2;
testDB=# grant select on photoobjall to tenant1_3;

# Find queries waiting in a resource queue
testDB=# select * from gp_toolkit.gp_locks_on_resqueue where lorwaiting='true';
# Find the SQL statements currently running
testDB=# select * from gp_toolkit.gp_resq_priority_statement;

# View resource queue configuration
testDB=# select * from pg_resqueue_attributes;

1. Test the maximum number of active statements

# Create a resource queue and roles
testDB=# create resource queue myqueue1 with (ACTIVE_STATEMENTS=2);
testDB=# create role tenant1_1 createdb createrole createexttable inherit login encrypted password 'tenant1' resource queue myqueue1;
testDB=# create role tenant1_2 createdb createrole createexttable inherit login encrypted password 'tenant1' resource queue myqueue1;
testDB=# create role tenant1_3 createdb createrole createexttable inherit login encrypted password 'tenant1' resource queue myqueue1;

# Query
SELECT pg_sleep(15);
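
With ACTIVE_STATEMENTS=2, running the query from all three roles at once should leave the third session waiting; this can be confirmed from another session (a sketch, assuming the sessions are still running):

testDB=# select * from gp_toolkit.gp_locks_on_resqueue where lorwaiting='true';
testDB=# select rsqname, rsqcountvalue, rsqwaiters, rsqholders from gp_toolkit.gp_resqueue_status where rsqname='myqueue1';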

2. Test the memory limit for active statements

testDB=# create resource queue myqueue2 with (ACTIVE_STATEMENTS=2, MEMORY_LIMIT='10MB');
testDB=# create role tenant2_1 createdb createrole createexttable inherit login encrypted password 'tenant2' resource queue myqueue2;
testDB=# create role tenant2_2 createdb createrole createexttable inherit login encrypted password 'tenant2' resource queue myqueue2;
testDB=# create role tenant2_3 createdb createrole createexttable inherit login encrypted password 'tenant2' resource queue myqueue2;

# Query
SELECT pg_sleep(15);
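
With ACTIVE_STATEMENTS=2 and MEMORY_LIMIT='10MB', each statement admitted to myqueue2 is allotted 10MB/2 = 5MB; the queue attributes can be double-checked with:

testDB=# select resname, ressetting from pg_resqueue_attributes where rsqname='myqueue2';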

3. Test priority

testDB=# create resource queue myqueue3 with (ACTIVE_STATEMENTS=5,PRIORITY=MAX);
testDB=# create role tenant3_1 createdb createrole createexttable inherit login encrypted password 'tenant3' resource queue myqueue3;
testDB=# create role tenant3_2 createdb createrole createexttable inherit login encrypted password 'tenant3' resource queue myqueue3;
testDB=# create role tenant3_3 createdb createrole createexttable inherit login encrypted password 'tenant3' resource queue myqueue3;

# Query
SELECT pg_sleep(15);
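
While the queries are running, the higher weight that PRIORITY=MAX gives the tenant3_* roles can be observed (a sketch; the weights follow the levels described earlier):

testDB=# select rqpusename, rqppriority, rqpweight, rqpquery from gp_toolkit.gp_resq_priority_statement;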

4. Combined tests

See also: greenplum-db/gpdb resource issues.