绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
传奇公司Snowflake(2):半结构化数据,细微之处见真功
2022-03-10 15:05:36

按照重要程度来说,也许我应该讲一下Snowflake如何做数据分发和共享。但是,在写这部分前,我需要自己真正上传一些数据到Snowflake中,这样才能去体验如何共享数据。

那本文我们先真正上传并分析一些数据吧。

我们知道,随着JSON的流行,半结构化(Semi-Structured)数据对于数据分析也越来越重要,那我们就先来看看Snowflake对于半结构化数据的处理吧。

场景

近恰好了解了一下“易观第四届OLAP算法大赛”(在 Apache DolphinScheduler 社区中熟识了好几位来自易观的朋友),参见

算法大赛评委亲授通关秘籍,报名倒计时!

本次大赛的主题是:“事件分析”。“事件分析”功能的使用频次极高。事件分析模型主要用于分析用户打开 APP、注册、支付订单等在应用上的行为,通过触发用户数、触发次数等基础指标度量用户行为,也支持指标运算、构建复杂的指标衡量业务过程。

一共有三道测试题,让我们来分别用Snowflake解决:

  • 场景1:多指标多维度分析;
  • 场景2:多指标多维度分析,并计算任意维度小计、合计值;
  • 场景3:多指标多维度分析,计算任意维度小计、合计值,并支持关联用户属性数据。

了解数据

首先,我们先看看测试数据是什么样子的。

测试数据解压后,一共两个文本文件,分别为:

  • olap_2020_profile_test.dat: 用户 profile 数据, 大小为: 161M
  • olap_2020_event_test.dat: event数据,大小为: 3G

对于profile数据,其有2列:

  • 1、用户ID(distinct_id),Long类型 与event表的对应
  • 2、内容明细,json格式,其内容类似于
    {
        "age": 39,
        "fq15": 1,
        "fq16": 1,
        "fq17": 1,
        "fq2": 1,
        "fq5": 1,
        "fq6": 1,
        "total_amount": 10019.16,
        "total_visit_days": 184
    }

对于event数据,其有6列:

  • 1、用户ID(distinct_id),Long类型
  • 2、时间戳(xwhen),毫秒级别UNIXTIME,Long类型
  • 3、事件CODE(xwhat),字符串类型,包含startUp、login、searchGoods等多个事件
  • 4、事件ID(xwhat_id) ,与事件code一一对应,Int类型。
  • 5、日期(ds),事件发生的日期YYYYMMDD格式
  • 6、内容明细,json格式。不同事件、记录包含不同属性,包含字符串、浮点、整形3种类型的数据。总属性个数 30-50个左右,其内容类似于
     {
        "app_version": "V1.2",
        "city": "杭州",
        "commodityname": "大闸蟹",
        "commoditynumber": 34,
        "country": "中国",
        "firstcommodity": "生鲜",
        "os": "Android",
        "os_version": "Android 6.0.1",
        "price": 755,
        "province": "浙江",
        "secondcommodity": "海鲜速食"
    }

上传数据

接下来我们需要把数据从本地导入到Snowflake中。也许我们反应是写个Java程序来解析文本文件(TSV格式),然后通过JDBC批量插入到Snowflake中。

但是,我们知道,当导入大量数据时,JDBC、ODBC往往是慢的方法。数据库一般都有更快速的批量加载CSV文件的功能。让我们使用这个功能吧。

Snowflake有多种使用方式:

  • Web Interface: 前文中用的就是Web上的界面
  • SnowSQL: 命令行工具
  • Connectors & Drivers: 面向开发的一些连接器
    • Snowflake Connector for Python
    • Snowflake Connector for Spark
    • Snowflake Connector for Kafka
    • Node.js Driver
    • Go Snowflake Driver
    • .NET Driver
    • JDBC Driver
    • ODBC Driver

今天我们来使用其命令行工具 SnowSQL 来操作。用过一些SQL命令行工具的朋友肯定对SnowSQL不会陌生。不过一个细节是SnowSQL居然在命令行里也做了“自动补全”的功能(如下图):

首先,我们把profile文本上传到 Snowflake 的Stage区 (Snowflake中,我们可以使用Snowflake内置的Stage区,也可以挂载外部的Amazon S3等对象存储)

put file:///Users/wubaoqi/Downloads/olap_2020_test/olap_2020_profile_test.dat @~/mytest;

返回了结果信息:

  • source: olap_2020_profile_test.dat
  • target: olap_2020_profile_test.dat.gz
  • source_size: 168441847
  • target_size: 18826148
  • target_compression: GZIP
  • status: UPLOADED

可以看出Snowflake对我们上传的文本文件,先做了GZIP压缩后再存储。

第二步,创建Snowflake中的表

create table profile (
   distinct_id BIGINT,
   content OBJECT
)

这里注意,Snowflake中支持3种类型的半结构化数据,其类型分别是:

  • VARIANT
  • OBJECT
  • ARRAY

VARIANT可以存储其它数据类型,包含OBJECT或ARRAY本身,只要其大小压缩后小于16MB就可以。OBJECT和ARRAY是VARIANT的一种特例,类似于JSON中的对象和数组。而且VARIANT、OBJECT、ARRAY都支持嵌套。所以,其表达能力是很强的。

关于半结构化的数据类型,详细可以参考: https://docs.snowflake.com/en/sql-reference/>

第三步,把数据从Stage存储中加载到表格中。

copy into profile from @~/mytest/olap_2020_profile_test.dat file_format = (type=csv, FIELD_DELIMITER='\t');

得到了:629,612 行数据,耗时63秒(注意:这里使用了Snowflake小的计算VW,所以会慢些)

同样的过程,我们导入 event 表, 得到 12,444,499 行数据,耗时 173 秒

场景1:多指标多维度分析

题目: 查询20200701-20200707 的行为数据,按ds进行分组统计uv、pv,并计算合计值。

这个不需要读取json中的内容,所以比较简单:

select ds, 
       count(distinct distinct_id) as uv,
       count(distinct_id) as pv
from event
where ds between '20200701' and '20200707'
group by grouping sets ((), ds)
order by ds asc

获得结果 (8 Rows produced. Time Elapsed: 3.056s), 注: NULL值代表该行为小计或总计

[公式]

场景2:多指标多维度分析,并计算任意维度小计、合计值

题目: 查询20200701-20200707 的行为数据,按ds +os+os_version 进行分组统计uv,pv ,并计算总计以及os的小计值与合计值。

这个只涉及 event 表中的OBJECT字段。

select content:os::string as os,
       content:os_version::string as os_version,
       ds,
       count(distinct distinct_id) as uv,
       count(distinct_id) as pv
from event
where ds between '20200701' and '20200707'
group by grouping sets (
    (content:os::string, content:os_version::string, ds),
    (content:os::string, ds),
    content:os::string,
    ()
)
order by os asc, os_version asc, ds asc

获得结果 (45 Rows produced. Time Elapsed: 4.190s), 注: NULL值代表该行为小计或总计

[公式]

这里可以得知,Snowflake中,如果想取出OBJECT类型中的某个值,可以用“:”这个操作符,比如 “content:os” 获取OBJECT的key为os的值。 注意这里返回的仍然是一个类似 JsValue 的值,如果我们需要转化为字符串类型,我们可以写为: content:os::string

场景3:多指标多维度分析,计算任意维度小计、合计值,并支持关联用户属性数据

题目: 计算20200701-20200707行为数据中事件=’addtoshoppingcart’.按 ds+用户表的lib进行分组统计UV以及price的汇总值。

select profile.content:lib::STRING as lib,
       event.ds,
       count(distinct event.distinct_id) as uv,
       sum(event.content:price::NUMBER(35, 3)) as "sum(price)"
from event left outer join profile
on event.distinct_id = profile.distinct_id
where ds between '20200701' and '20200707' and event.xwhat='addtoshoppingcart'
group by 1,2
order by lib asc nulls first, ds asc

这里稍微复杂的有了一个JOIN语句,得到结果:(42 Rows produced. Time Elapsed: 2.041s), 注: NULL值代表该行为小计或总计

[公式]

场景小结

至此,顺利完成该场景的测试验证。整个过程还是比较快和方便的,而且使用Snowflake来处理JSON确实挺方便的(后面会比较和Spark的JSON处理区别)

然后,还是看看Snowflake的账单情况 (用了 0.83 credit)

然后看一下这两个表的存储空间使用 (共占用了177MB,看来是算的压缩后的占用空间)

对比Apache Spark的JSON相关语法

对于JSON数据的处理,我们看到当Snowflake加载CSV时,我们可以直接设置目标列为 OBJECT 类型,然后就可以使用类似于: content:os::string 的方式来获取值了。

对于Spark来说,一般我们都是把JSON数据存为 STRING 类型。然后,我们可以使用 GET_JSON_OBJECT() 函数来获取

对于event表中的json:

 {
    "app_version": "V1.2",
    "city": "杭州",
    "commodityname": "大闸蟹",
    "commoditynumber": 34,
    "country": "中国",
    "firstcommodity": "生鲜",
    "os": "Android",
    "os_version": "Android 6.0.1",
    "price": 755,
    "province": "浙江",
    "secondcommodity": "海鲜速食"
}

我们可以用

SELECT GET_JSON_OBJECT(content, '$.os') AS `os`
FROM event

可以看到:我们需要用类似 JSON Path的语法来定位到对应元素, 另外,这里看着不需要额外加入 CAST AS STRING 语句, 是因为这个函数返回的内容都变为String了,当我们要获取price这个数值时,我们则需要额外的CAST

SELECT GET_JSON_OBJECT(content, '$.os') AS `os`,
      CAST(GET_JSON_OBJECT(content, '$.price') AS LONG) AS `price`
FROM event

也许你会说, Snowflake加载CSV时,指定了 OBJECT 类型,而不是先存为STRING。 那我们看看SPARK是否能同样的把STRING存为 MAP 等类型?

首先尝试:

SELECT CAST(content AS MAP) AS content
FROM event

报错: DataType map is not supported. 发现,我们需要明确指定 MAP的具体key 和 value的类型。

尝试二:

SELECT CAST(content AS MAP<STRING, STRING>) AS content
FROM event

报错: cannot resolve `content`' due to data type mismatch: cannot cast string to map<string,string>。 哦,原来我的数据里,有的value是数值类型,不全是STRING类型,导致错误。而Spark中并没有一种类型能直接同时代表 INT 和 STRING。

换个思路,如果我们不能用MAP,是否可以用STRUCT 这种自定义类型? 但是我又不知道该把这个JSON写为啥 STRUCT 结构。幸好,spark 2.4以后的版本加入了: SCHEMA_OF_JSON 这个函数来获取该列可能的数据结构。

SELECT SCHEMA_OF_JSON(content) AS content
FROM event

仍然出错,报错误:

cannot resolve 'schemaofjson(`content`)' due to data type mismatch: The input json should be a string literal and not null; however, got `content`.

这个错误有点费解,后来终于明白, SCHEMA_OF_JSON 这个函数的参数只能是STRING常量,不能是STRING类型的列。

SELECT schema_of_Json('{"app_version": "V1.2", "city": "杭州", "commodityname": "大闸蟹", "commoditynumber": 34, "country": "中国", "firstcommodity": "生鲜", "os": "Android", "os_version": "Android 6.0.1", "price": 755, "province": "浙江", "secondcommodity": "海鲜速食"}')
FROM event

OK,终,得到了这个JSON String对应的Schema应该为:

struct<
  app_version:string,
  city:string,
  commodityname:string,
  commoditynumber:bigint,
  country:string,
  firstcommodity:string,
  os:string,
  os_version:string,
  price:bigint,
  province:string,
  secondcommodity:string
>

哇,不容易。那接下来我们终于可以像Snowflake那样来访问数据了(直接把content由STRING CAST 为这个复杂的STRUCT是无法成功的,这里用了一个 Spark的专用函数 FROM_JSON)。

SELECT content.os, content.price
FROM (
    SELECT FROM_JSON(content, 'struct<app_version:string,city:string,commodityname:string,commoditynumber:bigint,country:string,firstcommodity:string,os:string,os_version:string,price:bigint,province:string,secondcommodity:string>') as content
    FROM event
)

终,我们终于把JSON转为了Spark中的Struct,然后终于可以像Snowflake那么来方便操作半结构数据了。但是整个过程却总有人有一种:从入门到放弃的感觉

Snowflake自动索引半结构化数据,加速查询速度

上面,我们只是说了Snowflake对于半结构化数据处理的语法非常方便。但是光语法方便并不能说其是 JSON Native 的。因为我们也可以通过一些辅助程序,把这种语法转为通过上面SPARK/Hive等的 GET_JSON_OBJECT来实现。但是调用这么多 GET_JSON_OBJECT,以及冗余的类型转化,对于性能影响巨大,所以,不能期望SPARK中的 GET_JSON_OBJECT 能有多快。

而对于Snowflake,其理念是:为客户省心。这意味着,尽可能的让客户少操心一些细节。所以,Snowflake中是不会让客户手工指定主键,也不会需要手工建立索引的。但是这些索引又是使得终查询更快的关键。那怎么办呢? Snowflake自动替客户建立合适的索引,对于半结构化数据也不例外。

对于这些实现细节,我们可以参考一下Snowflake的论文《The Snowflake Elastic Data Warehouse》: http://pages.cs.wisc.edu/~yxy/cs839-s20/papers/snowflake.pdf

文中有介绍其对于半结构化数据的支持:

存取部分,利用内部存储格式使得从 VARIANT(以及OBJECT、ARRAY)中存取数据都非常高效,

  • 无需额外的复制
  • A child element is just a pointer inside the parent element
  • Extraction is often followed by a cast of the resulting VARIANT value to a standard SQL type。 同时,其内部存储格式也使得这些类型转化非常高效

Snowflake在存储Variant数据时,会大量利用算法来统计Variant数据中的模式信息,并利用历史查询记录,推算出该Variant列中,哪些访问路径是常见的,把这些列单独存为物理列(列式存储),而当查询的时候,则再把这些列合并到VARIANT中。

值得借鉴的是其测试方法: Snowflake为了测试其对半结构化数据的自动类型推导和自动索引是有效的,其对于TPC-H同样的测试数据集做了两个不同的存储实验: 一种是各个列都存为原始的数据类型,另一种是把所有的列组装成一个 VARIANT(OBJECT) 类型,然后测试其性能区别。 据论文数据,大多数查询的性能区别都小于 10% ! Very Impressive!

总结

半结构数据(比如JSON)等越来越流行,也越来越重要,而对于JSON类型的数据分析,我们能做些什么?

当我们使用Spark中的GET_JSON_OBJECT来处理JSON数据时,其实我们也没有觉得有啥不妥。但是,没有对比就没有伤害。我们要时常多了解一下业界是否有更好的处理方式。

在细节方面,我们不能只是一味问客户:您需要什么? 而也要向Snowflake学习,更多的问自己:如果我们的目标就是更好为客户服务,我们能为客户做些什么?


来源 https://zhuanlan.zhihu.com/p/260686089

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Snowflake
创建时间:2022-03-04 16:28:17
Snowflake
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • itt0918
    专家
戳我,来吐槽~