绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
Azure Synapse Analysis 开箱 Blog
2022-03-30 16:23:43

     继续上一篇的内容,本文将为大家介绍将 ChangeFeed 抽取逻辑通过 Azure Function 服务实现,架构图如下,通过 Function 服务来执行 ChangeFeed 的读取,并将其转存至 DataLake 存储中。

    在前文后我们介绍了通过 Function 服务可以简化整个逻辑的代码开发,Azure Function 服务中原生已经内置了很多与 Azure 其他服务原生集成的连接器,可以帮助客户实现与上下游服务的对接,用户无需关注连接器的实现,通过框架的调用,用户可以直接可以通过对象访问到上下游服务中的数据,用户只需要关注业务逻辑即可。Cosmos Database 也在 Azure Function 的支持之中,并且内置的连接器采用的也是 ChangeFeed 来实现的,用户可以直接开箱即用实现 ChangeFeed 数据的读取,无需自己维护抽取逻辑代码。Azure Function 原生支持的连接器如下:

 

Type1.x2.x and higher1TriggerInputOutput
Blob storage
Cosmos DB
Event Grid 
Event Hubs 
HTTP & webhooks 
IoT Hub 
Microsoft Graph
Excel tables
  
Microsoft Graph
OneDrive files
  
Microsoft Graph
Outlook email
   
Microsoft Graph
events
 
Microsoft Graph
Auth tokens
   
Mobile Apps  
Notification Hubs   
Queue storage 
SendGrid  
Service Bus 
SignalR  
Table storage 
Timer  
Twilio  

         下面我们来开始操作:

1. 准备开发环境,建议使用 Visual Studio Code,其内置的 Azure Function 的开发扩展,可以方便开发

参考:https://docs.microsoft.com/en-us/azure/developer/python/tutorial-vs-code-serverless-python-01

2. 创建 Azure Function 项目,注意在选择 trigger 部分,选择 cosmos db

参考:https://docs.microsoft.com/en-us/azure/developer/python/tutorial-vs-code-serverless-python-02

3. 准备 function.json 配置文件,function.json 主要描述 function 与上下游数据连接的参数,其中下述配置中 type:cosmosDBTrigger 部分定义了 cosmos 的连接信息, databaseName,collectionName 替换为前面所创建的 cosmos db的名称,leaseCollectionName 是 function 用来维护租约和 CheckPoint。connectionStringSetting 参见后续 local.settings.json。type:blob 部分定义了 function 下游存储 Data Lake 的连接信息,其中 path 参数定义了 function 抽取增量变化数据在 Data Lake 中的存储路径,connection 参数参见后续 local.settings.json。另外 feedPollDelay 参数表示 function 服务轮询 ChangeFeed 数据的间隔,其单位为毫秒,在演示中建议大家可以设置为 60000,实际根据数据水线处理时间和数据更新需求来决定。

复制代码
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "cosmosDBTrigger",
      "name": "documents",
      "direction": "in",
      "leaseCollectionName": "<leasecollectionname>",
      "connectionStringSetting": "Cosmos_DOCUMENTDB",
      "databaseName": "<databasename>",
      "collectionName": "<collectionname>",
      "createLeaseCollectionIfNotExists": "true",
      "feedPollDelay": <pollinterval>
    },
    {
      "name": "outputblob",
      "type": "blob",
      "path": "<filesystemname>/{DateTime}.csv",
      "connection": "ChangeFeedResultStorage",
      "direction": "out"
    }
  ]
}
复制代码

4. 准备 local.settings.json,该配置文件中定义了在上述 function.json 中所引用的连接密钥参数, 其中 Cosmos_DOCUMENTDB 和 ChangeFeedResultStorage 内分别填入,Cosmos 和 Data Lake 的连接字符串,可以在 portal 中对应资源的 access 信息部分获取。

复制代码
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "<connectionstring>",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "Cosmos_DOCUMENTDB": "<cosmosconnectionstring>",
    "ChangeFeedResultStorage": "<datalakeconnectionstring>"
  }
}
复制代码

5. 准备业务逻辑代码 init.py,init.py 是 function 函数被拉起后的 entrypoint 函数,我们将前面介绍的抽取 changefeed 增量变化数据的代码逻辑定义其中,下述演示代码中通过调用 documents 和 outputblob 借助 function 内置的连接器实现对上下游数据访问,无需再自己开发集成代码。用户只需要开发自己的数据处理逻辑即可,演示中是将增量变化数据转存到 Data Lake 存储中。

复制代码
import logging
import json 
import csv
import io

import azure.functions as func

# read changefeed from cosmos, store as CSV file
def main(documents: func.DocumentList, outputblob: func.Out[str]) :    
    if documents:
        count = 0
        outputresult = io.StringIO()
        csv_writer = csv.writer(outputresult, quoting=csv.QUOTE_NONNUMERIC)
        for document in documents:
            if count == 0:
               header = json.loads(document.to_json()).keys()
               csv_writer.writerow(header)
               count += 1
            csv_writer.writerow(json.loads(document.to_json()).values())    
        outputblob.set(outputresult.getvalue())
复制代码

 6. 通过 VS Code Function 本地调试工具进行测试,可以仿真通过前面 blog 中的数据插入函数在 cosmos db 内插入一些新的数据,然后在 Data Lake 中是否转存成功。

参考:https://docs.microsoft.com/en-us/azure/developer/python/tutorial-vs-code-serverless-python-04?tabs=powershell

7. 将 Function 服务打包发布至 Azure Function 服务中,上述开发测试均在本地完成,测试无误后将代码正式发布至 Azure Function 服务

参考:https://docs.microsoft.com/en-us/azure/developer/python/tutorial-vs-code-serverless-python-05

        至此通过 Function 服务完成 ChangeFeed 读取及转存至 DataLake 已经完成。整个 Function 代码中 function.json 中针对不同连接器的参数说明大家可以参阅:https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-cosmosdb?tabs=csharp

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Azure Synapse Analytics
创建时间:2022-03-30 11:28:42
Azure Synapse Analytics
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • LCR_
    专家
戳我,来吐槽~