客户端

由于PipelineDB在PostgreSQL 10.1+和11.0+中是以插件方式运行的,它没有自己的客户端,直接使用PostgreSQL客户端即可。

下面将给您展示一些不同语言及客户端在PipelineDB中创建流视图的示例。

CREATE VIEW continuous_view WITH (action=materialize) AS
      SELECT x::integer, COUNT(*) FROM stream GROUP BY x;

下面的程序向流中插入包含10个分组的10万条记录,并将结果输出:

Python

下面的Python示例需要先安装 psycopg2。

import psycopg2

# Connect to the "test" database; all PipelineDB statements go through this cursor.
conn = psycopg2.connect('dbname=test user=user host=localhost port=5432')
pipeline = conn.cursor()

# Streams are declared as foreign tables served by the pipelinedb extension.
create_stream = """
CREATE FOREIGN TABLE stream (x integer) SERVER pipelinedb
"""
pipeline.execute(create_stream)

# A continuous view (action=materialize) incrementally maintains the aggregate
# as rows arrive on the stream.
create_cv = """
CREATE VIEW continuous_view WITH (action=materialize) AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x
"""
pipeline.execute(create_cv)
conn.commit()

# Build 100,000 rows spread over 10 unique groupings (x in 0..9).
rows = [{'x': n % 10} for n in range(100000)]

# Now write the rows to the stream, and commit so the writes are durable
# (the original example never committed the inserts).
pipeline.executemany('INSERT INTO stream (x) VALUES (%(x)s)', rows)
conn.commit()

# Now read the aggregated results back from the continuous view.
pipeline.execute('SELECT * FROM continuous_view')
for x, count in pipeline.fetchall():
    # print() function: the original used Python 2 print-statement syntax,
    # which is a SyntaxError under Python 3.
    print(x, count)

# Clean up: drop the view, commit, and close cursor and connection
# (the original leaked the connection).
pipeline.execute('DROP VIEW continuous_view')
conn.commit()
pipeline.close()
conn.close()

Ruby

下面基于Ruby的例子使用 pg gem。

require 'pg'

# Connect to the "test" database. PG.connect is the supported pg-gem entry
# point; the legacy PGconn constant used in older examples is deprecated.
pipeline = PG.connect(dbname: 'test', user: 'user', host: 'localhost', port: 5432)

# This continuous view will perform 3 aggregations on page view traffic, grouped by url:
#
# total_count - count the number of total page views for each url
# uniques     - count the number of unique users (cookies) for each url
# p99_latency - determine the 99th-percentile latency for each url

pipeline.exec(<<~SQL)
  CREATE FOREIGN TABLE page_views (
    url text,
    cookie text,
    latency integer
  ) SERVER pipelinedb
SQL

pipeline.exec(<<~SQL)
  CREATE VIEW v WITH (action=materialize) AS
  SELECT
    url,
    count(*) AS total_count,
    count(DISTINCT cookie) AS uniques,
    percentile_cont(0.99) WITHIN GROUP (ORDER BY latency) AS p99_latency
  FROM page_views GROUP BY url
SQL

(1..10000).each do |n|
  # 10 unique urls
  url = format('/some/url/%d', n % 10)

  # 1000 unique cookies
  cookie = format('%032d', n % 1000)

  # latency uniformly distributed between 0 and 100 (rand(101) yields 0..100)
  latency = rand(101)

  # NOTE: it would be much faster to batch these into a single INSERT
  # statement, but for simplicity's sake let's do one at a time.
  # exec_params sends the values as bound parameters instead of the
  # injection-prone string interpolation in the original example.
  pipeline.exec_params(
    'INSERT INTO page_views (url, cookie, latency) VALUES ($1, $2, $3)',
    [url, cookie, latency]
  )
end

# The output of a continuous view can be queried like any other table or view
rows = pipeline.exec('SELECT * FROM v ORDER BY url')

rows.each do |row|
  puts row
end

# Clean up: drop the view and close the connection (the original leaked it)
pipeline.exec('DROP VIEW v')
pipeline.close

Java

下面的例子需要先将 PostgreSQL JDBC 驱动添加到 CLASSPATH 中。

import java.util.Properties;
import java.sql.*;

public class Example {

  static final String HOST = "localhost";
  static final String DATABASE = "test";
  static final String USER = "user";

  /**
   * Creates a PipelineDB stream and continuous view, batch-inserts 100,000
   * events spread over 10 groupings, prints the aggregated counts, then
   * drops the view.
   *
   * @param args unused
   * @throws SQLException if any database operation fails
   */
  public static void main(String[] args) throws SQLException {

    // Connect to "test" database on port 5432
    String url = "jdbc:postgresql://" + HOST + ":5432/" + DATABASE;
    Properties props = new Properties();
    props.setProperty("user", USER);

    // try-with-resources guarantees the connection, statement and result set
    // are closed even when an SQLException is thrown; the original example
    // leaked all three on any failure.
    try (Connection conn = DriverManager.getConnection(url, props);
         Statement stmt = conn.createStatement()) {

      stmt.executeUpdate(
          "CREATE FOREIGN TABLE stream (x integer) SERVER pipelinedb");
      stmt.executeUpdate(
          "CREATE VIEW v WITH (action=materialize) AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x");

      for (int i = 0; i < 100000; i++) {
        // 10 unique groupings
        int x = i % 10;

        // INSERT INTO stream (x) VALUES (x)
        stmt.addBatch("INSERT INTO stream (x) VALUES (" + x + ")");
      }

      stmt.executeBatch();

      // Read back the aggregated counts from the continuous view.
      try (ResultSet rs = stmt.executeQuery("SELECT * FROM v")) {
        while (rs.next()) {
          int id = rs.getInt("x");
          int count = rs.getInt("count");

          System.out.println(id + " = " + count);
        }
      }

      // Clean up
      stmt.executeUpdate("DROP VIEW v");
    }
  }
}