1 使用GeminiDB for Cassandra流捕获表活动
1.1 功能介绍
当存储在GeminiDB for Cassandra集群中某张表的某项目发生变更时,其他的程序能够做出相应的变更,比如:一个应用程序更改了GeminiDB for Cassandra集群中的某行数据,另一个应用程序能够读取到这行数据变更并做出相应的动作。
GeminiDB for Cassandra支持这种类似的场景。流表捕获原表的变动,并存储24小时之后过期。客户端通过SDK可访问流表并获取数据修改前后的项目数据。
GeminiDB for Cassandra流是一种有关表中的项目修改的有序信息流,当启动某张表的流时,GeminiDB for Cassandra流表捕获原表的数据项目的更改信息。当应用程序在表中插入、更新、或者删除某条数据时,流表都会记录一条修改数据的流记录。流记录包含对表中某条数据所做的数据修改的相关信息,包含修改前后的新旧数据记录。原表中修改的每个项目,流表中的记录将按照对应的实际修改的顺序显示。
流表会实时的监控原表的记录,包括插入、删除、更新操作,以便能够在其他情况使用,但不包含DDL操作记录;流表使用GeminiDB for Cassandra的物化视图实现,遵循物化视图的相关限制,比如有必须先删除物化视图表之后才能删除原表,对应必须要先删除流表才能删除原表。
1.2 使用场景
主要用于Cassandra往大数据/ES(Elasticsearch)同步数据变化的场景,支撑客户对应业务开展。
1.3 功能特色
华为GeminiDB for Cassandra专有能力,原生Cassandra不支持。
2 GeminiDB for Cassandra流使用方法概述
GeminiDB for Cassandra原表与流表维护两个独立的表。在开启原表的流开关之后,访问原表并且有操作原表数据会记录到对应的流表中,要读取处理流表记录,通过访问流表,访问方式与数据库其他表的访问方式相同,见第四、五章节访问方法。
3 开启关闭流
启用流的方式:使用alter table KS.TABLE with stream_enabled=true 语句启用流,当前流支持新旧映像。
关闭流:使用alter table KS.TABLE with stream_enabled=false 可以随时禁用流。关闭流之后当前流表中的数据不会立即删除,24之后删除,原表中数据的变更不会再记录到流表中。举例:
CREATE TABLE ks.table ( id int, name text, addr text,age int,email text,PRIMARY KEY (name, id)); // 创建表
Alter TABLE ks.table with stream_enabled=true; // 开启流
Desc ks.table; // 查看流表是否创建
INSERT INTO ks.table (name , id , addr , age , email ) VALUES ('xiaoxin',31,'beijing1',33,'xiaoxin@163.com'); // 向原表写入数据
select * from ks."table$streaming"; // 查看流表是否有相应数据产生
Alter TABLE ks.table with stream_enabled=false; // 关闭流表
4 读取和处理流记录
应用程序要读取和处理流时,应用程序需要通过SDK连接到C*流表进行相应操作。
流表中的每条流记录均代表一个原表中数据的修改,每条流记录都会有一个时间信息,标识这条流产生的时间信息。每条流记录会在24小时后自动删除。
流表结构:
CREATE TABLE ks.table$streaming (
@shardID text,
@eventID timeuuid,
pk,
ck,
@newOldImage boolean,
@eventName text,
co1,
co2,
PRIMARY KEY (@shardID, @eventID, pk, ck, @newOldImage) //pk,ck,为原表的pk,ck, co1,co2是原表的普通列
);
如上,流表中包含几个特殊的字段:"@shardID"是分区键;"@eventID"是由插入时间生成的timeuuid,代表流数据产生的时间;"@newOldImage"代表新旧映像,0表示旧映像,1表示新映像;"@eventName"代表操作事件如"insert"、"update"、"delete"。
迭代处理流表的数据时请使用流表的分区加上时间戳范围访问。查询分区时使用,返回分区列表:
select stream_shards from system_schema.tables where keyspace_name='ks' and table_name='table';
例如:
cqlsh:ks> select stream_shards from system_schema.tables where keyspace_name='ks' and table_name='table1';
stream_shards
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
['-9223372036854775808', '-6148914691236517206', '-4611686018427387905', '-3843071682022823258', '-1537228672809129303', '-2', '1537228672809129299', '4611686018427387901', '5270498306774157603', '6148914691236517202', '7686143364045646492', '7686143364045646503']
(1 rows)
使用分区+时间遍历流表数据,范围查询时使用"@eventID"时间查询,每次默认返回的大小根据数据量大小决定,下次迭代使用时间继续往后迭代。例如:
select * From ks."table$streaming" where "@shardID" = '-9223372036854775808' and "@eventID" > a64a8340-e999-11e9-a7bd-cb5f001f61df limit 50;
5 接口说明
5.1 GetShardIterator
public static List<String> GetShardIterator(Cluster cluster, String keySpace, String tableName)
功能: 获取流表的shard信息
入参:
Cluster cluster:集群的cluster信息,连接数据库使用
String keySpace:要查询流数据的数据库名称
String tableName:要查询流数据的表名称
出参:
返回List<String> 一组shard集合,GetRecords接口中使用
5.2 GetRecords
public static StreamInfo GetRecords(Cluster cluster, String keySpace, TableEvent tableEvent)
功能: 获取流表的具体数据
入参:
Cluster cluster:集群的cluster信息,连接数据库使用
String keySpace:要查询流数据的数据库名称
TableEvent tableEvent:要查询流数据相关信息,TableEvent结构如下:
String table:要查询流数据的表名称
String shardID:要查询流数据的shardID
String eventID:要查询流数据的时间戳信息
int limitRow:要查询流数据的限制条数,没有指定情况下默认是100;
出参:
返回一组StreamInfo数据;具体结构如下:
String shardID:流数据的shardID
String table:流数据的原表名
List<RowInfo> columns:一组流数据的集合,RowInfo具体结构如下:
String eventID:流数据的时间戳信息
String operateType:操作类型,例如: INSERT、UPDATE、DELETE
List<DataItem> Keys: 流数据对应的原表的主键信息
List<DataItem> NewImage: 新映像的信息
List<DataItem> OldImage: 旧映像的信息
5.3 GetShardIterator
public static List<String> GetShardIterator(Session session, String keySpace, String tableName)
功能: 获取流表的shard信息
入参:
Session session:数据库集群的连接session,调用函数之后session需要调用者关闭
String keySpace:要查询流数据的数据库名称
String tableName:要查询流数据的表名称
出参:
返回List<String> 一组shard集合,GetRecords接口中使用
5.4 GetRecords
public static StreamInfo GetRecords(Session session, String keySpace, TableEvent tableEvent)
功能: 获取流表的具体数据
入参:
Session session:数据库集群的连接session,调用函数之后session需要调用者关闭;
String keySpace:要查询流数据的数据库名称;
TableEvent tableEvent:要查询流数据相关信息,TableEvent结构如下:
String table:要查询流数据的表名称;
String shardID:要查询流数据的shardID;
String eventID:要查询流数据的时间戳信息;
int limitRow:要查询流数据的限制条数,没有指定情况下默认是100;
List<ColumnMetadata> primaryKey: 要查询流数据的表的主键名称类型信息
出参:
返回一组StreamInfo数据;具体结构如下:
String shardID:流数据的shardID
String table:流数据的原表名
List<RowInfo> columns:一组流数据的集合,RowInfo具体结构如下:
String eventID:流数据的时间戳信息
String operateType:操作类型,例如: INSERT、UPDATE、DELETE
List<DataItem> Keys: 流数据对应的原表的主键信息
List<DataItem> NewImage: 新映像的信息
List<DataItem> OldImage: 旧映像的信息
5.5 GetRecords返回结果范例
{
"ShardID": "-4611686018427387905",
"Table": "tb1",
"Records": [{
"EventID": "52236080-efb5-11e9-9c62-49626763b3dc",
"OperateType": "INSERT",
"Keys": [{
"columnName": "name",
"value": "zhoujielun",
"type": "varchar"
}, {
"columnName": "id",
"value": 31,
"type": "int"
}],
"NewImage": [{
"columnName": "name",
"value": "zhoujielun",
"type": "varchar"
}, {
"columnName": "id",
"value": 31,
"type": "int"
}, {
"columnName": "addr",
"value": "宇宙中心",
"type": "varchar"
}, {
"columnName": "age",
"value": 33,
"type": "int"
}, {
"columnName": "email",
"value": "zhoujielun.com",
"type": "varchar"
}],
"OldImage": []
}, {
"EventID": "52255c50-efb5-11e9-9c62-49626763b3dc",
"OperateType": "UPDATE",
"Keys": [{
"columnName": "name",
"value": "zhoujielun",
"type": "varchar"
}, {
"columnName": "id",
"value": 32,
"type": "int"
}],
"NewImage": [{
"columnName": "name",
"value": "zhoujielun",
"type": "varchar"
}, {
"columnName": "id",
"value": 32,
"type": "int"
}, {
"columnName": "addr",
"value": "宇宙中心",
"type": "varchar"
}, {
"columnName": "age",
"value": 33,
"type": "int"
}, {
"columnName": "email",
"value": "zhoujielun.com",
"type": "varchar"
}],
"OldImage": [{
"columnName": "name",
"value": "zhoujielun",
"type": "varchar"
}, {
"columnName": "id",
"value": 32,
"type": "int"
}, {
"columnName": "addr",
"value": "宇宙中心",
"type": "varchar"
}, {
"columnName": "age",
"value": 33,
"type": "int"
}, {
"columnName": "email",
"value": "zhoujielun.com",
"type": "varchar"
}]
}, {
"EventID": "52261fa0-efb5-11e9-9c62-49626763b3dc",
"OperateType": "UPDATE",
"Keys": [{
"columnName": "name",
"value": "zhoujielun",
"type": "varchar"
}, {
"columnName": "id",
"value": 33,
"type": "int"
}],
"NewImage": [{
"columnName": "name",
"value": "zhoujielun",
"type": "varchar"
}, {
"columnName": "id",
"value": 33,
"type": "int"
}, {
"columnName": "addr",
"value": "宇宙中心",
"type": "varchar"
}, {
"columnName": "age",
"value": 33,
"type": "int"
}, {
"columnName": "email",
"value": "zhoujielun.com",
"type": "varchar"
}],
"OldImage": [{
"columnName": "name",
"value": "zhoujielun",
"type": "varchar"
}, {
"columnName": "id",
"value": 33,
"type": "int"
}, {
"columnName": "addr",
"value": "宇宙中心",
"type": "varchar"
}, {
"columnName": "age",
"value": 33,
"type": "int"
}, {
"columnName": "email",
"value": "zhoujielun.com",
"type": "varchar"
}]
}]
}
5.6 接口使用demo1
package com.huawei.hwcloud.stream;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnMetadata;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.huawei.hwcloud.stream.req.RowInfo;
import com.huawei.hwcloud.stream.req.StreamInfo;
import com.huawei.hwcloud.stream.req.TableEvent;
import java.util.List;
public class Main {
public static void main(String[] args) {
Cluster cluster = Cluster.builder().addContactPoint("xxx.95.xxx.201").withPort(9042).build();
// Cluster cluster = Cluster.builder().addContactPoint(endpoint).withPort(port).withCredentials(username, password).build();
List<ColumnMetadata> pm = cluster.getMetadata().getKeyspace("test").getTable("tb1").getPrimaryKey();
System.out.println(pm);
List streamShards = null;
try {
streamShards = StreamFetcher.GetShardIterator(cluster, "test", "tb1");
}
catch (Exception e) {
e.printStackTrace();
}
System.out.println(streamShards);
TableEvent tableEvent = new TableEvent();
tableEvent.setEventID("43e0eeb0-ee80-11e9-9c62-49626763b3dc");
tableEvent.setShardID("-4611686018427387905");
tableEvent.setTable("tb1");
tableEvent.setLimitRow(6);
StreamInfo streamInfo = null;
try {
streamInfo = StreamFetcher.GetRecords(cluster, "test", tableEvent);
}
catch (Exception e) {
e.printStackTrace();
}
Gson gson = new GsonBuilder().create();
String line = gson.toJson(streamInfo);
System.out.println(line);
System.out.println(streamInfo.getColumns().size());
for (RowInfo rowInfo: streamInfo.getColumns()) {
System.out.println(rowInfo.toString());
}
System.exit(0);
}
}
5.7 接口使用demo2
package com.huawei.hwcloud.stream;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.Session;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.huawei.hwcloud.stream.req.RowInfo;
import com.huawei.hwcloud.stream.req.StreamInfo;
import com.huawei.hwcloud.stream.req.TableEvent;
import com.huawei.hwcloud.stream.utils.WrapperCassandraSession;
import java.util.List;
public class Main2
{
public static void main(String[] args) {
Cluster cluster = Cluster.builder().addContactPoint("XXX.95.XXX.201").withPort(9042).build();
// Cluster cluster = Cluster.builder().addContactPoint(endpoint).withPort(port).withCredentials(username, password).build();
List<ColumnMetadata> pk = cluster.getMetadata().getKeyspace("test").getTable("tb1").getPrimaryKey();
System.out.println(pk);
Session session = cluster.connect();
List<String> streamShards = StreamFetcher.GetShardIterator(session, "test", "tb1");
System.out.println(streamShards);
TableEvent tableEvent = new TableEvent();
tableEvent.setEventID("43e0eeb0-ee80-11e9-9c62-49626763b3dc");
tableEvent.setShardID("-4611686018427387905");
tableEvent.setTable("tb1");
tableEvent.setLimitRow(6);
tableEvent.setPrimaryKey(pk);
StreamInfo streamInfo = StreamFetcher.GetRecords(session, "test", tableEvent);
Gson gson = new GsonBuilder().create();
String line = gson.toJson(streamInfo);
System.out.println(line);
System.out.println(streamInfo.getColumns().size());
for (RowInfo rowInfo: streamInfo.getColumns()) {
System.out.println(rowInfo.toString());
}
session.close();
System.exit(0);
}
}
6 功能约束
1)流表中的数据保留24小时。
2)流表中的数据会占用数据库的磁盘空间。
3)通过CQL语句不能创建带有"$streaming"后缀的流表。
4)流表可以通过drop MATERIALIZED VIEW ks."table$streaming";进行删除,流表使用物化视图实现,遵从物化视图的限制要求。
GeminiDB for Cassandra流功能介绍.docx
————————————————
版权声明:本文为CSDN博主「华为云开发者社区」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/devcloud/article/details/105642189
【华为云技术分享】GeminiDB for Cassandra 流功能介绍
分享好友
分享这个小栈给你的朋友们,一起进步吧。
订阅须知
• 所有用户可根据关注领域订阅专区或所有专区
• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询
• 专区发布评论属默认订阅所评论专区(除付费小栈外)
技术专家
查看更多- gaokeke123专家