流视图¶
流视图(continuous view)是PipelineDB的基础概念抽象。流视图跟普通的数据库视图非常相似,但它是将流和表中的数据组合后作为输入并进行实时增量更新。
流数据一旦被流视图读取后就会被销毁,流数据不会存储在任何地方。只有诸如 SELECT * FROM that_view
查询返回的结果才会被持久化,也就是说,流视图可以被视为高吞吐量、实时的物化视图。
创建流视图¶
流视图可以通过将PostgreSQL的 action
参数设为 materialize
来创建。语法如下:
CREATE VIEW name [WITH (action=materialize [, ...])] AS query
注解
action
默认值为 materialize
,创建流视图时可以省略 action
参数。只要对流视图执行了 select from 操作,PipelineDB就会为 CREATE VIEW
赋值 action=materialize
。
query 是PostgreSQL SELECT
语句的子集:
SELECT [ DISTINCT [ ON ( expression [, ...] ) ] ]
expression [ [ AS ] output_name ] [, ...]
[ FROM from_item [, ...] ]
[ WHERE condition ]
[ GROUP BY expression [, ...] ]
where from_item can be one of:
stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
from_item [ NATURAL ] join_type from_item [ ON join_condition ]
注解
本节提到了流的概念,流跟数据表类似,并且是流视图和流转换 FROM
子句的数据源。关于流的解释,在 流(foreign table) 一节中有更详细的解释,您目前可以认为流是只支持添加(append-only)的数据表。
expression
- output_name
- 一个可选的表达式标识符。
- condition
- 任何结果为
boolean
类型的表达式。任何不满足条件的数据都会被从输出中过滤掉。如果原始数据在替换变量引用后的结果为true
,则认为其满足条件。
注解
上述说明只涉及到了创建流视图的语法。要了解更多关于上述查询的语义细节,请查看 PostgreSQL SELECT 文档。
截断(truncate)流视图¶
通过 truncate_continuous_view
指令删除流视图中的所有所有数据,保留流视图本身。使用方法如下:
SELECT truncate_continuous_view('name');
这个指令将有效地移除流视图中的所有行式数据,与 PostgreSQL截断 指令类似。
数据检索¶
由于流视图跟普通的数据很类似,检索数据只需要执行 SELECT
即可:
SELECT * FROM some_continuous_view
user | event_count |
---|---|
a | 10 |
b | 20 |
c | 30 |
任何 SELECT
语句在流视图中都是有效的,这让您可以基于持续更新的内容进行更加精细化的分析:
SELECT t.name, sum(v.value) + sum(t.table_value) AS total
FROM some_continuous_view v JOIN some_table t ON v.id = t.id GROUP BY t.name
name | total |
---|---|
usman | 10 |
jeff | 20 |
derek | 30 |
存活时间(TTL)¶
PipelineDB可以通过指定一个时间类型列并且设置存活时间(TTL)来清理流视图中的过期数据。
存活时间(TTL)可以通过 ttl
和 ttl_column
参数来设定。过期数据会在 “reaper” 阶段被 DELETE
。任何 ttl_column
值小于 当前时间减去 ttl
的数据都会被删除。下面的指令创建了一个过期时间为 1个月 的流视图:
CREATE VIEW v_ttl WITH (ttl = '1 month', ttl_column = 'minute') AS
SELECT minute(arrival_timestamp), COUNT(*) FROM some_stream GROUP BY minute;
要注意的是,TTL只会暗示数据库执行 reaper 操作,不保证数据在过期时被立刻物理删除。
如果您想保已过期的数据不被读到,您应该在查询时通过 WHERE
子句来对数据进行过滤。
修改TTL¶
可以通过 pipelinedb.set_ttl 添加、修改和移除TTL:
pipelinedb.set_ttl ( cv_name, ttl, ttl_column )
Update the given continuous view’s TTL with the given paramters. ttl is an interval expressed as a string (e.g.
'1 day'
), and ttl_column is the name of a timestamp-based column.Passing
NULL
for both the ttl and ttl_column parameters will effectively remove a TTL from the given continuous view. Note that a TTL cannot be modified on or removed from a sliding-window continuous view.
激活和中止¶
由于流视图会持续处理流数据,所以在不停止PipelineDB的时候启动和停止这个处理过程是很实用的。比如,一个流视图造成了未预估的系统负载或抛出异常,我们很需要在问题修复前让流视图暂时停止工作。
activate
和 deactivate
函数提供了上述粒度的控制,就像”播放”和”pause”。当流视图处于 active 状态,它们将实时读取流数据并对数据进行增量更新。相反,处于 inactive 的流视图将会停止前面的工作。
函数只需要传入一个流视图或流转换的名称:
SELECT pipelinedb.activate('continuous_view_or_transform');
SELECT pipelinedb.deactivate('continuous_view_or_transform');
流转换 也可以被激活或中止。
重要
当流查询(视图或转换)处于中止(inactive)状态的这段时间里,所有写入到流(foreign table)中的数据将不会再被读到,即使流查询被重新激活(activate)。
查看 操作 获取更多信息。
示例¶
让我们通过下列示例来学习每个流视图完成的功能。
重要
需要明确理解的一点是:PipelineDB只会将诸如 SELECT * FROM my_cv
的查询结果以及少量的源信息进行持久化存储。这是一个比较新的概念,但却是流视图的灵魂!
值得注意的是,即使流视图读取了流中数以万亿计的数据,经过分组聚合后,在PipelineDB中可能也只存在1条行数据。
CREATE VIEW avg_of_forever AS SELECT AVG(x) FROM one_trillion_events_stream;
referrer的UV
CREATE VIEW uniques AS
SELECT date_trunc('day', arrival_timestamp) AS day,
referrer, COUNT(DISTINCT user_id)
FROM users_stream GROUP BY day, referrer;
线性回归
CREATE VIEW lreg AS
SELECT date_trunc('minute', arrival_timestamp) AS minute,
regr_slope(y, x) AS mx,
regr_intercept(y, x) AS b
FROM datapoints_stream GROUP BY minute;
最近5分钟的广告访问量
CREATE VIEW imps AS
SELECT COUNT(*) FROM imps_stream
WHERE (arrival_timestamp > clock_timestamp() - interval '5 minutes');
服务请求延时的90、95和99分位数
CREATE VIEW latency AS
SELECT percentile_cont(array[90, 95, 99]) WITHIN GROUP (ORDER BY latency)
FROM latency_stream;
希望您能喜欢关于流视图的说明!接下来,您可能需要了解 流(foreign table) 是如何工作的。