1. 概述
Spark是基于内存计算的通用大数据并行计算框架,内置多种组件,如批处理、流处理、机器学习和图处理。Hive是基于Hadoop的数据仓库,支持类SQL的命令查询,提升了Hadoop的易用性。Spark与Hive、Hadoop通常是搭配使用,利用Hive中的数据分区可以方便地管理和过滤数据,提高查询效率。
DolphinDB是C++编写的高性能分布式时序数据库,内置高吞吐低延迟的列式内存引擎,集成了功能强大的编程语言,支持类Python和SQL的脚本语言,可以直接在数据库中进行复杂的编程和运算。DolphinDB内部用Data Source来抽象分区数据。在Data Source之上,可以完成SQL,机器学习,批处理和流处理等计算任务。一个Data Source既可以是内置的数据库分区,也可以是外部数据。如果Data Source是内置数据库的分区,大部分计算都可以在本地完成,极大地提升了计算和查询效率。
本报告将对DolphinDB、Spark直接访问HDFS(Spark+Hadoop,下文称为Spark)、Spark通过Hive组件访问HDFS(Spark+Hive+Hadoop,下文称为Spark+Hive)三者进行性能对比测试。测试内容包括数据导入、磁盘空间占用、数据查询以及多用户并发查询。通过对比测试,我们能更深入的了解影响性能的主要因素,以及不同工具的佳应用场景。
2. 环境配置
2.1 硬件配置
本次测试使用了两台配置完全相同的服务器(机器1,机器2),各个配置参数如下:
主机:DELL PowerEdge R730xd
CPU:Intel Xeon(R) CPU E5-2650 v4(24核 48线程 2.20GHz)
内存:512 GB (32GB × 16, 2666 MHz)
硬盘:17T HDD (1.7T × 10, 222 MB/s 读取;210 MB/s 写入)
网络:万兆以太网
OS: CentOS Linux release 7.6.1810 (Core)
2.2 集群配置
测试的DolphinDB版本为Linux v0.95。测试集群的控制节点部署在机器1上,每台机器上各部署三个数据节点,共六个数据节点。每个数据节点配置8个worker,7个executor,24G内存。
测试的Spark版本为2.3.3,搭载Apache Hadoop 2.9.0。Hadoop与Spark配置为完全分布式模式,机器1为Master,并且在机器1、机器2上都具有Slave。Hive的版本是1.2.2,机器1、机器2上都具有Hive。元数据存储在机器1上的MySql数据库中。Spark 与Spark + Hive使用Standalone模式下的client 方式来提交应用。
测试时,DolphinDB、Spark、Spark+Hive均配置6块硬盘,不同并发数下使用的CPU、内存总和都相同,都是48个线程,144G内存。Spark与Spark+Hive使用的资源只是对于特定的应用,每个应用有6个executor,在多用户并发的情况下,Spark、Spark+Hive单个用户使用的资源会随着用户数量增多而减少。不同并发数下每个用户使用的资源如表1所示。
表1.Spark、Spark+Hive不同并发数下单用户使用的资源
3. 数据集及数据库设计
3.1 数据集
测试数据集是纽约证券交易所(NYSE)提供的TAQ数据集,包含 8000 多支股票在2007.08.01-2007.08.31一个月内的Level 1报价数据,包含交易时间, 股票代码, 买入价, 卖出价, 买入量, 卖出量等报价信息。数据集中-共有 65 亿(6,561,693,704)条报价记录,一个 CSV 中保存一个交易日的记录,该月共23个交易日,未压缩的 23个CSV 文件共计 277 GB。
数据来源:www.nyse.com/market-data…
3.2 数据库设计
表2. TAQ在各个系统中的数据类型。
在 DolphinDB database 中,我们按照date、symbol列组合分区,分区使用日期DATE来进行值分区,共23个分区,第二分区使用股票代码SYMBOL来进行范围分区,分区数量100个,每个分区大约120M左右。
Spark存储在HDFS上的数据以23个csv对应23个目录。Spark+Hive采用两层分区,层分区使用日期DATE列进行静态分区,第二层分区使用股票代码SYMBOL进行动态分区。
具体脚本见附录。
4. 数据导入和查询测试
4.1 数据导入测试
原始数据均匀地分布在两台服务器的6个硬盘上,这样可以充分利用集群中的所有资源。DolphinDB通过异步多节点的方式并行导入数据,Spark与Spark+Hive并行启动6个应用来读取数据,把数据存储到HDFS中。各个系统导入数据的时间如表3所示。各个系统中数据占用的磁盘空间如表4所示。数据导入脚本见附录。
表3. DolphinDB、Spark、Spark+Hive导入数据时间
表4. DolphinDB、Spark、Spark+Hive中数据占用的磁盘空间
DolphinDB的导入性能明显优于Spark和Spark+Hive,是Spark的4倍左右,是Spark + Hive的6倍左右。DolphinDB使用C++编写并且内部有很多优化,极大地利用了磁盘的IO。
DolphinDB占用的磁盘空间大于Spark与Spark+Hive,大约是他们的2倍,这是因为Spark和Spark+Hive在Hadoop上都使用Parquet格式,Parquet格式通过Spark写入到Hadoop上默认使用snappy压缩。
4.2 数据查询测试
为保证测试公平,每个查询语句要进行多次测试,每次测试之前均通过 Linux 系统命令分别清除系统的页面缓存、目录项缓存和硬盘缓存。DolphinDB还清除其内置缓存。
表5中的查询语句涵盖了大部分查询场景,包含分组、排序、条件、聚合计算、点查询、全表查询,用来评估DolphinDB、Spark、Spark+Hive在不同用户数量提交下的性能。
表5. DolphinDB、Spark、Spark+Hive查询语句
4.2.1 DolphinDB与Spark单用户查询测试
以下是DolphinDB与Spark单用户查询的结果,结果中的耗时为查询8次的平均用时。
表6. DolphinDB、Spark单用户查询结果
从结果可以看出,DolphinDB的查询性能是Spark+HDFS的200倍左右。查询Q1到Q6都是以DolphinDB的分区字段为过滤条件,DolphinDB只需要加载指定分区的数据,无需全表扫描,而Spark从Q1到Q6都需要全表扫描,耗费大量的时间。对于查询Q7,DolphinDB和Spark都需要全表扫描,但是DolphinDB只加载相关的列,无需加载所有列,而Spark则需要加载所有数据。由于Query运行时间被数据加载主导,DolphinDB和Spark的性能差距没有之前的查询语句的大。
4.2.2 DolphinDB与Spark+Hive单用户查询测试
由于DolphinDB的数据经过分区,且在查询的时候实现谓词下推,效率明显高于Spark。此处我们使用Spark搭载Hive组件来访问HDFS,对比DolphinDB和Spark+Hive的查询性能。以下是DolphinDB、Spark+Hive单用户查询的结果,结果中的耗时为查询8次的平均用时。
表7. DolphinDB、Spark+Hive单用户查询结果
结果显示,DolphinDB的查询性能明显优于Spark+Hive,是Spark+Hive的数十倍。与表6的结果相比,Spark+Hive的查询速度比Spark要快得多,DolphinDB具有的优势明显下降了很多。这是因为Hive对数据进行分区,且在查询语句的条件带有分区字段的时候,只加载部分数据,实现数据过滤,提高效率。查询语句Q7扫描全表的时候会出现内存溢出。
DolphinDB、Spark+Hive都对数据进行了分区,且在加载数据时都可以实现谓词下推,达到数据过滤的效果,但是DolphinDB的查询速度优于Spark+Hive。这是因为Spark+Hive区读取HDFS上的数据是不同系统之间的访问,数据要经过序列化、网络传输、反序列化的过程,非常耗时,从而影响性能。DolphinDB的大部分计算都在本地完成,减少了数据传输,因此更加高效。
4.2.3 DolphinDB与Spark计算能力对比
上面DolphinDB分别与Spark、Spark+Hive的查询性能对比,由于数据分区、查询时的数据过滤以及传输影响了Spark的性能,因此这里我们先把数据加载到内存中,再进行相关的计算,比较DolphinDB和Spark+Hive。我们省略了Spark+Hive,因为使用Hive只是为了数据过滤,读取HDFS上的数据更加高效,这里的测试数据已经在内存中。
表8是测试计算能力的语句。每次测试都包含两个语句,个语句是把数据加载到内存中,第二个语句是对内存中的数据进行计算。DolphinDB会自动缓存数据,Spark则通过自己的默认缓存机制重新创建一个临时表TmpTbl。
表8. DolphinDB与Spark计算能力对比语句
以下是DolphinDB与Spark计算能力的测试结果,结果中的耗时是测试5次的平均用时。
表9. DolphinDB与Spark计算能力测试结果
由于数据已经在内存中,对比表6,Spark使用的时间大幅度减少,但是DolphinDB的计算能力仍然比Spark优越。DolphinDB用C++编写,自己管理内存,比起Spark使用JVM来管理内存更加高效。另外,DolphinDB内置了更高效的算法,提高了计算性能。
DolphinDB的分布式计算以分区为单位,计算指定内存的数据。Spark加载整个HDFS上的块,一个数据块包含了具有不同symbol值的数据,虽然缓存,但是仍然要筛选,所以在Q1与Q2的比值较大。Spark计算时使用的广播变量是经过压缩的,传输到其他的executor上再解压影响性能。
4.2.4 多用户并发查询
我们使用表5中的查询语句,对DolphinDB、Spark、Spark+Hive进行多用户并发查询测试。以下是测试结果,结果中的耗时是查询8次的平均用时。
表10. DolphinDB、Spark、Spark+Hive多用户并发查询结果
图1. DolphinDB、Spark多用户查询结果对比
图2. DolphinDB、Spark+Hive多用户查询结果对比
从上面的结果可以看出,随着并发数量的增加,三者的查询时间逐渐增加。当达到8个用户并发的时候Spark性能较之前少量的用户并发情况下显著下降,Spark 在执行Q7的时候会导致worker死亡。Spark+ Hive在多用户访问的时候与DolphinDB一样,基本保持稳定,但是执行Q7查询语句的一直会出现内存溢出的异常。
Spark+ Hive的查询配置与Spark 一样,因为有分区的作用,并且可以过滤数据,查询数据量比较小,所以效率相对于Spark扫描全部数据比较好。
DolphinDB在并发查询中性能明显优于Spark 与Spark+ Hive,从上图可以看出在多用户并发访问情况下,随着用户数量的增加,DolphinDB相对于Spark 的优势几乎是线性增长,相对于Spark + Hive 的优势基本保持不变,体现了有数据分区在查询的时候实现数据过滤的重要性。
DolphinDB在多用户并发的情况下实现了多用户的数据共享,不像Spark 的数据只是针对于具体的应用。所以在8个并发用户的情况下,Spark 每个用户分配到的资源比较少,性能显著下降。DolphinDB的数据共享可以减少资源的使用,在有限的资源下,把更多的资源留给用户计算使用,提高用户并发的效率,增大用户的并发数量。
5. 小结
在数据的导入方面,DolphinDB可以并行加载,Spark与Spark+Hive 则通过多个应用同时加载来导入数据。DolphinDB的导入速度是Spark 和Spark+ Hive 的4-6倍。在磁盘空间上,DolphinDB占用的磁盘空间是Spark与Spark+ Hive在Hadoop上占用的磁盘空间的两倍左右,Spark与Spark + Hive使用了snappy压缩。
在数据的SQL查询方面,DolphinDB的优势更加明显。优势主要来自四个方面:(1)本地化计算,(2)分区过滤,(3)优化的内存计算,(4)跨会话的数据共享。在单用户查询情况下,DolphinDB的查询速度是Spark的几倍到上百倍,是Spark+ Hive 的几十倍。Spark 读取HDFS 是不同的系统之间的调用,其中包含了数据的序列化,网络,反序列化非常消耗时间,且占据很多的资源。DolphinDB的SQL查询大部分是本地化计算,大幅减少了数据传输和加载的时间。Spark+ Hive 相对与Spark速度提升很大,主要是因为Spark + Hive只扫描相关分区的数据,实现了数据的过滤。在剔除本地化和分区过滤的因素后(即所有数据已经在内存中),DolphinDB的计算能力仍然优于Spark数倍。DolphinDB基于分区的分布式计算效率很高,且对内存的管理比Spark基于JVM的管理更加。Spark的多用户并发会随着用户数量的增多效率逐渐下降,在查询大数据量的时候用户过多导致worker 死亡。Spark + Hive的多用户并发相对比较稳定,但是加载数据过大会出现内存溢出错误。 多用户情况下, DolphinDB可以实现数据的共享,从而减少加载数据使用的资源,查询速度是Spark的数百倍,是Spark+Hive 的几十倍。随着用户数量的增加,DolphinDB相对于Spark的性能优势更加明显。涉及到分区查询的情况下,Spark+ Hive与DolphinDB显著提高查询性能。
Spark是一个非常的通用分布式计算引擎,在SQL查询、批处理、流处理、机器学习等方面均有上佳表现。但由于SQL查询通常只需要对数据计算一次,相对于机器学习需要上百次的迭代,内存计算的优势无法充分体现。因此,我们更建议将Spark用于计算密集型的机器学习。
在测试过程中,我们也发现DolphinDB是一个非常轻量级的实现,集群的搭建简单快速, Spark + Hive+ Hadoop 集群安装配置非常复杂。
附录
附录1. 数据预览
附录2. Hive创建表语句
CREATE TABLE IF NOT EXISTS TAQ (time TIMESTAMP, bid DOUBLE, ofr DOUBLE, bidsiz INT, ofrsiz INT, mode INT, ex TINYINT, mmid STRING)PARTITIONED BY (date DATE, symbol STRING) STORED AS PARQUET;
复制代码
附录3.
DolphinDB导入数据脚本:
fps1、fps2分别代表机器1、2上所有的csv路径的vector
fps是包含fps1和fps2 的vector
allSites1、allSites2 分别代表机器1、2上数据节点名称的vector
allSite 是包含 allSites1和allSites2的vector
DATE_RANGE=2007.07.01..2007.09.01
date_schema=database('', VALUE, DATE_RANGE)
symbol_schema=database('', RANGE, buckets)
db=database(FP_DB, COMPO,[date_schema,symbol_schema])
taq = db.createPartitionedTable(schema, `taq, `date`symbol)
for(i in 0..1){
for(j in 0..(size(fps[i])-1)) {
rpc(allSites[i][j] % size(allSite[i])],submitJob,"loadData" , "loadData" ,loadTextEx{database(FP_DB), "taq", `date`symbol, fps[i][j]} )
}
}
复制代码
Spark与Hive导入数据的配置:
--master local[8]
--executor-memory 24G
作者:DolphinDB
链接:https://juejin.cn/post/6917049711161507847