上一篇为大家整体介绍了 Azure Synapse Analysis 和 Synapse Link,本篇开始为大家介绍前面介绍的 DIY 方案中的部分,Cosmos DB Change Feed。我们来回顾一下整个架构:
对 OLTP 中的数据向下游数据服务进行同步时通常有两种做法,全量同步和增量同步,Cosmos DB 作为 OLTP 数据源,其内置的 ChangeFeed 功能可以帮助用户捕捉数据库中的增量变化数据,从而提升同步效率。ChangeFeed 通过将增量数据通过 SDK 暴露访问节点,用户可以通过 SDK 获取增量变化数据,同时其支持断点(CheckPoint)用户可以自由选择获取增量数据的起始位置。Cosmos Database 支持多种数据库引擎 API 接口,如 SQL,Mongo,Cassandra 等。过去一年的时间 Cosmos DB 对 ChangeFeed 的支持范围进行了更新,从 Day1 的只支持 SQL API 和 .NET SDK 到现在支持多种数据库引擎 API 以及多种开发语言的支持,为开发人员提供了一致的体验使产品获得更好的兼容性。
本文以快速入手为目标,我们选择了 SQL API + Python 的方式为大家进行演示。在整个演示过程中我们先来设计一下原始数据表格的 Schema,我们创建一个商品价目表,结构如下,其中包含商品 ID,QUANTITY 数量, PRICE 价格。
{ 'id': <string>, 'quantity': <int>, 'price': <int> }
下面我们开始准备 Cosmos 环境:
1. 创建 Cosmos DB
参考:https://docs.microsoft.com/en-us/azure/cosmos-db/create-cosmosdb-resources-portal ,大家需要注意在创建时候选择 API 为 Core(SQL)。
2. 开启 Synapse Link 后续使用
参考:https://docs.microsoft.com/en-us/azure/cosmos-db/configure-synapse-link
3. 记录 Cosmos DB 访问密钥
参考:https://docs.microsoft.com/en-us/azure/cosmos-db/secure-access-to-data,按照文档获取 Master Key 并替换后续代码中 Config.py 中的 master_key 参数。
4. 准备 Config.py 配置文件,其中 host 在 Azure Portal 中选中你所创建的 Cosmos DB,在 Overview 页面 URI 中获取,master_key 替换为为步骤 3 记录的密钥,database_id 和 container_id 可以自行设定,比如可以命名为 demo。
import os settings = { 'host': os.environ.get('ACCOUNT_HOST', '***'), 'master_key': os.environ.get('ACCOUNT_KEY', '***'), 'database_id': os.environ.get('COSMOS_DATABASE', '***'), 'container_id': os.environ.get('COSMOS_CONTAINER', '***'), }
5. 创建 Database 和 Container
import azure.cosmos.documents as documents import azure.cosmos.cosmos_client as cosmos_client import azure.cosmos.exceptions as exceptions import azure.cosmos.partition_key as partition_key import datetime import uuid import config HOST = config.settings['host'] MASTER_KEY = config.settings['master_key'] DATABASE_ID = config.settings['database_id'] CONTAINER_ID = config.settings['container_demo_id'] # Create Cosmos Client client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY}) # setup database for this sample try: db = client.create_database(id=DATABASE_ID) except exceptions.CosmosResourceExistsError: raise RuntimeError("Database with id '{}' already exists".format(DATABASE_ID)) # setup container for this sample try: container = db.create_container( id=CONTAINER_ID, partition_key=partition_key.PartitionKey(path='/id', kind=documents.PartitionKind.Hash) ) print('Container with id \'{0}\' created'.format(CONTAINER_ID)) except exceptions.CosmosResourceExistsError: raise RuntimeError("Container with id '{}' already exists".format(CONTAINER_ID))
6. 生成仿真演示数据,演示中插入了 10 条演示数据
def create_items(container, size): print('Creating Items') for i in range(1, size): c = str(uuid.uuid4()) item_definition = {'id': c, 'quantity': 100, 'price': 20 } created_item = container.create_item(body=item_definition) # Create Cosmos Client client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY}) # create items container = client.get_database_client(DATABASE_ID).get_container_client(CONTAINER_ID) # insert on item create_items(container, 11)
7. 通过 ChangeFeed SDK 读取增量变更数据,在 query_items_change_feed 函数中带入了参数 is_start_from_beginning 表示从头开始读取增量变化数据
# Create Cosmos Client client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY}) # create items container = client.get_database_client(DATABASE_ID).get_container_client(CONTAINER_ID) # read changefeed response = container.query_items_change_feed(is_start_from_beginning=True) for doc in response: print(doc)
8. 通过断点读取 ChangeFeed 增量变更数据, 通过获取签署 ChangeFeed 返回中的 etag 来标记下一次读取的起始位置,演示中通过再次插入两条记录来出发变更。
# read checkpoint continuation = container.client_connection.last_response_headers['etag'] # insert on item create_items(container, 3) # read changefeed with continuation response = container.query_items_change_feed(continuation,partition_key_range_id=0) for doc in response: print(doc)
到此为止,我们基础的通过 SDK 获取增量变化数据已经走通了,回到开篇的架构中我们需要将增量数据持续的更新到下游的 OLAP 数据仓库中,一种做法我们将上述代码逻辑进一步完善将其跑在虚拟机中持续执行。一种方法可以直接通过 Azure Function 来托管上述抽取增量变化数据逻辑。下一篇 Blog 中会为大家介绍通过 Function 服务的实现,通过 Function 服务我们可以实现代码运行环境的托管,按需执行(通过定期轮询的方式),以及通过内置连接器简化代码开发。
后给大家分享一些学习资料,上述演示中只是简单的带大家了解 Cosmos ChangeFeed,大家可以参考下述连接了解更多内容。除此之外大家需要注意 Cosmos DB ChangeFeed 目前还不支持捕获 Delete 条目,大家需要通过一些 Workaround 来实现,这里就不做赘述,感兴趣的小伙伴可以访问下面 ChangeFeed 介绍链接。好消息是对于 Delete 的支持目前已经在支持计划中,预计今年下半年会支持(https://azure.microsoft.com/en-us/updates/change-feed-with-full-database-operations-for-azure-cosmos-db/)
- https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed ChangeFeed 介绍
- https://docs.microsoft.com/en-us/python/api/azure-cosmos/azure.cosmos.container.containerproxy?view=azure-python#query-items-change-feed-partition-key-range-id-none--is-start-from-beginning-false--continuation-none--max-item-count-none----kwargs- Python ChangeFeed SDK 介绍