应用集成

Apache Kafka

PipelineDB支持从Kafka的topic中提取数据到流中。相关的功能集成在 pipeline_kafka 插件中。pipeline_kafkaCOPY 方式将Kafka中的数据写入到流中。

注解

pipeline_kafka 插件是官方支持的但没有打包到PipelineDB中,因此需要单独安装。可直接访问位于github中的 项目仓库README.md 中说明了插件的安装方法。

pipeline_kafka 通过共享内存来同步后台worker间的状态,所以必须作为共享库被载入。您可以在 pipelinedb.conf 配置文件中将添加到配置中。如果已添加了其它共享库,以逗号分隔将其依次添加即可:

shared_preload_libraries = pipeline_kafka

在客户端中创建插件:

postgres=# CREATE EXTENSION pipeline_kafka;
CREATE EXTENSION

使用 pipeline_kafka 前必须配置好Kafka Broker。

pipeline_kafka.add_broker ( hostname text )

hostname 是格式为 <host>[:<port>] 的字符。多个Broker可以使用 pipeline_kafka.add_broker 添加多次。

消费Kafka消息

pipeline_kafka.consume_begin ( topic text, stream text, format := ‘text’, delimiter := E’\t’, quote := NULL, escape := NULL, batchsize := 1000, maxbytes := 32000000, parallelism := 1, start_offset := NULL )

parallelism 是后台将Kafka Topic数据消费到流中的worker进程数。输出的流必须提前通过 CREATE STREAM 创建。指定Topic中所有分区的数据会分摊给每个woker进程进行处理。

可选的 format, delimiter, escapequote 参数类似于 COPY 指令中的 FORMAT, DELIMITER ESCAPEQUOTE,除此之外,pipeline_kafka 还额外地支持 json 格式,插件会将Kafka消息转换为JSON对象。

batchsize 控制了Kafka客户端的 batch_size 参数。客户端会在消费并缓存了 batch_size 条消息后会以 COPY 的形式写入到流中,一直循环此过程。

maxbytes 控制了Kafka客户端的 fetch.message.max.bytes 参数。客户端会在消费并缓存了 maxbytes 大小的消息后会以 COPY 的形式写入到流中,一直循环此过程。

start_offset 指定Kafka Topic分区消费的起点offset。

pipeline_kafka 会将Kafka消费的进度保存在数据库中。如果起始offset为 NULL,客户端会从数据库记录的offset开始消费,若没有记录,则会以 offset='latest' 方式开始消费。start_offset为-1时,会以 offset='latest' 方式消费;start_offset为-2时,会以 offset='earliest' 方式开始消费。使用其它的offset是很荒诞的,因为这些offset与分区无关。

注解

译者注:如果想指定所有分区的起点offset,直接通过start_offset肯定是不行的,可以手动修改pipeline_kafka.offsets中分区的offset,然后以 start_offset := NULL 的方式启动消费。

pipeline_kafka.consume_begin ( )

启动当前已生成的所有消费者,而不是指定某个topic到stream的消费任务。

pipeline_kafka.consume_end ( topic text, stream text )

停止某个topic到stream的消费任务。

pipeline_kafka.consume_end ( )

停止当前所有的消费任务。

生产Kafka消息

0.9.1 新版功能.

pipeline_kafka.produce_message ( topic text, message bytea, partition := NULL, key := NULL )

生产单条 message 到指定 topic 中,partitionkey 都是可选的。默认情况下,分区是未指定的,所以由Kafka分区函数决定消息写入哪个分区。如果您想将消息写入指定分区,指定将分区对应的 integer 赋值给partition参数即可。key 是消息中的一个键,用于kafka分区函数计算消息应写入的分区。

pipeline_kafka.emit_tuple ( topic, partition, key )

这是一个可用于将元组以json格式写入kafka中的触发器函数。它只能作为 AFTER INSERT OR UPDATEFOR EACH ROW 触发器使用。在 UPDATE 情况下,新增的元组会被写入。topic必须指定,partitionkey 是可选的。由于这是一个触发器函数,所有参数必须以字符串传递,并且不能指定keyword参数。如果您只想指定 topickey 而不想指定分区,赋值 'partition := -1' 即可。key 是元组中列名,用于消息写入到Topic时的分区计算。

Metadata

pipeline_kafka 使用多张表来维系Kafka客户端状态的记录,以支持系统重启后的断点再续:

pipeline_kafka.consumers

存储了每个 stream-topic 消费者的源信息,在 pipeline_kafka.consume_begin 时自动创建。

pipeline_kafka.brokers

存储了消费者可连接的所有Kafka Broker。

pipeline_kafka.offsets

存储Kafka Topic消费位移(offset),这样消费者就可以在上次消费中止的offset处重新开始消费。


注解

查看 SQL on Kafka 深入了解Kafka和PipelineDB的集成。

Amazon Kinesis

PipelineDB也支持集成Amazon Kinesis。这个功能由 pipeline_kinesis 插件提供。插件通过 AWS SDK 管理消费者,并将数据 COPY 到PipelineDB流中。

pipeline_kinesisgithub仓库 中的 README.md 文件中包含了插件安装说明。

使用插件前必须在客户端显式载入:

postgres=# CREATE EXTENSION pipeline_kinesis;
CREATE EXTENSION

您必须先为管道配置好kinesis数据的endpoint:

pipeline_kinesis.add_endpoint( name text, region text, credfile text := NULL, url text := NULL )

name 是endpoint的唯一标识符。region 是AWS地区的字符形式的标识符,比如 us-east-1us-west-2

credfile 是一个可选参数,它可以覆盖AWS证书的默认文件路径。

url 是一个可选参数,它可以指定其它的(非AWS的)kinesis服务。这在测试如 kinesalite 之类的本地kinesis服务时是非常有用的。

消费kinesis

pipeline_kinesis.consume_begin ( endpoint text, stream text, relation text, format text := ‘text’, delimiter text := E’\t’, quote text := NULL, escape text := NULL, batchsize int := 1000, parallelism int := 1, start_offset int := NULL )

endpoint 生成一个kinesis逻辑消费者组来消费kinesis 流数据,并将数据 COPY 到PipelineDB流中。

parallelism 用于指定每个消费者的后台worker进程数从而实现载入均衡。⚠️由于插件内部是串行的,所以不需要指定分片的数量。默认值1已经够用了,除非消费进度开始落后。

format, delimiter, escapequote 是可选参数,用于控制 COPY 到PipelineDB中数据的格式。

batchsize 会传入 AWS SDK 并控制 Kinesis GetRecords 中的 Limit 参数。

start_offset 是插件用于记录流消费起点的参数。-1表示从尾开始读,-2表示从头开始读。对应 TRIM_HORIZONLATEST,详细信息见 Kinesis GetShardIterator

pipeline_kinesis.consume_end (endpoint text, stream text, relation text)

停止指定消费者的所有worker进程。

pipeline_kinesis.consume_begin()

启动所有已创建的消费者。

pipeline_kinesis.consume_end()

停止所有消费者的worker进程。

Metadata

pipeline_kinesis 使用多张表记录消费信息:

pipeline_kinesis.endpoints

存储每个所有通过 kinesis_add_endpoint 创建的endpoint。

pipeline_kinsesis.consumers

存储每个通过 kinesis_consume_begin 创建的消费者的源信息。

pipeline_kinsesis.seqnums

存储每个消费者的切片源数据,也就是血清(seqnums)。