流转换¶
流转换可以在不存储时序的情况下对其进行实时转换。由于数据不存储数据,所以流转换不支持聚合操作。转换后的数据既可以作为另一个流的输入,也可以写入到外部数据存储中。
创建流转换¶
通过将 action
赋值为 transform
来声明流转换,下面是创建流转换的语法:
CREATE VIEW name (WITH action=transform [, outputfunc=function_name( arguments ) ]) AS query
query 是一个PostgreSQL SELECT
报表。
SELECT expression [ [ AS ] output_name ] [, ...]
[ FROM from_item [, ...] ]
[ WHERE condition ]
[ GROUP BY expression [, ...] ]
where any expression in the SELECT statement can't contain an aggregate and
from_item can be one of:
stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
from_item [ NATURAL ] join_type from_item [ ON join_condition ]
function_name 是一个用户传入的函数,它的返回类型为 trigger
,并且会作用到流转换的每一行输出上。
arguments 是一系列逗号分隔的参数,在触发器执行时传给函数,只能为字符串常量。
注解
您可以将流转换视为作用在流上每一行输出上的 触发器 函数,它会在内部以 AFTER INSERT FOR EACH ROW
的形式执行,所以不存在流转换中不会包含行输出的 OLD
和 NEW
行数据。
流转换输出到流¶
所有流转换都有与其对应的 输出流,这使得可以很容易地被其它流转换或流视图读到。流转换的输出流只包含被select的内容。
下面是将流转换同数据表进行join的例子:
CREATE VIEW t WITH (action=transform) AS
SELECT t.y FROM some_stream s JOIN some_table t ON s.x = t.x;
这个流转换将join后的结果写入到其对应的输出流中,可以通过 output_of
读到输出流中的数据:
CREATE VIEW v WITH (action=materialize) AS
SELECT sum(y) FROM output_of('t');
内置流转换输出函数¶
为了给流转换输出提供比输出流更高的灵活性,PipelineDB提供了一个基于触发器函数,用于处理转换流数据的接口。这个接口可以对接受的数据执行任意操作,包括写入到其它流中。
目前为止,PipelineDB只提供了1个内置触发器函数 pipelinedb.insert_into_stream
,它可以同流转换一起使用,将流转换的输出写入到参数列表中的各个流。用法如下:
CREATE VIEW t WITH (action=transform, outputfunc=pipelinedb.insert_into_stream('even_stream)) AS
SELECT x, y FROM stream WHERE mod(x, 2) = 0;
流转换将 x
为偶数的 (x, y)
的结果写入到 even_stream
中。
重要
传入 pipelinedb.insert_into_stream
的参数必须是系统中已存在的流(foreign table)。
创建自定义转换流输出函数¶
您也可以创建作用于流转换的自定义输出函数。比如您想将输出写入数据表中,可以通过如下操作实现:
CREATE TABLE t (user text, value int);
CREATE OR REPLACE FUNCTION insert_into_t()
RETURNS trigger AS
$$
BEGIN
INSERT INTO t (user, value) VALUES (NEW.user, NEW.value);
RETURN NEW;
END;
$$
LANGUAGE plpgsql;
CREATE VIEW ct WITH (action=transform, outputfunc=insert_into_t) AS
SELECT user::text, value::int FROM stream WHERE value > 100;