流转换

流转换可以在不存储时序的情况下对其进行实时转换。由于数据不存储数据,所以流转换不支持聚合操作。转换后的数据既可以作为另一个流的输入,也可以写入到外部数据存储中。

创建流转换

通过将 action 赋值为 transform 来声明流转换,下面是创建流转换的语法:

CREATE VIEW name (WITH action=transform [, outputfunc=function_name( arguments ) ]) AS query

query 是一个PostgreSQL SELECT 报表。

SELECT expression [ [ AS ] output_name ] [, ...]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY expression [, ...] ]

where any expression in the SELECT statement can't contain an aggregate and
from_item can be one of:

    stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    from_item [ NATURAL ] join_type from_item [ ON join_condition ]

function_name 是一个用户传入的函数,它的返回类型为 trigger,并且会作用到流转换的每一行输出上。

arguments 是一系列逗号分隔的参数,在触发器执行时传给函数,只能为字符串常量。

注解

您可以将流转换视为作用在流上每一行输出上的 触发器 函数,它会在内部以 AFTER INSERT FOR EACH ROW 的形式执行,所以不存在流转换中不会包含行输出的 OLDNEW 行数据。

删除流转换

您可以使用 DROP VIEW 指令来来删除流转换,语法如下:

DROP VIEW continuous_transform;

这将移除所有与转换流相关资源。

查看流转换

您可以通过如下查询来查看当前系统中所有的流转换任务:

SELECT * FROM pipelinedb.transforms;

流转换输出到流

所有流转换都有与其对应的 输出流,这使得可以很容易地被其它流转换或流视图读到。流转换的输出流只包含被select的内容。

下面是将流转换同数据表进行join的例子:

CREATE VIEW t WITH (action=transform) AS
  SELECT t.y FROM some_stream s JOIN some_table t ON s.x = t.x;

这个流转换将join后的结果写入到其对应的输出流中,可以通过 output_of 读到输出流中的数据:

CREATE VIEW v WITH (action=materialize) AS
  SELECT sum(y) FROM output_of('t');

内置流转换输出函数

为了给流转换输出提供比输出流更高的灵活性,PipelineDB提供了一个基于触发器函数,用于处理转换流数据的接口。这个接口可以对接受的数据执行任意操作,包括写入到其它流中。

目前为止,PipelineDB只提供了1个内置触发器函数 pipelinedb.insert_into_stream,它可以同流转换一起使用,将流转换的输出写入到参数列表中的各个流。用法如下:

CREATE VIEW t WITH (action=transform, outputfunc=pipelinedb.insert_into_stream('even_stream)) AS
  SELECT x, y FROM stream WHERE mod(x, 2) = 0;

流转换将 x 为偶数的 (x, y) 的结果写入到 even_stream 中。

重要

传入 pipelinedb.insert_into_stream 的参数必须是系统中已存在的流(foreign table)。

创建自定义转换流输出函数

您也可以创建作用于流转换的自定义输出函数。比如您想将输出写入数据表中,可以通过如下操作实现:

CREATE TABLE t (user text, value int);

CREATE OR REPLACE FUNCTION insert_into_t()
  RETURNS trigger AS
  $$
  BEGIN
    INSERT INTO t (user, value) VALUES (NEW.user, NEW.value);
    RETURN NEW;
  END;
  $$
  LANGUAGE plpgsql;

CREATE VIEW ct WITH (action=transform, outputfunc=insert_into_t) AS
  SELECT user::text, value::int FROM stream WHERE value > 100;