应用集成¶
Apache Kafka¶
PipelineDB支持从Kafka的topic中提取数据到流中。相关的功能集成在 pipeline_kafka 插件中。pipeline_kafka 以 COPY 方式将Kafka中的数据写入到流中。
注解
pipeline_kafka 插件是官方支持的但没有打包到PipelineDB中,因此需要单独安装。可直接访问位于github中的 项目仓库,README.md
中说明了插件的安装方法。
pipeline_kafka 通过共享内存来同步后台worker间的状态,所以必须作为共享库被载入。您可以在 pipelinedb.conf
配置文件中将添加到配置中。如果已添加了其它共享库,以逗号分隔将其依次添加即可:
shared_preload_libraries = pipeline_kafka
在客户端中创建插件:
postgres=# CREATE EXTENSION pipeline_kafka;
CREATE EXTENSION
使用 pipeline_kafka 前必须配置好Kafka Broker。
pipeline_kafka.add_broker ( hostname text )
hostname 是格式为<host>[:<port>]
的字符。多个Broker可以使用 pipeline_kafka.add_broker 添加多次。
消费Kafka消息¶
pipeline_kafka.consume_begin ( topic text, stream text, format := ‘text’, delimiter := E’\t’, quote := NULL, escape := NULL, batchsize := 1000, maxbytes := 32000000, parallelism := 1, start_offset := NULL )
parallelism 是后台将Kafka Topic数据消费到流中的worker进程数。输出的流必须提前通过
CREATE STREAM
创建。指定Topic中所有分区的数据会分摊给每个woker进程进行处理。可选的 format, delimiter, escape 和 quote 参数类似于 COPY 指令中的
FORMAT
,DELIMITER
ESCAPE
和QUOTE
,除此之外,pipeline_kafka 还额外地支持 json 格式,插件会将Kafka消息转换为JSON对象。batchsize 控制了Kafka客户端的
batch_size
参数。客户端会在消费并缓存了batch_size
条消息后会以COPY
的形式写入到流中,一直循环此过程。maxbytes 控制了Kafka客户端的
fetch.message.max.bytes
参数。客户端会在消费并缓存了maxbytes
大小的消息后会以COPY
的形式写入到流中,一直循环此过程。start_offset 指定Kafka Topic分区消费的起点offset。
pipeline_kafka 会将Kafka消费的进度保存在数据库中。如果起始offset为 NULL
,客户端会从数据库记录的offset开始消费,若没有记录,则会以 offset='latest'
方式开始消费。start_offset为-1时,会以 offset='latest'
方式消费;start_offset为-2时,会以 offset='earliest'
方式开始消费。使用其它的offset是很荒诞的,因为这些offset与分区无关。
注解
译者注:如果想指定所有分区的起点offset,直接通过start_offset肯定是不行的,可以手动修改pipeline_kafka.offsets中分区的offset,然后以 start_offset := NULL
的方式启动消费。
pipeline_kafka.consume_begin ( )
启动当前已生成的所有消费者,而不是指定某个topic到stream的消费任务。
pipeline_kafka.consume_end ( topic text, stream text )
停止某个topic到stream的消费任务。
pipeline_kafka.consume_end ( )
停止当前所有的消费任务。
生产Kafka消息¶
0.9.1 新版功能.
pipeline_kafka.produce_message ( topic text, message bytea, partition := NULL, key := NULL )
生产单条 message 到指定 topic 中,partition 和 key 都是可选的。默认情况下,分区是未指定的,所以由Kafka分区函数决定消息写入哪个分区。如果您想将消息写入指定分区,指定将分区对应的integer
赋值给partition参数即可。key 是消息中的一个键,用于kafka分区函数计算消息应写入的分区。
pipeline_kafka.emit_tuple ( topic, partition, key )
这是一个可用于将元组以json格式写入kafka中的触发器函数。它只能作为AFTER INSERT OR UPDATE
和FOR EACH ROW
触发器使用。在UPDATE
情况下,新增的元组会被写入。topic必须指定,partition 和 key 是可选的。由于这是一个触发器函数,所有参数必须以字符串传递,并且不能指定keyword参数。如果您只想指定 topic 和 key 而不想指定分区,赋值'partition := -1'
即可。key 是元组中列名,用于消息写入到Topic时的分区计算。
Metadata¶
pipeline_kafka 使用多张表来维系Kafka客户端状态的记录,以支持系统重启后的断点再续:
pipeline_kafka.consumers
存储了每个 stream-topic 消费者的源信息,在 pipeline_kafka.consume_begin 时自动创建。
pipeline_kafka.brokers
存储了消费者可连接的所有Kafka Broker。
pipeline_kafka.offsets
存储Kafka Topic消费位移(offset),这样消费者就可以在上次消费中止的offset处重新开始消费。
注解
查看 SQL on Kafka 深入了解Kafka和PipelineDB的集成。
Amazon Kinesis¶
PipelineDB也支持集成Amazon Kinesis。这个功能由 pipeline_kinesis 插件提供。插件通过 AWS SDK 管理消费者,并将数据 COPY 到PipelineDB流中。
pipeline_kinesis 的 github仓库 中的 README.md
文件中包含了插件安装说明。
使用插件前必须在客户端显式载入:
postgres=# CREATE EXTENSION pipeline_kinesis;
CREATE EXTENSION
您必须先为管道配置好kinesis数据的endpoint:
pipeline_kinesis.add_endpoint( name text, region text, credfile text := NULL, url text := NULL )
name 是endpoint的唯一标识符。region 是AWS地区的字符形式的标识符,比如
us-east-1
和us-west-2
。credfile 是一个可选参数,它可以覆盖AWS证书的默认文件路径。
url 是一个可选参数,它可以指定其它的(非AWS的)kinesis服务。这在测试如 kinesalite 之类的本地kinesis服务时是非常有用的。
消费kinesis¶
pipeline_kinesis.consume_begin ( endpoint text, stream text, relation text, format text := ‘text’, delimiter text := E’\t’, quote text := NULL, escape text := NULL, batchsize int := 1000, parallelism int := 1, start_offset int := NULL )
在 endpoint 生成一个kinesis逻辑消费者组来消费kinesis 流数据,并将数据 COPY 到PipelineDB流中。
parallelism 用于指定每个消费者的后台worker进程数从而实现载入均衡。⚠️由于插件内部是串行的,所以不需要指定分片的数量。默认值1已经够用了,除非消费进度开始落后。
format, delimiter, escape 和 quote 是可选参数,用于控制 COPY 到PipelineDB中数据的格式。
batchsize 会传入 AWS SDK 并控制 Kinesis GetRecords 中的
Limit
参数。start_offset 是插件用于记录流消费起点的参数。-1表示从尾开始读,-2表示从头开始读。对应
TRIM_HORIZON
和LATEST
,详细信息见 Kinesis GetShardIterator。
pipeline_kinesis.consume_end (endpoint text, stream text, relation text)
停止指定消费者的所有worker进程。
pipeline_kinesis.consume_begin()
启动所有已创建的消费者。
pipeline_kinesis.consume_end()
停止所有消费者的worker进程。
Metadata¶
pipeline_kinesis 使用多张表记录消费信息:
pipeline_kinesis.endpoints
存储每个所有通过 kinesis_add_endpoint 创建的endpoint。
pipeline_kinsesis.consumers
存储每个通过 kinesis_consume_begin 创建的消费者的源信息。
pipeline_kinsesis.seqnums
存储每个消费者的切片源数据,也就是血清(seqnums)。