流(foreign table)

流是一种允许客户端将时序数据写入 流视图 的抽象管道。流里面的一行数据(或者简单称作 event),与数据表中的行数据是很相似的,并且二者的写入也是完全一致的。然而,流和数据表的语义是完全不同的。

换言之,event 只会在被所有的 流视图 消费完之前“存在”于流中。即使这样,用户仍然不能直接从流中 SELECT 数据。流唯一的作用就是充当 流视图 的输入。

流在PipelineDB中是作为 pipelinedb 外部服务 管理下的 外部表 存在的。创建外部表的语法跟创建普通的PostgreSQL数据表类似:

CREATE FOREIGN TABLE stream_name ( [
   { column_name data_type [ COLLATE collation ] } [, ... ]
] )
SERVER pipelinedb;
stream_name
要创建的流的名称
column_name
新表中列的名称
data_type
列的数据类型,可以声明为数组。您可以查看 内置函数PostgreSQL类型支持 了解更多PipelineDB支持的数据类型。
COLLATE collation
COLLATE 子句可以给指定列(必须是可排序的数据类型)赋予排序规则。若未指定,则使用默认的排序规则。

可以通过 code:ALTER STREAM 给流添加列:

postgres=# ALTER FOREIGN TABLE stream ADD COLUMN x integer;
ALTER FOREIGN TABLE

注解

流中的列是不可删除的。

可以通过 DROP FOREIGN TABLE 指令删除流。下面是一个基于流创建流视图的例子。

postgres=# CREATE FOREIGN TABLE stream (x integer, y integer) SERVER pipelinedb;
CREATE FOREIGN TABLE
postgres=# CREATE VIEW v AS SELECT sum(x + y) FROM stream;
CREATE VIEW

写入数据到流

INSERT写入

使用PostgreSQL的 INSERT 语句就可以向流中写入数据,语法如下:

INSERT INTO stream_name ( column_name [, ...] )
  { VALUES ( expression [, ...] ) [, ...] | query }

query 是一个 SELECT 查询。

下面是一些示例:

可以向流中写入单条数据:

INSERT INTO stream (x, y, z) VALUES (0, 1, 2);
INSERT INTO json_stream (payload) VALUES (
  '{"key": "value", "arr": [92, 12, 100, 200], "obj": { "nested": "value" } }'
);

也可以批量写入以提高性能:

INSERT INTO stream (x, y, z) VALUES (0, 1, 2), (3, 4, 5), (6, 7, 8)
(9, 10, 11), (12, 13, 14), (15, 16, 17), (18, 19, 20), (21, 22, 23), (24, 25, 26);

同时也支持在写入时使用表达式。

INSERT INTO geo_stream (id, coords) VALUES (42, a_function(-72.09, 41.40));

INSERT INTO udf_stream (result) VALUES (my_user_defined_function('foo'));

INSERT INTO str_stream (encoded, location) VALUES
  (encode('encode me', 'base64'), position('needle' in 'haystack'));

INSERT INTO rad_stream (circle, sphere) VALUES
  (pi() * pow(11.2, 2), 4 / 3 * pi() * pow(11.2, 3));

-- Subselects into streams are also supported
INSERT INTO ss_stream (x) SELECT generate_series(1, 10) AS x;

INSERT INTO tab_stream (x) SELECT x FROM some_table;

预写入

可以通过预写入来减小流写入时的网络负载:

PREPARE write_to_stream AS INSERT INTO stream (x, y, z) VALUES ($1, $2, $3);
EXECUTE write_to_stream(0, 1, 2);
EXECUTE write_to_stream(3, 4, 5);
EXECUTE write_to_stream(6, 7, 8);

COPY写入

也可以通过 COPY 的方式将文件写入到流中:

COPY stream (data) FROM '/some/file.csv'

COPY 在向流中回填归档数据时是很有用的。下面的指令演示了如何将S3中压缩的归档数据写入到PipelineDB:

aws s3 cp s3://bucket/logfile.gz - | gunzip | pipeline -c "COPY stream (data) FROM STDIN"

其它客户端

鉴于PipelineDB是PostgreSQL的插件,可以借助任何PostgreSQL客户端向流中写入数据(可能大多数SQL数据库的客户端可可以),所以没有必要手动构建流的写入。您可以查看 客户端 来具体了解。

Output Streams

流输出让我们可以将流数据的变化动态、增量地更新到流视图或流转换。流输出同普通的PipelineDB流一样,可以成为流视图或流转换的数据源,它可以在流视图或流转换中通过 output_of 函数进行调用。

在流视图中,流输出中的每行数据都包含 oldnew 元组,用以体现流视图的变化。如果执行了 写入old 元组为空,如果是 删除,(目前只可能发生出现在滑动窗口元组超过窗口范围的情况下) new 元组为空。

下面通过操作示例来说明这些概念。创建一个简单的流视图,只对流中的一列进行求和:

CREATE VIEW v_sum AS SELECT sum(x) FROM stream;

设计一个场景:在流输出每次触发sum的时候,若sum的变化超过10,则将这次增量记录下来。我们可以创建另一个以 v_sum 的流输出为数据源的流视图来轻松完成这个构想:

CREATE VIEW v_deltas AS SELECT abs((new).sum - (old).sum) AS delta
  FROM output_of('v_sum')
  WHERE abs((new).sum - (old).sum) > 10;

注解

oldnew 元组必须用括号包起来。

查看 流转换输出到流 了解更多流输出在流转换中的应用。

基于滑动窗口的流输出

对于不存在滑动窗口的流视图,流输出只是在流视图的结果发生变化时执行写入操作。然而基于滑动窗口的流视图的结果也是依赖时间的,它的流输出会自动随着时间写入到结果中。也就是说,即使没有新的写入,基于滑动窗口的流视图的结果也会更新。

Delta Streams

在写入到流视图输出中 oldnew 元组之外还有一个 delta 元组也会被提交到流视图的变化中。delta 元组中包含 old 元组 和 old 元组的“差值”。在sum这类聚合操作中,delta 就只是 (new).sum - (old).sum 的标量,就像前面示例中的那样。

代码及结果如下:

postgres=# CREATE VIEW v AS SELECT COUNT(*) FROM stream;
CREATE VIEW
postgres=# CREATE VIEW v_real_deltas AS SELECT (delta).sum FROM output_of('v');
CREATE VIEW
postgres=# INSERT INTO stream (x) VALUES (1);
INSERT 0 1
postgres=# SELECT * FROM v_real_deltas;
sum
-----
   1
(1 row)
postgres=# INSERT INTO stream (x) VALUES (2);
INSERT 0 1
postgres=# INSERT INTO stream (x) VALUES (3);
INSERT 0 1
postgres=# SELECT * FROM v_real_deltas;
sum
-----
   1
   2
   3
(3 rows)

如您所见,v_real_deltas 记录了每次插入后值的变化,只是 sum 这种操作比较简单。 delta 最吸引人之处是支持所有的聚合,甚至能与 组合 一起使用来高效聚合不同粒度和分组下的流视图。

来看一个更有趣的例子,有一个计算每分钟去重后用户数的流视图:

CREATE VIEW uniques_1m AS
  SELECT minute(arrival_timestamp) AS ts, COUNT(DISTINCT user_id) AS uniques
FROM s GROUP BY ts;

为了归档以及提升性能,我们想在每个固定的周期后将流视图中的数据聚合为小时粒度。显然我们不可能通过将每分钟的 COUNT(DISTINCT) 相加来得到一个小时的 COUNT(DISTINCT),因为这种操作需要整个时间段内的明细。但是我们将流视图中分钟级别的 delta 组合 成小时级别的数据。

CREATE VIEW uniques_hourly AS
  SELECT hour((new).ts) AS ts, combine((delta).uniques) AS uniques
FROM output_of('uniques_1m') GROUP BY ts;

uniques_hourly 流视图目前包含小时级别的去重用户数,并且 结果也同基于原始数据计算小时粒度的去重用户数完全一致

pipelinedb.stream_targets

有时您可能只想更新基于同一个流的多个流视图中的一个或几个,可以临时调整 pipelinedb.stream_targets 来修改流的目标视图,这样结算结果就只会更新到指定的流视图中:

postgres=# CREATE VIEW v0 AS SELECT COUNT(*) FROM stream;
CREATE VIEW
postgres=# CREATE VIEW v1 AS SELECT COUNT(*) FROM stream;
CREATE VIEW
postgres=# INSERT INTO stream (x) VALUES (1);
INSERT 0 1
postgres=# SET pipelinedb.stream_targets TO v0;
SET
postgres=# INSERT INTO stream (x) VALUES (1);
INSERT 0 1
postgres=# SET pipelinedb.stream_targets TO DEFAULT;
SET
postgres=# INSERT INTO stream (x) VALUES (1);
INSERT 0 1
postgres=# SELECT count FROM v0;
 count
-------
     3
(1 row)

postgres=# SELECT count FROM v1;
 count
-------
     2
(1 row)

postgres=#

Arrival Ordering

您可以将 arrival ordering 用于 event 排序,也就是说 event 在到达PipelineDB服务时都带有一个 arrival_timestamp 属性,这个属性可以在 流视图 中诸如 滑动窗口 之类的操作中起辅助作用 。

Event Expiration

event 在到达PipelineDB时带有一个对应所有需要读取该 event 的流视图的位图。当一个流视图读完 event 后,当某个流视图读完该 event 后,它对应的那一位会从0变成1。当整个位图都变成1后,该 event 会被销毁。


现在您已经知道了 流视图 的相关信息以及如何向流中写入数据,是时候学习PipelineDB丰富的 内置函数 了!