PostgreSQL/Greenplum自定义函数

自定义函数可以用sql、python、C等语言实现,不同的语言有不同的实现方式,下面主要看python和C的自定义函数。
这里,Greenplum对应PostgreSQL 8.4。

python自定义函数

首先是用python实现。自定义函数作为数据库的一种扩展,可以及方面的扩展数据库的功能,尤其是它能够实现SQL不方便或不能实现的功能。自定义函数分为:简单的自定义函数,访问数据库的自定义函数、触发器自定义函数等。

PL/Python数据类型映射

postgresql -> python
boolean -> bool
smallint/int -> int
bigint/oid -> long(python 2) int(python 3)
real/double -> float
numeric -> decimal
bytes -> str

44.3. Data Values 

PL/Python返回类型

返回简单类型

PL/Python参数在被转换成相应的类型之后,被传递给python代码。结果被传递回来,然后转换成相应的postgresql类型,进而获得返回值。

返回一个记录

返回一个记录,可以使用:
序列或值列表,采用与返回记录中的字段相同的顺序;
带有键值的字典,与返回记录中的字段相匹配;
具有属性的一个类型或类型实例,与返回记录中的字段相匹配;

返回集合

当返回一个集合时,可以选择以下3种:
返回一张列表或者返回类型的任何其他序列;
返回一个迭代器或生成器;
从一个循环中产生返回值;

示例

-- 创建扩展
create language plpythonu;

-- 简单函数
create or replace function add_one(i int)
	returns int 
as $$
	return i+1;
$$ language plpythonu;

-- 返回记录
-- 1.使用一个实例
create or replace function userinfo(
	inout username name,
	out user_id oid,
	out is_superuser boolean)
as $$
	class PGUser:
		def __init__(self,username,user_id,is_superuser):
			self.username = username
			self.user_id = user_id
			self.is_superuser = is_superuser
	
	u = plpy.execute("""select usename,usesysid,usesuper
			from pg_user
			where usename = '%s'""" % username)[0]; 

	user = PGUser(u['usename'],u['usesysid'],u['usesuper'])
	return user
$$ language plpythonu;
--后面的[0]是为了提取结果中的第一行,因为plpy.execute会返回一张结果列表]

-- 2.使用字典
create or replace function userinfo(
	inout username name,
	out user_id oid,
	out is_superuser boolean)
as $$
	u = plpy.execute("""
		select usename,usesysid,usesuper
		from pg_user
		where usename = '%s'""" % username)[0]

	return {'username':u['usename'],'user_id':u['usesysid'],'is_superuser':u['usesuper']}
$$ language plpythonu;

-- 3.使用三元组
create or replace function userinfo(
	inout username name,
	out user_id oid,
	out is_superuser boolean)
as $$
	u = plpy.execute("""
		select usename,usesysid,usesuper
		from pg_user
		where usename = '%s'""" % username)[0]

	return (u['usename'],u['usesysid'],u['usesuper'])
$$ language plpythonu;

-- 调用 select * from userinfo('gpadmin');

-- 返回一个集合
-- 生成所有偶数
-- 1.返回一列整数
create or replace function even_number_from_list(up_to int)
	returns setof int
as $$
	return range(0,up_to,2);
$$ language plpythonu;

-- 2.返回表
create or replace function even_number_from_generator(up_to int)
	returns table(even int,odd int)
as $$
	return ((i,i+1) for i in xrange(0,up_to,2))
$$ language plpythonu;

-- 3.返回集合
create or replace function even_numbers_with_yield(up_to int, out even int,out odd int)
	returns setof record
as $$
	for i in xrange(0,up_to,2):
		yield i,i+1
$$ language plpythonu;

-- 3.对于集合中的任意一行,可以返回不同类型
create or replace function birthdates(out name text,out birthdate date)
	returns setof record
as $$
	return (
		{'name':'bob','birthdate':'1980-1-1'},
		{'name':'mary','birthdate':'1990-1-1'},
	);
$$ language plpythonu;

访问数据库

python中的plpy包提供访问数据库的功能。下面介绍它的几个主要函数。

plpy.execute(query [, max-rows])

Calling 

1
plpy.execute

 with a query string and an optional row limit argument causes that query to be run and the result to be returned in a result object.
The result object emulates a list or dictionary object. The result object can be accessed by row number and column name. For example:
该函数返回一个结果对象,该结果对象模拟一个字典列表,一行一个字典。

-- 执行查询,得到结果对象
rv = plpy.execute("SELECT * FROM my_table", 5)

-- 得到第i(从0开始)行的my_column字段的结果
foo = rv[i]["my_column"]

The result object has these additional methods:

nrows()

Returns the number of rows processed by the command. Note that this is not necessarily the same as the number of rows returned. For example, an UPDATE command will set this value but won't return any rows (unless RETURNING is used).
得到该命令处理的行数。

status()

The 

1
SPI_execute()

 return value.

colnames()
coltypes()
coltypmods()

Return a list of column names, list of column type OIDs, and list of type-specific type modifiers for the columns, respectively.

__str__()

The standard __str__ method is defined so that it is possible for example to debug query execution results using plpy.debug(rv).

Note that calling plpy.execute will cause the entire result set to be read into memory. Only use that function when you are sure that the result set will be relatively small. If you don't want to risk excessive memory usage when fetching large results, use plpy.cursor rather than plpy.execute.
执行plpy.execute会将所有的结果读入到内存。在使用该函数前,请确保得到的结果集比较小。如果结果集比较大,请使用plpy.cursor。

plpy.prepare(query [, argtypes])
plpy.execute(plan [, arguments [, max-rows]])

1
plpy.prepare

 prepares the execution plan for a query. It is called with a query string and a list of parameter types, if you have parameter references in the query.

-- text is the type of the variable you will be passing for $1. The second argument is optional if you don't want to pass any parameters to the query.
-- 准备查询,解析查询字符串,将其转换成一个查询树,通过优化查询树,产生最佳的查询计划,并返回 prepared_query对象。
plan = plpy.prepare("SELECT last_name FROM my_users WHERE first_name = $1", ["text"])

-- Pass the plan as the first argument (instead of the query string), and a list of values to substitute into the query as the second argument. 
-- The second argument is optional if the query does not expect any parameters. The third argument is the optional row limit as before.
rv = plpy.execute(plan, ["name"], 5)

缓存查询

In order to make effective use of this across function calls one needs to use one of the persistent storage dictionaries SD or GD

CREATE FUNCTION usesavedplan() RETURNS trigger AS $$
    if "plan" in SD:
        plan = SD["plan"]
    else:
        plan = plpy.prepare("SELECT 1")
        SD["plan"] = plan
    # rest of function
$$ LANGUAGE plpythonu;

The global dictionary SD is available to store data between function calls. This variable is private static data. The global dictionary GD is public data, available to all Python functions within a session.
Each function gets its own execution environment in the Python interpreter, so that global data and function arguments from 

1
myfunc

 are not available to

1
myfunc2

. The exception is the data in the GD dictionary.
SD[]和GD[]里的值智能存在于一个单一数据库会话中,所以当有一个长期存活的连接时,才可以进行缓存。

plpy.cursor(query)
plpy.cursor(plan [, arguments])

The plpy.cursor function accepts the same arguments as plpy.execute (except for the row limit) and returns a cursor object, which allows you to process large result sets in smaller chunks. As with plpy.execute, either a query string or a plan object along with a list of arguments can be used.

The cursor object provides a fetch method that accepts an integer parameter and returns a result object. Each time you call fetch, the returned object will contain the next batch of rows, never larger than the parameter value. Once all rows are exhausted, fetch starts returning an empty result object.

以下为两种处理一个大表数据的方法

CREATE or replace FUNCTION count_odd_iterator() RETURNS integer AS $$
odd = 0
for row in plpy.cursor("select num from largetable"):
    if row['num'] % 2:
         odd += 1
return odd
$$ LANGUAGE plpythonu;

CREATE or replace FUNCTION count_odd_fetch(batch_size integer) RETURNS integer AS $$
odd = 0
cursor = plpy.cursor("select num from largetable")
while True:
    rows = cursor.fetch(batch_size)
    if not rows:
        break
    for row in rows:
        if row['num'] % 2:
            odd += 1
return odd
$$ LANGUAGE plpythonu;

CREATE or replace FUNCTION count_odd_prepared() RETURNS integer AS $$
odd = 0
plan = plpy.prepare("select num from largetable where num % $1 <> 0", ["integer"])
rows = list(plpy.cursor(plan, [2]))

return len(rows)
$$ LANGUAGE plpythonu;

44.7. Database Access 
PL/Python – Python Procedural Language 

C自定义函数

一个例子

#include "postgres.h"
#include "fmgr.h"
#include "utils/array.h"
#include "catalog/pg_type.h"

PG_MODULE_MAGIC;

/*
 * 基本加法
 */
PG_FUNCTION_INFO_V1(add_ab);
Datum
add_ab(PG_FUNCTION_ARGS){
    int32 arg_a = PG_GETARG_INT32(0);
    int32 arg_b = PG_GETARG_INT32(1);

    PG_RETURN_INT32(arg_a + arg_b);
}

/*
 * 处理NULL参数
 * 1. 当出现任一参数为NULL时,确保参数仍被调用;
 * 2. 有效处理NULL参数,将NULL参数转换为0,当两个参数均为空时返回NULL
 */
PG_FUNCTION_INFO_V1(add_ab_null);
Datum
add_ab_null(PG_FUNCTION_ARGS){
    int32 not_null = 0;
    int32 sum = 0;

    if(!PG_ARGISNULL(0)){
        sum += PG_GETARG_INT32(0);
        not_null = 1;
    }
    if(!PG_ARGISNULL(1)){
        sum += PG_GETARG_INT32(1);
        not_null = 1;
    }
    if(not_null){
        PG_RETURN_INT32(sum);
    }
    PG_RETURN_NULL();
}

/*
 * 在参数数组中累加所有非空元素
 *
 */
PG_FUNCTION_INFO_V1(add_int32_array);
Datum
add_int32_array(PG_FUNCTION_ARGS){
    ArrayType *input_array;

    int32 sum = 0;
    bool not_null = false;
    Datum *datums;
    bool *nulls;
    int count;
    int i;

    input_array = PG_GETARG_ARRAYTYPE_P(0);
    Assert(ARR_ELEMTYPE(input_array) == INT4OID);

    if(ARR_NDIM(input_array) > 1)
        ereport(ERROR,
                (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
                 errmsg("1-dimensional array needed")));

    deconstruct_array(input_array,
                    INT4OID,
                    4,
                    true,
                    'i',
                    &datums,&nulls,&count);

    for(i=0;i<count;i++){
        if(nulls[i])
            continue;

        sum += DatumGetInt32(datums[i]);
        not_null = true;
    }

    if(not_null)
        PG_RETURN_INT32(sum);
    PG_RETURN_NULL();
}

编译链接

$ cc -fpic -c add_func.c -I/opt/pgsql/include/server/
$ cc -shared -o add_func.so add_func.o

创建函数

testDB=#
create or replace function add(int,int)
returns int
as '/home/postgres/sql_prac/Greenplum/functions/c/add_func', 'add_ab'
language c strict;

testDB=# select add(1,2);

testDB=# 
create or replace function add_null(int,int)
returns int
as '/home/postgres/sql_prac/Greenplum/functions/c/add_func', 'add_ab_null'
language c strict;

testDB=# select add_null(1,NULL);

testDB=# 
create or replace function add_arr(int[]) 
returns int
as '/home/postgres/sql_prac/Greenplum/functions/c/add_func', 'add_int32_array'
language c strict;

testDB=# select add_arr('{1,2,3}');
 add_arr 
---------
       6
(1 row)

testDB=# select add_arr('{1,2,NULL}');
 add_arr 
---------
       3
(1 row)

testDB=# select add_arr(ARRAY[1,2,3]);
 add_arr 
---------
       6
(1 row)

testDB=# select add_arr(ARRAY[1,2,NULL]);
 add_arr 
---------
       3
(1 row)

testDB=# select add_arr(ARRAY[NULL::int]);
 add_arr 
---------
        
(1 row)

-- 检测二维数组
testDB=# select add_arr('{{1,2,3},{4,5,6}}');
ERROR:  1-dimensional array needed

-- 支持VARIADIC函数
testDB=#
create or replace function add_variadic(VARIADIC a int[])
returns int
as '/home/postgres/sql_prac/Greenplum/functions/c/add_func', 'add_int32_array'
language c strict;
testDB=# select add_variadic(1,2,3);
 add_variadic 
--------------
            6
(1 row)

testDB=# select add_variadic(NULL); 
 add_variadic 
--------------
             
(1 row)

testDB=# select add_variadic(1,2,NULL);
 add_variadic 
--------------
            3
(1 row)

一些语法

PG_MODULE_MAGIC
服务器使用该模块,能够确保服务器不会加载不同PostgreSQL版本所编译的代码。

PG_FUNCTION_INFO_V1
将PostgreSQL函数定义为Version 1调用约定函数,如果没有该行,函数会被当做旧式的Version 0函数

PG_FUNCTION_ARGS
函数参数

PG_GETARG_<type>
参数类型

PG_RETURN_<type>
返回参数类型

查看自定义函数

testDB=# \df
                        List of functions
 Schema | Name | Result data type | Argument data types |  Type  
--------+------+------------------+---------------------+--------
 public | add  | integer          | integer, integer    | normal
(1 row)

testDB=# \x on
Expanded display is on.
testDB=# select * from pg_proc where proname='add';
-[ RECORD 1 ]---+----------------------------------------------------
proname         | add
pronamespace    | 2200
proowner        | 10
prolang         | 13
procost         | 1
prorows         | 0
provariadic     | 0
proisagg        | f
proiswindow     | f
prosecdef       | f
proisstrict     | t
proretset       | f
provolatile     | v
pronargs        | 2
pronargdefaults | 0
prorettype      | 23
proargtypes     | 23 23
proallargtypes  | 
proargmodes     | 
proargnames     | 
proargdefaults  | 
prosrc          | add_ab
probin          | /home/gpdba/sql_prac/Greenplum/functions/c/add_func
proconfig       | 
proacl          | 
prodataaccess   | n
proexeclocation | a

复合类型参数

#include "postgres.h"
#include "executor/executor.h"  /* for GetAttributeByName() */

#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif

PG_FUNCTION_INFO_V1(c_overpaid);

Datum
c_overpaid(PG_FUNCTION_ARGS)
{
    HeapTupleHeader  t = PG_GETARG_HEAPTUPLEHEADER(0);
    int32            limit = PG_GETARG_INT32(1);
    bool isnull;
    Datum salary;

    salary = GetAttributeByName(t, "salary", &isnull);
    if (isnull)
        PG_RETURN_BOOL(false);

    /* 另外,可能更希望将PG_RETURN_NULL()用在null薪水上 */

    PG_RETURN_BOOL(DatumGetInt32(salary) > limit);
}

GetAttributeByName 是 PostgreSQL 系统函数,用来返回当前记录的字段。
它有三个参数:
类型为 HeapTupleHeader 的传入函数的参数、字段名称、一个确定字段是否为NULL的返回参数。
GetAttributeByName 函数返回一个Datum值,可以用对应的DatumGetXXX()宏把它转换成合适的数据类型。

在PG中申明该函数

create function c_overpaid(emp,integer) returns boolean
AS 'DIRECTORY/funcs', 'c_overpaid'
LANGUAGE C STRICT;

返回行(一行)

制作一个符合类型的数据值(一行)有两种方法:
1. 从一个 Datum 值数组里制作
2. 从一个可以传递给该行的字段类型的输入转换函数的C字符串数组里制作。
上述无论使用哪种方式,都需要先为行结构获取或者制作一个 TupleDesc 描述符。在使用 Datums 的时候,给 BlessTupleDesc 传递这个 TupleDesc 然后为每行调用 heap_form_tuple。在使用C字符串的时候,给 TupleDescGetAttlnMetadata 传递 TupleDesc,然后为每行调用 BuildTupleFromCStrings。 

设置所需要的 TupleDesc:

TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo,
                                   Oid *resultTypeId,
                                   TupleDesc *resultTupleDesc)

把传递给调用函数自己的fcinfo传递给它(要求使用版本-1 的调用习惯)。 resultTypeId可以声明为NULL或者 接收函数的结果类型OID的局部变量地址(指针)。 resultTupleDesc应该是一个局部的TupleDesc变量地址(指针)。检查结果是否TYPEFUNC_COMPOSITE;如是,resultTupleDesc就已经填充好需要的TupleDesc了。 如果不是,你可以报告一个类似"返回记录的函数在一个不接受记录的环境中被调用"的错误。

基于类型 OID 获取一个TupleDesc:
TupleDesc TypeGetTupleDesc(Oid typeoid, List *colaliases)
它可以用于给一个基本类型或者一个复合类型获取TupleDesc。 不过它不能处理返回record的函数,并且不能解析多态的类型。

一旦有了一个 TupleDesc,那么调用:
TupleDesc BlessTupleDesc(TupleDesc tupdesc)

制作一个 HeapTuple:
HeapTuple heap_form_tuple(TupleDesc tupdesc, Datum *values, bool *isnull)
它把数据以 Datum 的形式交给用户。

当使用C字符串时,使用:
HeapTuple BuildTupleFromCStrings(AttInMetadata *attinmeta, char **values)
制作一个 HeapTuple,以C字符串的形式给出用户数据。values是一个 C 字符串的数组,返回行的每个字段对应其中一个。 每个 C 字符串都应该是字段数据类型的输入函数预期的形式。 为了从其中一个字段中返回一个NULL, values数组中对应的指针应该设置为NULL。 这个函数将会需要为你返回的每个行调用一次。

一旦制作了一个从你的函数返回的行,那么该行必须转换成一个 Datum,使用:
HeapTupleGetDatum(HeapTuple tuple)
把一个 HeapTuple 转换为一个有效的 Datum。如果只返回一行,那么这个 Datum 可以用于直接返回,或者是它可以用作在一个返回集合的函数里的当前返回值。

#include "postgres.h"
#include "fmgr.h"
#include "utils/array.h"
#include "catalog/pg_type.h"

Datum
c_reverse_tuple(PG_FUNCTION_ARGS){
    HeapTupleHeader th;
    int32 a,b,c;
    bool aisnull,bisnull,cisnull;
    
    TupleDesc resultTupleDesc;
    oid resultTypeId;
    Datum retvals[4];
    bool retnulls[4];
    HeapTuple rettuple;
    
    //get the tuple header of 1st argument
    th = PG_GETARG_HEAPTUPLEHEADER(0);
    //get argument Datum's and convert them to int32
    a = DatumGetInt32(GetAttributeByName(th,"a",&aisnull));
    b = DatumGetInt32(GetAttributeByName(th,"b",&bisnull));
    c = DatumGetInt32(GetAttributeByName(th,"c",&cisnull));
    
    //debug: report the extracted field values
    ereport(INFO,(errmsg("arg: (a: %d,b: %d,c: %d)",a,b,c)));
    
    //set up tuple descriptos for result info
    get_call_result_type(fcinfo,&resultTypeId,&resultTupleDesc);
    //check that SQL function definition is set up to return arecord
    Assert(resultTypeId == TYPEFUNC_COMPOSITE);
    //make the tuple descriptor known to postgres as valid return type
    BlessTupleDesc(resultTupleDesc);
    
    retvals[0] = Int32GetDatum(c);
    retvals[1] = Int32GetDatum(b);
    retvals[2] = Int32GetDatum(a);
    retvals[3] = Int32GetDatum(retvals[0]*resvals[1]+retvals[2]);
    
    retnulls[0] = aisnull;
    retnulls[1] = bisnull;
    retnulls[2] = cisnull;
    retnulls[3] = aisnull || bisnull || cisnull;
    
    rettuple = heap_form_tuple(resultTupleDesc,retvals,retnulls);
    
    PG_RETURN_DATUM(HeapTupleGetDatum(rettuple));
} 

返回集合(多行)

一个返回集合的函数(SRF)通常为它返回的每个项都调用一次。因此SRF必须保存足够的状态用于记住它正在做的事情以及每次调用的时候返回下一个项。表函数API提供了 FuncCallContext 结构用于帮助控制这个过程。 fcinfo->flinfo->fn_extra 用于保存一个跨越多次调用的指向 FuncCallContext 的指针。

typedef struct
{
    
 /*
     * 前面已经被调用的次数
     * 初始的时候,call_cntr 被 SRF_FIRSTCALL_INIT() 置为 0,
 *并且每次你调用 SRF_RETURN_NEXT() 的时候都递增
     */
    uint32 call_cntr;
    
/*
     * 可选的最大调用数量
     * 这里的 max_calls 只是为了方便,设置它也是可选的。
     * 如果没有设置,你必须提供可选的方法来知道函数何时结束。
     */ 
    uint32 max_calls;
     
/*
     * 指向结果槽位的可选指针
     * 这个数据类型已经过时,只用于向下兼容。也就是那些使用已废弃的TupleDescGetSlot()的用户定义 SRF
     */
    TupleTableSlot *slot;

/*
     * 可选的指向用户提供的杂项环境信息的指针
     * user_fctx 用做一个指向你自己的结构的指针,包含任意提供给你的函数的调用间的环境信息
     */
    void *user_fctx;
    
 
 /*
     * 可选的指向包含属性类型输入元信息的结构数组的指针
     * attinmeta 用于在返回行的时候(也就是说返回复合数据类型)
     * 在只返回基本(也就是标量)数据类型的时候并不需要。
     * 只有在你准备用 BuildTupleFromCStrings() 创建返回行的时候才需要它。
     */
 
    AttInMetadata *attinmeta;

 /*
     * 用于必须在多次调用间存活的结构的内存环境
     * multi_call_memory_ctx 是由 SRF_FIRSTCALL_INIT() 为你设置的,并且由 SRF_RETURN_DONE() 用于清理。
     * 它是用于存放任何需要跨越多次调用 SRF 之间重复使用的内存。
     */
    MemoryContext multi_call_memory_ctx;
    
/*
     * 可选的指针,指向包含行描述的结构
     * tuple_desc 用于返回行(也就是说复合数据类型)并且只是在你想使用 heap_form_tuple() 而不是 BuildTupleFromCStrings() 制作行的时候需要。
     * 请注意这里存储的 TupleDesc 指针通常应该先用 BlessTupleDesc() 处理。
     */
    TupleDesc tuple_desc;

} FuncCallContext;

一个SRF使用自动操作 FuncCallContext 结构(可以通过 fn_extra找到)的若干个函数和宏,使用:
SRF_IS_FIRSTCALL()
判断函数是不是第一次调用还是后继的调用,只有在第一次调用的时候,使用:
SRF_FIRSTCALL_INIT()
初始化 FuncCallContext。在每次函数调用时(包括第一次),使用:
SRF_PERCALL_SETUP()
上述函数是为 FuncCallContext 做恰当的设置以及清理任何前面的轮里面剩下的已返回的数据。

如果函数有数据要返回,使用:
SRF_RETURN_NEXT(funcctx, result)

返回给调用者(result 必须是个 Datum,要么是单个值,要么是想前面介绍的那样准备的行)。最后,如果函数结束了数据返回,使用:
SRF_RETURN_DONE(funcctx)
清理并结束SRF。

在 SRF 被调用时的内存环境是一个临时环境,在调用之前将会被清理掉。这意味着不需要 pfree 所有 palloc 的东西,它会自动消失。不过,如果想分配任何跨越调用存在的数据结构,那就需要把它们放在其他什么地方。被 multi_call_memory_ctx 引用的环境适合用于保存那些需要知道SRF结束前都存活的数据。在大多数情况下,这意味着在第一次调用设置的时候应该切换到 multi_call_meory_ctx。

#include "postgres.h"
#include "executor/executor.h"  /* for GetAttributeByName() */
#include "funcapi.h"

PG_MODULE_MAGIC;

PG_FUNCTION_INFO_V1(retcomposite);

Datum
retcomposite(PG_FUNCTION_ARGS)
{
    FuncCallContext     *funcctx;
    int                  call_cntr;
    int                  max_calls;
    TupleDesc            tupdesc;
    AttInMetadata       *attinmeta;

    /* 只是在第一次调用函数的时候干的事情 */
    if (SRF_IS_FIRSTCALL())
    {
        MemoryContext   oldcontext;

        /* 创建一个函数环境,用于在调用间保持住 */
        funcctx = SRF_FIRSTCALL_INIT();

        /* 切换到适合多次函数调用的内存环境 */
        oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

        /* 要返回的行总数 */
        funcctx->max_calls = PG_GETARG_UINT32(0);

        /* 为了结果类型制作一个行描述 */
        if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("function returning record called in context "
                            "that cannot accept type record")));

        /*
         * 生成稍后从裸 C 字符串生成行的属性元数据
         */
        attinmeta = TupleDescGetAttInMetadata(tupdesc);
        funcctx->attinmeta = attinmeta;

        MemoryContextSwitchTo(oldcontext);
    }

    /* 每次函数调用都要做的事情 */
    funcctx = SRF_PERCALL_SETUP();

    call_cntr = funcctx->call_cntr;
    max_calls = funcctx->max_calls;
    attinmeta = funcctx->attinmeta;

    if (call_cntr < max_calls)    /* 在还有需要发送的东西时继续处理 */
    {
        char       **values;
        HeapTuple    tuple;
        Datum        result;

        /*
         * 准备一个数值数组用于版本的返回行
         * 它应该是一个C字符串数组,稍后可以被合适的类型输入函数处理。
         */

        values = (char **) palloc(3 * sizeof(char *));
        values[0] = (char *) palloc(16 * sizeof(char));
        values[1] = (char *) palloc(16 * sizeof(char));
        values[2] = (char *) palloc(16 * sizeof(char));

        snprintf(values[0], 16, "%d", 1 * PG_GETARG_INT32(1));
        snprintf(values[1], 16, "%d", 2 * PG_GETARG_INT32(1));
        snprintf(values[2], 16, "%d", 3 * PG_GETARG_INT32(1));

        /* 制作一个行 */
        tuple = BuildTupleFromCStrings(attinmeta, values);

        /* 把行做成 datum  */
        result = HeapTupleGetDatum(tuple);

        /* 清理(这些实际上并非必要) */
        pfree(values[0]);
        pfree(values[1]);
        pfree(values[2]);
        pfree(values);

        SRF_RETURN_NEXT(funcctx, result);
    }
    else    /* 在没有数据残留的时候干的事情 */
    {
        SRF_RETURN_DONE(funcctx);
    }
}

申明函数

CREATE TYPE __retcomposite AS (f1 integer, f2 integer, f3 integer);

CREATE OR REPLACE FUNCTION retcomposite(integer, integer)
    RETURNS SETOF __retcomposite
    AS '/home/gpdba/sql_prac/Greenplum/functions/c/35.9.9', 'retcomposite'
    LANGUAGE C IMMUTABLE STRICT;

另一种方法
CREATE OR REPLACE FUNCTION retcomposite(IN integer, IN integer,
    OUT f1 integer, OUT f2 integer, OUT f3 integer)
    RETURNS SETOF record
    AS '/home/gpdba/sql_prac/Greenplum/functions/c/35.9.9', 'retcomposite'
    LANGUAGE C IMMUTABLE STRICT;

多态参数和返回类型

C 语言函数可以声明为接受和返回多态的类型anyelement,anyarray, anynonarray, anyenum和anyrange。 参阅第 35.2.5 节获取有关多态函数的更详细解释。 如果函数参数或者返回类型定义为多态类型, 那么函数的作者就无法预先知道他将收到的参数,以及需要返回的数据。 在fmgr.h里有两个过程,可以让版本-1 的 C 函数知道它的参数的确切数据类型以及 它需要返回的数据类型。这两个过程叫get_fn_expr_rettype(FmgrInfo *flinfo)和 get_fn_expr_argtype(FmgrInfo *flinfo, int argnum)。 它们返回结果或者参数的类型 OID,如果这些信息不可获取,则返回 InvalidOid 。 结构flinfo通常是以fcinfo->flinfo进行访问的。 参数argnum是以 0 为基的。 get_call_result_type也可以替代get_fn_expr_rettype。 还有get_fn_expr_variadic用于找出可变参数是否已经合并到了数组中。 对于VARIADIC "any"函数是最有用的, 因为这样的合并总是发生在可变参数接受普通数组类型的时候。

写一个函数接受任意类型的一个元素,并且返回该类型的一个一维数组:

#include "postgres.h"
#include "executor/executor.h"  /* for GetAttributeByName() */
#include "funcapi.h"
#include "utils/array.h"

PG_MODULE_MAGIC;

PG_FUNCTION_INFO_V1(make_array);
Datum
make_array(PG_FUNCTION_ARGS)
{
    ArrayType  *result;
    Oid         element_type = get_fn_expr_argtype(fcinfo->flinfo, 0);
    Datum       element;
    bool        isnull;
    int16       typlen;
    bool        typbyval;
    char        typalign;
    int         ndims;
    int         dims[MAXDIM];
    int         lbs[MAXDIM];

    if (!OidIsValid(element_type))
        elog(ERROR, "could not determine data type of input");


    /* 获取提供的元素(要小心其为NULL的情况) */
    isnull = PG_ARGISNULL(0);
    if (isnull)
        element = (Datum) 0;
    else
        element = PG_GETARG_DATUM(0);

    /* 维数是1 */
    ndims = 1;
    /* 有1个元素 */
    dims[0] = 1;
    /* 数组下界是1 */

    lbs[0] = 1;

    /* 获取有关元素类型需要的信息 */
    get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign);

    /* 然后制作数组 */
    result = construct_md_array(&element, &isnull, ndims, dims, lbs,
                                element_type, typlen, typbyval, typalign);

    PG_RETURN_ARRAYTYPE_P(result);
}

申明函数

CREATE FUNCTION make_array(anyelement) RETURNS anyarray
    AS 'DIRECTORY/funcs', 'make_array'
    LANGUAGE C IMMUTABLE;

35.9. C-语言函数 

使用SPI

spi用来在自定义函数中运行sql查询。下面是一个完整的例子(在Greenplum中)。

#include "postgres.h"
#include "fmgr.h"
#include "executor/spi.h"
#include "utils/builtins.h"

#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif

PG_FUNCTION_INFO_V1(execq);
Datum
execq(PG_FUNCTION_ARGS)
{
    char *command;
    int ret,cnt;
    uint64 proc;
    
    /* Convert given text object to a C string */
    command = text_to_cstring(PG_GETARG_TEXT_P(0));
    cnt = PG_GETARG_INT32(1);
    
    SPI_connect();
    
    ret = SPI_exec(command, cnt);
    
    proc = SPI_processed;
    /*
     * If some rows were fetched, print them via elog(INFO).
     */
    if (ret > 0 && SPI_tuptable != NULL)
    {
        TupleDesc tupdesc = SPI_tuptable->tupdesc;
        SPITupleTable *tuptable = SPI_tuptable;
        char buf[8192];
        uint64 j;
        
        for (j = 0; j < proc; j++)
        {
            HeapTuple tuple = tuptable->vals[j];
            int i;
            
            for (i = 1, buf[0] = 0; i <= tupdesc->natts; i++)
                snprintf(buf + strlen (buf), sizeof(buf) - strlen(buf), " %s%s",
                        SPI_getvalue(tuple, tupdesc, i), 
                        (i == tupdesc->natts) ? " " : " |");
            //elog(INFO, "EXECQ: %s", buf);
            ereport(INFO,(errmsg("ROW: %s",buf)));
        }   
    }   
    
    SPI_finish();
    pfree(command);

    PG_RETURN_INT32(proc);
}

编译

cc -fpic -c spi_exeq.c -I/home/gpdba/greenplum/include/postgresql/server;
cc -shared -o spi_exeq.so spi_exeq.o;

把so文件分发到各节点
scp spi_exeq.so gpdba@node1:/home/gpdba/sql_prac/Greenplum/functions/c;
scp spi_exeq.so gpdba@node2:/home/gpdba/sql_prac/Greenplum/functions/c;
scp spi_exeq.so gpdba@node3:/home/gpdba/sql_prac/Greenplum/functions/c;
scp spi_exeq.so gpdba@node5:/home/gpdba/sql_prac/Greenplum/functions/c;

申明函数

CREATE or replace FUNCTION execq(text, integer) RETURNS int8
    AS '/home/gpdba/sql_prac/Greenplum/functions/c/spi_exeq','execq'
        LANGUAGE C STRICT;

使用

testDB=# SELECT execq('CREATE TABLE a (x integer)', 0);
NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Greenplum Database data distribution key for this table.
HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
CONTEXT:  SQL statement "CREATE TABLE a (x integer)"
 execq 
-------
     0
(1 row)
testDB=# \d
                List of relations
 Schema |   Name    |   Type   | Owner | Storage 
--------+-----------+----------+-------+---------
 public | a         | table    | gpdba | heap
testDB=#  INSERT INTO a VALUES (execq('INSERT INTO a VALUES (0)', 0));
INSERT 0 1
testDB=# SELECT execq('INSERT INTO a SELECT x + 2 FROM a', 1);
 execq 
-------
     2
(1 row)

testDB=#  SELECT execq('SELECT * FROM a', 10);
INFO:  ROW:  1
INFO:  ROW:  2
INFO:  ROW:  3
INFO:  ROW:  0
 execq 
-------
     4
(1 row)

testDB=# DELETE FROM a;
DELETE 4
testDB=#  INSERT INTO a VALUES (execq('SELECT * FROM a', 0) + 1);
INSERT 0 1
testDB=#  SELECT * FROM a;
 x 
---
 1
(1 row)

testDB=#  INSERT INTO a VALUES (execq('SELECT * FROM a', 0) + 1);
INFO:  ROW:  1
INSERT 0 1
testDB=# SELECT * FROM a;
 x 
---
 1
 2
(2 rows)

46.5. 例子