A continuous view is the core abstraction of PipelineDB. It is similar to a regular view, except that its data comes from streams (optionally joined with tables) and is updated incrementally, in real time, as input data arrives.
Syntax
```code
CREATE CONTINUOUS VIEW name AS query
```
query is a standard PostgreSQL SELECT statement of the following form:
```code
SELECT [ DISTINCT [ ON ( expression [, ...] ) ] ]
    expression [ [ AS ] output_name ] [, ...]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY expression [, ...] ]
    [ WINDOW window_name AS ( window_definition ) [, ...] ]

where from_item can be one of:

    stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    from_item [ NATURAL ] join_type from_item [ ON join_condition ]
```
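For instance, a minimal sketch of a continuous view over a single stream (the stream name page_views and its column are illustrative, not part of the original example):

```code
-- illustrative: per-URL page view counts, updated as events arrive
CREATE STREAM page_views (url text);

CREATE CONTINUOUS VIEW views_per_url AS
SELECT url, COUNT(*) AS total
FROM page_views
GROUP BY url;
```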
Environment setup
The project runs with Docker, combined with the Hasura GraphQL engine.
- docker-compose
```code
version: '3.6'
services:
  postgres:
    image: pipelinedb/pipelinedb
    ports:
      - "5432:5432"
  graphql-engine:
    image: hasura/graphql-engine:v1.0.0-alpha06
    ports:
      - "8080:8080"
    depends_on:
      - "postgres"
    command: >
      /bin/sh -c "
      graphql-engine --database-url postgres://pipeline:pipeline@postgres:5432/pipeline serve --enable-console;
      "
```
Data comes from both a base table and a stream (a more comprehensive example)
- Create the base table
```code
CREATE TABLE userlogin (
  id SERIAL PRIMARY KEY,
  username text NOT NULL,
  userid integer NOT NULL,
  usertype text NOT NULL,
  logintype text NOT NULL
);
```
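PipelineDB evaluates stream-table joins as stream events arrive, so userlogin should already contain a matching row before anything is written to the stream below. A seed row (the values here are purely illustrative) could be:

```code
-- illustrative seed data; userid 333 matches the stream events inserted later
INSERT INTO userlogin (username, userid, usertype, logintype)
VALUES ('alice', 333, 'admin', 'password');
```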
- Create the stream:
```code
CREATE STREAM loginlogs (logintype text, userid integer);
```
- Create the continuous view
```code
CREATE CONTINUOUS VIEW userloginview AS
SELECT a.logintype, b.username, b.userid, b.logintype AS logintype_
FROM loginlogs a
JOIN userlogin b ON a.userid = b.userid;
```
- Insert data and query
```code
INSERT INTO loginlogs (logintype, userid) VALUES ('mobile', 333), ('pc', 333), ('web', 333);

SELECT * FROM userloginview;
```
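With the illustrative seed row above, userloginview should contain one row per stream event ('mobile', 'pc', 'web'), each joined to the matching userlogin record.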
- GraphQL integration
- GraphQL queries
- Notes
In practice a continuous view usually performs aggregations, for example to count states or to investigate anomalies. Continuous views also support a TTL, which can be used to control how long their data is kept.
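As a sketch of the TTL options (the view name and the one-day retention are illustrative; the WITH (ttl, ttl_column) clause follows the PipelineDB documentation and requires a PipelineDB version that supports TTLs):

```code
-- illustrative: keep roughly the most recent day of per-minute login counts
CREATE CONTINUOUS VIEW logins_per_minute
WITH (ttl = '1 day', ttl_column = 'minute') AS
SELECT date_trunc('minute', arrival_timestamp) AS minute, COUNT(*) AS logins
FROM loginlogs
GROUP BY minute;
```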
Some interesting demos from the official documentation:
- Latency percentiles (90th / 95th / 99th)
```code
CREATE CONTINUOUS VIEW latency AS
SELECT percentile_cont(array[0.90, 0.95, 0.99]) WITHIN GROUP (ORDER BY latency)
FROM latency_stream;
```
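Since a continuous view cannot keep every raw value, the percentiles here are computed incrementally and are therefore approximate (the PipelineDB documentation describes a t-digest-based implementation for ordered-set aggregates over streams).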
- Ad impressions in the last 5 minutes (a sliding window)
```code
CREATE CONTINUOUS VIEW imps AS
SELECT COUNT(*) FROM imps_stream
WHERE (arrival_timestamp > clock_timestamp() - interval '5 minutes');
```
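Because the WHERE clause compares arrival_timestamp with clock_timestamp(), PipelineDB treats this as a sliding window: the count always reflects only events that arrived within the last five minutes, and older events age out of the result automatically.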