绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
Azure Synapse Analysis 开箱 Blog
2022-03-30 16:25:28

    上一篇为大家整体介绍了 Azure Synapse Analysis 和 Synapse Link,本篇开始为大家介绍前面介绍的 DIY 方案中的部分,Cosmos DB Change Feed。我们来回顾一下整个架构:

   对 OLTP 中的数据向下游数据服务进行同步时通常有两种做法,全量同步和增量同步,Cosmos DB 作为 OLTP 数据源,其内置的 ChangeFeed 功能可以帮助用户捕捉数据库中的增量变化数据,从而提升同步效率。ChangeFeed 通过将增量数据通过 SDK 暴露访问节点,用户可以通过 SDK 获取增量变化数据,同时其支持断点(CheckPoint)用户可以自由选择获取增量数据的起始位置。Cosmos Database 支持多种数据库引擎 API 接口,如 SQL,Mongo,Cassandra 等。过去一年的时间 Cosmos DB 对 ChangeFeed 的支持范围进行了更新,从 Day1 的只支持 SQL API 和 .NET SDK 到现在支持多种数据库引擎 API 以及多种开发语言的支持,为开发人员提供了一致的体验使产品获得更好的兼容性。


 

         本文以快速入手为目标,我们选择了 SQL API + Python 的方式为大家进行演示。在整个演示过程中我们先来设计一下原始数据表格的 Schema,我们创建一个商品价目表,结构如下,其中包含商品 ID,QUANTITY 数量, PRICE 价格。

{
  'id': <string>,
  'quantity': <int>,
  'price': <int>
}

        下面我们开始准备 Cosmos 环境:

1. 创建 Cosmos DB

参考:https://docs.microsoft.com/en-us/azure/cosmos-db/create-cosmosdb-resources-portal ,大家需要注意在创建时候选择 API 为 Core(SQL)。

2. 开启 Synapse Link 后续使用

参考:https://docs.microsoft.com/en-us/azure/cosmos-db/configure-synapse-link

3. 记录 Cosmos DB 访问密钥

参考:https://docs.microsoft.com/en-us/azure/cosmos-db/secure-access-to-data,按照文档获取 Master Key 并替换后续代码中 Config.py 中的 master_key 参数。

4. 准备 Config.py 配置文件,其中 host 在 Azure Portal 中选中你所创建的 Cosmos DB,在 Overview 页面 URI 中获取,master_key 替换为为步骤 3 记录的密钥,database_id 和 container_id 可以自行设定,比如可以命名为 demo。

复制代码
import os

settings = {
    'host': os.environ.get('ACCOUNT_HOST', '***'),
    'master_key': os.environ.get('ACCOUNT_KEY', '***'),
    'database_id': os.environ.get('COSMOS_DATABASE', '***'),
    'container_id': os.environ.get('COSMOS_CONTAINER', '***'),
}
复制代码

5. 创建 Database 和 Container

复制代码
import azure.cosmos.documents as documents
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.exceptions as exceptions
import azure.cosmos.partition_key as partition_key
import datetime
import uuid

import config

HOST = config.settings['host']
MASTER_KEY = config.settings['master_key']
DATABASE_ID = config.settings['database_id']
CONTAINER_ID = config.settings['container_demo_id']

# Create Cosmos Client
client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY})

# setup database for this sample
try:
    db = client.create_database(id=DATABASE_ID)
except exceptions.CosmosResourceExistsError:
    raise RuntimeError("Database with id '{}' already exists".format(DATABASE_ID))

# setup container for this sample
try:
    container = db.create_container(
        id=CONTAINER_ID,
        partition_key=partition_key.PartitionKey(path='/id', kind=documents.PartitionKind.Hash)
    )
    print('Container with id \'{0}\' created'.format(CONTAINER_ID))

except exceptions.CosmosResourceExistsError:
    raise RuntimeError("Container with id '{}' already exists".format(CONTAINER_ID))
复制代码

6. 生成仿真演示数据,演示中插入了 10 条演示数据

复制代码
def create_items(container, size):
    print('Creating Items')

    for i in range(1, size):
        c = str(uuid.uuid4())
        item_definition = {'id': c,
                           'quantity': 100,
                           'price': 20
                                }

        created_item = container.create_item(body=item_definition)

# Create Cosmos Client
client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY})

# create items
container = client.get_database_client(DATABASE_ID).get_container_client(CONTAINER_ID)

# insert on item
create_items(container, 11)
复制代码

7. 通过 ChangeFeed SDK 读取增量变更数据,在 query_items_change_feed 函数中带入了参数 is_start_from_beginning 表示从头开始读取增量变化数据

复制代码
# Create Cosmos Client
client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY})

# create items
container = client.get_database_client(DATABASE_ID).get_container_client(CONTAINER_ID)

# read changefeed
response = container.query_items_change_feed(is_start_from_beginning=True)
for doc in response:
    print(doc)
复制代码

8. 通过断点读取 ChangeFeed 增量变更数据, 通过获取签署 ChangeFeed 返回中的 etag 来标记下一次读取的起始位置,演示中通过再次插入两条记录来出发变更。

复制代码
# read checkpoint
continuation = container.client_connection.last_response_headers['etag']

# insert on item
create_items(container, 3)

# read changefeed with continuation
response = container.query_items_change_feed(continuation,partition_key_range_id=0)
for doc in response:
    print(doc)
复制代码

        到此为止,我们基础的通过 SDK 获取增量变化数据已经走通了,回到开篇的架构中我们需要将增量数据持续的更新到下游的 OLAP 数据仓库中,一种做法我们将上述代码逻辑进一步完善将其跑在虚拟机中持续执行。一种方法可以直接通过 Azure Function 来托管上述抽取增量变化数据逻辑。下一篇 Blog 中会为大家介绍通过 Function 服务的实现,通过 Function 服务我们可以实现代码运行环境的托管,按需执行(通过定期轮询的方式),以及通过内置连接器简化代码开发。

        后给大家分享一些学习资料,上述演示中只是简单的带大家了解 Cosmos ChangeFeed,大家可以参考下述连接了解更多内容。除此之外大家需要注意 Cosmos DB ChangeFeed 目前还不支持捕获 Delete 条目,大家需要通过一些 Workaround 来实现,这里就不做赘述,感兴趣的小伙伴可以访问下面 ChangeFeed 介绍链接。好消息是对于 Delete 的支持目前已经在支持计划中,预计今年下半年会支持(https://azure.microsoft.com/en-us/updates/change-feed-with-full-database-operations-for-azure-cosmos-db/

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Azure Synapse Analytics
创建时间:2022-03-30 11:28:42
Azure Synapse Analytics
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • LCR_
    专家
戳我,来吐槽~