流聚合

PipelineDB最核心的追求之一就是 促进高性能的连续聚合,所以聚合函数毫无疑问是PipelineDB的核心功能。连续聚合在大多数通用场景是非常有用的,它使得PipelineDB中持久化的数据始终与写入的数据保持同步。它可以通过一定的硬件实现稳定和高吞度量的服务。

连续聚合是随着流视图新 event 的生成实时 增量更新的。对于如 countsum 之类的简单聚合,我们很容易理解结果是如何增量更新的–将新值累加到已有结果上而已。

但对更加复杂的聚合而言,如 avg, stddev, percentile_cont 等,需要更优化的架构来支持高效的增量更新,PipelineDB在内部自动实现了这些复杂的逻辑。

下面是所有PipelineDB支持的聚合函数的说明。有一些函数与标准的聚合函数有略微的差别以高效处理源源不断的流式数据,文中已经标注出了这部分函数的区别。

注解

阅读下面的说明前建议您想阅读 PostgreSQL聚合函数 文档。


Bloom Filter Aggregates

bloom_agg ( expression )

将所有输入的值添加到 Bloom Filter

bloom_agg ( expression , p , n )

将所有输入的值添加到Bloom filter中并且根据给定参数计算大小。p 是期待的假阳率,n 是期望添加的唯一元素数量。

bloom_union_agg ( bloom filter )

对所有输入的Bloom filters取并集。

bloom_intersection_agg ( bloom filter )

对所有输入的Bloom filters取交集。

其它与Bloom filters相关的函数可查看 Bloom Filter函数

Frequency Tracking Aggregates

freq_agg ( expression )

将所有输入的值添加到 Count-Min Sketch 中,实现实时高效计算所有输入的频数。

freq_agg ( expression, epsilon, p )

同上,可以给 cmsketch 指定 epsilonp 参数。 epsilon 决定 cmsketch 可接受的错误率,默认为 0.002 (0.2%)。p 代表可信度,默认为 0.995 (99.5%)。epsilonp 越小,cmsketch 的空间占用越小,反之亦然。

freq_merge_agg ( count-min sketch )

对所有输入的cms取并集。

其它与Count-Min sketches相关的函数可查看 Frequency函数

Top-K Aggregates

topk_agg ( expression , k )

根据输入的参数 k 对输入的 expressionTop-K,每添加一个元素,其对应的值会加1。

topk_agg (expression, k, weight )

同上,但会为 expression 赋予指定权重(而不是默认的1)。

topk_merge_agg ( topk )

将所有的 topk 合并到一个 topk 中。

查看 Top-K 函数 了解更多 topk 相关函数。

HyperLogLog Aggregates

hll_agg ( expression )

将所有输入的 expression 添加到 HyperLogLog 中。

hll_agg ( expression, p )

将所有输入的 expression 添加到 HyperLogLog 中,指定参数 pp 越大,HyperLogLog 的偏差越小,内存占用越大。

hll_union_agg ( hyperloglog )

对所有输入的 HyperLogLogs 取并集。

查看 HyperLogLog函数 了解更多 HyperLogLog 相关函数。

Distribution Aggregates

dist_agg ( expression )

将所有输入的值添加到 T-Digest 中来追溯数据分布。

dist_agg ( expression, compression )

同上,但使用给定的 compression 构建底层 tdigestcompression 必须是 [20, 1000] 中的整数。compression 越大,tdigest 空间占用越高,精确度也越高。

查看 Distribution函数 了解更多 tdigest 相关函数。

Miscellaneous Aggregates

bucket_agg ( expression , bucket_id )

根据 bucket_id 为每个输入的 expression 添加4字节的哈希值。在任意给定时间,每个哈希值可能只在一个桶中出现一次。因此,这些桶可以被认定为输入的 expressions 的排它散列集。

bucket_agg ( expression , bucket_id , timestamp )

同上,但允许通过 timestamp 表达式来决定桶的条目顺序。也就是说,只有一个值 最后的 条目才会使它切换到别的桶中。

查看 Miscellaneous函数 了解更多 bucket_agg 相关函数。

exact_count_distinct ( expression )

对给定的 expression 进行精确的去重计数。由于流视图中的 count distinct 内部使用HyperLogLog以提高效率,如果不能接受Hyperloglog产生的小概率误差,可以使用 exact_count_distinct

重要

exact_count_distinct 必须存储所有去重后的值以确定值的唯一性,所以不建议在基数很大的情况下使用。

first_values ( n ) WITHIN GROUP (ORDER BY sort_expression)

一个有序的聚合集合,按序保存前 n 个值。

注解

可见 PipelineDB特有函数,其中介绍了与Bloom filters, Count-min sketches, HyperLogLogs 以及 T-Digests相关的非聚合函数。您可以查看 概率数据结构和算法 了解其原理及使用方法。

keyed_max ( key, value )

返回与 “最高的” key 相关的 value

keyed_min ( key, value )

返回与 “最低的” key 相关的 value

set_agg ( expression )

将所有输入值加入到集合中。

查看 Miscellaneous函数 了解更多可用于操作集合的函数。


Combine

由于PipelineDB实现增量更新聚合后的值,所以它可以将当前数据之外的信息同已有的聚合相结合。比如,我们无法在不加权的情况下通过对各个子区间的平均值取平均值来算出总区间的平均值。

对于这类操作,PipelineDB提供 combine 聚合,具体功能如下:

combine ( aggregate column )

给定一个聚合列,将所有值结合到单个值中,如同所有独立的聚合任务的输入同时输入到了当前的聚合任务中。

注解

combine 只能作用与流视图的聚合列上。

下面是一些例子:

postgres=# CREATE VIEW v AS SELECT g::integer, AVG(x::integer) FROM stream GROUP BY g;
CREATE VIEW
postgres=# INSERT INTO stream (g, x) VALUES (0, 10), (0, 10), (0, 10), (0, 10), (0, 10);
INSERT 0 5
postgres=# INSERT INTO stream (g, x) VALUES (1, 20);
INSERT 0 1
postgres=# SELECT * FROM v;
 g |         avg
---+---------------------
 0 | 10.0000000000000000
 1 | 20.0000000000000000
(2 rows)

postgres=# SELECT avg(avg) FROM v;
         avg
---------------------
 15.0000000000000000
(1 row)

postgres=# -- 上面这种求平均值的方式没有考虑到:10写入了5次,而20只写入了1次。
postgres=#
postgres=# SELECT combine(avg) FROM v;
       combine
---------------------
 11.6666666666666667
(1 row)

postgres=# -- 如上所示,combine(avg)的计算方式是(10*5+20*1)/(5+1),而不仅仅是(10+20)/2。

General Aggregates

array_agg ( expression )

将包括null在内的所有值连接到数组中。

avg ( expression )

取平均值。

bit_and ( expression )

对所有非空输入执行 按位与 操作,无输入的情况下按照null处理。

bit_or ( expression )

对所有非空输入执行 按位或 操作,无输入的情况下按照null处理。

bool_and ( expression )

输入全部为true时才返回true,否则为false。

bool_or ( expression )

只要输入中有一个为true,就返回true。

count ( * )

计算行数。

count ( DISTINCT expression)

expression 去重后的行数。

注解

在连续不断的流上进行去重计数需要无限大的内存,所以流视图通过 HyperLogLog 以常数空间和时间的代价完成去重计数任务,代价就是会有小概率的误差。通常情况下,PipelineDB的 HyperLogLog 有大约0.81%的误差。比如,当去重后的实际数量为 1000 时,count distinct 的结果可能为 1008

count ( expression )

expression 的非空个数。

every ( expression )

bool_and

json_agg ( expression )

将结果聚合为JSON数组。

json_object_agg ( key, value )

将键值对聚合为JSON。

jsonb_agg ( expression )

将结果聚合为JSONB数组。

jsonb_object_agg ( key, value )

将键值对聚合为JSONB。

max ( expression )

expression 取最大值。

min ( expression )

expression 取最小值。

string_agg ( expression, delimiter )

expressiondelimiter 连接为字符串。

sum ( expression )

对所有输入的 expression 求和。

Statistical Aggregates

corr ( y, x )

相关系数。

covar_pop ( y, x )

总体协方差。

covar_samp ( y, x )

样本协方差。

regr_avgx ( y, x )

自变量平均值:(sum(x)/N)

regr_avgy ( y, x )

自变量平均值:(sum(y)/N)

regr_count ( y, x )

x,y都不为空的数量。

regr_intercept ( y, x )

基于(x,y),按照最小二乘法拟合得到的方程的y轴截距。

regr_r2 ( y, x )

相关系数的平方。

regr_slope ( y, x )

基于(x,y),按照最小二乘法拟合得到的方程的斜率。

regr_sxx ( y, x )

x的离差平方和,即:sum(X^2) - sum(X)^2/N

regr_sxy ( y, x )

x,y的离差积和,即:sum(X*Y) - sum(X) * sum(Y)/N

regr_syy ( y, x )

y的离差平方和,即:sum(Y^2) - sum(Y)^2/N

stddev ( expression )

样本标准差。

stddev_pop ( expression )

总体标准差。

variance ( expression )

样本方差,即样本标准差的平方。

var_pop ( expression )

整体方差,即整体标准差的平方。


Ordered-set Aggregates

ordered-set 聚合对输入进行排序并得到结果,所以用 WITHIN GROUP 语句。语法如下:

aggregate_name ( [ expression [ , ... ] ] ) WITHIN GROUP ( order_by_clause )

下面是一些例子:

计算99分位数:

SELECT percentile_cont(0.99) WITHIN GROUP (ORDER BY value) FROM some_table;

应用于流视图:

CREATE VIEW percentile AS
SELECT percentile_cont(0.99) WITHIN GROUP (ORDER BY value::float8)
FROM some_stream;

percentile_cont ( fraction )

流式百分位数:返回对应排序中特定分位的值。如有必要,可在相邻的输入项间进行插入值。

percentile_cont ( array of fractions )

多重流式百分位数:返回与分数参数形状匹配的结果数组,将每个非空元素替换为其对应的百分位数。

注解

在连续不断的流上计算百分位数需要无限大的内存,因此当流视图使用 percentile_cont 的两种形式时,都使用 T-Digest 来计算高精度的百分位数。通常,流视图中的百分位数越接近 [0, 1) 的上下界,结果就越精确。


Hypothetical-set Aggregates

hypothetical-set 聚合通过一个 表达式 在上下文中进行计算。比如,rank(2) 会计算 2rank,无论输入的是什么。

hypothetical-set聚合通过 WITHIN GROUP 语句来定义输入,语法如下:

aggregate_name ( [ expression [ , ... ] ] ) WITHIN GROUP ( order_by_clause )

下面是一些hypothetical-set聚合在流视图上应用的例子:

CREATE VIEW continuous_rank AS
SELECT rank(42) WITHIN GROUP (ORDER BY value::float8)
FROM some_stream;

流视图将不断更新 42rank

rank ( arguments )

行的rank,存在并列的情况。

dense_rank ( arguments )

行的rank,不存在并列的情况。

注解

在连续不断的流上计算 dense_rank 需要无限大的内存,所以流视图通过 HyperLogLog 以常数时间和空间实现 dense_rank,代价就是小概率的误差。通常PipelineDB 实现的 HyperLogLog 存在大约0.2%的误差。换言之,在流视图中,当真正的rank值为1000时,计算出的结果可能为998。

percent_rank ( arguments )

假定行的分位数,0到1。

cume_dist ( arguments )

假定行的相对分位数,1/N到1。


不支持的聚合

mode ( )

PipelineDB未来的版本将实现一种实时的模式估计算法,但目前还不支持。

percentile_disc ( arguments )

给定一个输入百分位数(比如0.99),percentile_disc 返回该百分位数内输入集中的第一个值。实际上这需要对输入集进行排序,这在连续不断的流中显然是不切实际的,甚至不允许我们使用 percentile_cont 中的那种高精度估计算法。

xmlagg ( xml )

:(

<aggregate_name> (DISTINCT expression)

在通常的聚合中,只有 count 聚合函数支持 DISTINCT。在未来的版本中,我们可能让 Bloom Filter 也支持 DISTINCT

注解

鉴于PipelineDB已被Confluent收购,所有原来规划的新功能只能指望社区来实现了。