流视图

流视图(continuous view)是PipelineDB的基础概念抽象。流视图跟普通的数据库视图非常相似,但它是将流和表中的数据组合后作为输入并进行实时增量更新。

流数据一旦被流视图读取后就会被销毁,流数据不会存储在任何地方。只有诸如 SELECT * FROM that_view 查询返回的结果才会被持久化,也就是说,流视图可以被视为高吞吐量、实时的物化视图。

创建流视图

流视图可以通过将PostgreSQL的 action 参数设为 materialize 来创建。语法如下:

CREATE VIEW name [WITH (action=materialize [, ...])]  AS query

注解

action 默认值为 materialize,创建流视图时可以省略 action 参数。只要对流视图执行了 select from 操作,PipelineDB就会为 CREATE VIEW 赋值 action=materialize

query 是PostgreSQL SELECT 语句的子集:

SELECT [ DISTINCT [ ON ( expression [, ...] ) ] ]
    expression [ [ AS ] output_name ] [, ...]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY expression [, ...] ]

where from_item can be one of:

    stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    from_item [ NATURAL ] join_type from_item [ ON join_condition ]

注解

本节提到了流的概念,流跟数据表类似,并且是流视图和流转换 FROM 子句的数据源。关于流的解释,在 流(foreign table) 一节中有更详细的解释,您目前可以认为流是只支持添加(append-only)的数据表。

expression

参考PostgreSQL 表达式分组查询说明
output_name
一个可选的表达式标识符。
condition
任何结果为 boolean 类型的表达式。任何不满足条件的数据都会被从输出中过滤掉。如果原始数据在替换变量引用后的结果为 true,则认为其满足条件。

注解

上述说明只涉及到了创建流视图的语法。要了解更多关于上述查询的语义细节,请查看 PostgreSQL SELECT 文档

删除(drop)流视图

使用 DROP VIEW 指令从系统中删除流视图,语法如下:

DROP VIEW name

这将从系统中移除流视图相关的所有资源。

截断(truncate)流视图

通过 truncate_continuous_view 指令删除流视图中的所有所有数据,保留流视图本身。使用方法如下:

SELECT truncate_continuous_view('name');

这个指令将有效地移除流视图中的所有行式数据,与 PostgreSQL截断 指令类似。

查看流视图

可以通过下列查询语句查看当前系统中的所有流视图:

SELECT * FROM pipelinedb.views;

数据检索

由于流视图跟普通的数据很类似,检索数据只需要执行 SELECT 即可:

SELECT * FROM some_continuous_view
user event_count
a 10
b 20
c 30

任何 SELECT 语句在流视图中都是有效的,这让您可以基于持续更新的内容进行更加精细化的分析:

SELECT t.name, sum(v.value) + sum(t.table_value) AS total
FROM some_continuous_view v JOIN some_table t ON v.id = t.id GROUP BY t.name
name total
usman 10
jeff 20
derek 30

存活时间(TTL)

PipelineDB可以通过指定一个时间类型列并且设置存活时间(TTL)来清理流视图中的过期数据。

存活时间(TTL)可以通过 ttlttl_column 参数来设定。过期数据会在 “reaper” 阶段被 DELETE。任何 ttl_column 值小于 当前时间减去 ttl 的数据都会被删除。下面的指令创建了一个过期时间为 1个月 的流视图:

CREATE VIEW v_ttl WITH (ttl = '1 month', ttl_column = 'minute') AS
  SELECT minute(arrival_timestamp), COUNT(*) FROM some_stream GROUP BY minute;

要注意的是,TTL只会暗示数据库执行 reaper 操作,不保证数据在过期时被立刻物理删除。

如果您想保已过期的数据不被读到,您应该在查询时通过 WHERE 子句来对数据进行过滤。

修改TTL

可以通过 pipelinedb.set_ttl 添加、修改和移除TTL:

pipelinedb.set_ttl ( cv_name, ttl, ttl_column )

Update the given continuous view’s TTL with the given paramters. ttl is an interval expressed as a string (e.g. '1 day'), and ttl_column is the name of a timestamp-based column.

Passing NULL for both the ttl and ttl_column parameters will effectively remove a TTL from the given continuous view. Note that a TTL cannot be modified on or removed from a sliding-window continuous view.

激活和中止

由于流视图会持续处理流数据,所以在不停止PipelineDB的时候启动和停止这个处理过程是很实用的。比如,一个流视图造成了未预估的系统负载或抛出异常,我们很需要在问题修复前让流视图暂时停止工作。

activatedeactivate 函数提供了上述粒度的控制,就像”播放”和”pause”。当流视图处于 active 状态,它们将实时读取流数据并对数据进行增量更新。相反,处于 inactive 的流视图将会停止前面的工作。

函数只需要传入一个流视图或流转换的名称:

SELECT pipelinedb.activate('continuous_view_or_transform');
SELECT pipelinedb.deactivate('continuous_view_or_transform');

流转换 也可以被激活或中止。

重要

当流查询(视图或转换)处于中止(inactive)状态的这段时间里,所有写入到流(foreign table)中的数据将不会再被读到,即使流查询被重新激活(activate)。

查看 操作 获取更多信息。

示例

让我们通过下列示例来学习每个流视图完成的功能。

重要

需要明确理解的一点是:PipelineDB只会将诸如 SELECT * FROM my_cv 的查询结果以及少量的源信息进行持久化存储。这是一个比较新的概念,但却是流视图的灵魂!

值得注意的是,即使流视图读取了流中数以万亿计的数据,经过分组聚合后,在PipelineDB中可能也只存在1条行数据。

CREATE VIEW avg_of_forever AS SELECT AVG(x) FROM one_trillion_events_stream;

referrer的UV

CREATE VIEW uniques AS
SELECT date_trunc('day', arrival_timestamp) AS day,
  referrer, COUNT(DISTINCT user_id)
FROM users_stream GROUP BY day, referrer;

线性回归

CREATE VIEW lreg AS
SELECT date_trunc('minute', arrival_timestamp) AS minute,
  regr_slope(y, x) AS mx,
  regr_intercept(y, x) AS b
FROM datapoints_stream GROUP BY minute;

最近5分钟的广告访问量

CREATE VIEW imps AS
  SELECT COUNT(*) FROM imps_stream
WHERE (arrival_timestamp > clock_timestamp() - interval '5 minutes');

服务请求延时的90、95和99分位数

CREATE VIEW latency AS
  SELECT percentile_cont(array[90, 95, 99]) WITHIN GROUP (ORDER BY latency)
FROM latency_stream;

希望您能喜欢关于流视图的说明!接下来,您可能需要了解 流(foreign table) 是如何工作的。