近在看 Flink 相关的东西,重读了下《Hadoop 权威指南》,以下内容作为学习笔记,版本为第四版(代码已经根据新的官网文档调整)。
很多人对 Hadoop 有一种误解,觉得 Hadoop 慢,而 Spark、Storm、Flink 很快,实际上它们并不是一个层面的东西。
Hadoop 生态
Hadoop 是一个很大的生态,包含了一系列的数据处理工具,基础的是 HDFS 和 MapReduce,前者可以简单理解为分布式文件系统,用来存数据,后者用来做计算。
Spark 等实现的是 MapReduce 的功能,或者说 MapReduce 的优化。此外 Hadoop 还有一个常见的组件是资源调度框架 YARN ,Spark 等都可以运行在 YARN 上。关联的项目就更多了,构建于 HDFS 上的 HBase,类 SQL 的 Hive 数据操纵语言,构建于 MapReduce 之上的 Mahout 机器学习框架,序列化数据的 Avro,日志收集 Flume 等等。
为什么需要 Hadoop
小数据情况下,数据库的 B 树很快,但数据量一多,涉及大量的排序和合并,MapReduce 的优势就上来了。
MapReduce 的原理网上很多,这里不过多介绍,有兴趣的可以看看谷歌的原始论文 MapReduce: Simplified Data Processing on Large Clusters。(以下的 MapReduce 不特别强调都是指 Hadoop 组件)。
MapReduce 的优势:
- 可以线性伸缩;
- 通过数据本地化获取更好的性能,将数据“搬运”到计算节点加快访问速度;
- 无共享的模式,自动检测失败,失败恢复
HDFS 与 MapReduce:
- map 任务的结果写入本地硬盘而不是 HDFS,因为终结果还需要 reduce, 所以任务结束后会被删除,没必要保存,同时读取数据也尽量本地化;
- 一个map 处理一个分片,默认为 128MB, 如果分片过小任务并行度不够;
- reduce 任务需要接收多个 map 任务,无法本地化,结果一般保存在 HDFS 上,个副本在本地节点,copy 在其他机架上保证可靠性;
- reduce 任务数(并行度)不是由输入数据数量决定的,reduce 任务也不是必须的
- 在 map 和 reduce 之间可以加入 combine 函数,相当于在 map 节点对数据进行一次“汇总”,但是 combine 要保证不改变终结果; map 在保存到磁盘前会先排序,如果需要combine会先combine;通过 partition 函数(有点像数据库的表分区,kafka topic 分区)发送到对应的 reducer
- HDFS 用于大数据存储,但不是特别适合大量小文件,每个文件目录数据块的基本信息约150字节,这些信息都存在内存上以快速访问;不适合低时间延迟的应用;追加写方式而不是覆盖原有内容,支持单写入者(每个文件),不支持多写入;
- 类似于磁盘的块,HDFS 也有块,默认是 128MB,小于一个块的文件不会占据整个块大小而是实际大小,块越大,定位时间越小,但同时map的并行度取决于块的数量;块抽象了文件的存储,简化了设计,管理和计算只要根据块而不是文件的数目,注意文件的元数据权限等可以单独存储和管理在另外的文件系统;
- HDFS 分为维护文件系统结构的 namenode 和 实际存储数据的 datanode (要理解这样的设计可以看下上文的谷歌论文);数据实际存储在磁盘的 dfs.datanode.data.dir (hdfs-default.xml)上
- hdfs 的并行复制 hdoop distcp file1 file2
- YARN 请求资源分配时用“就近原则”,优先在靠近存储数据的节点上分配资源;两层结构可以更容易的扩展;YARN 是一个通用的资源管理组件,不仅仅可以运行 MapReduce
- 推测执行:当多个任务并行时,任务的总体完成时间取决于慢的那一个,那么如果有一个任务过慢(可能是节点的硬件问题或配置问题),我们就可以启动另一个任务作为备份;推测执行会减少吞吐量;reduce 任务需要大量复制 map 数据,所以针对 reduce 关闭是有益的;非幂等的任务,即有些任务重复执行结果不一致
基础代码
以下代码省略了基础的包引入,需要懂基础的 Java
一个 Mapper 需要实现以下接口:
Class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
分别对应输入key 类型,输入value 类型,输出key类型,输出value类型,代码来自官网:
public class TokenCounterMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
其中的 context 用于输出写入(磁盘、HDFS等), Reducer 代码类似,都需要匹配对应的输入输出类型。
启动 job 的代码(含义很明确了,不过多解释):
Job job = Job.getInstance();
job.setJarByClass(MyJob.class);
job.setJobName("myjob");
job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));
job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);
job.waitForCompletion(true);
当然需要先安装 Hadoop,然后一堆路径配置,然后就可以运行了:
hadoop 类名 输入路径 输出路径
文件系统检查:
hdfs fsck / -files -blocks
其它的命令类似,常用的是 hdfs fs , 可以进行一些类似本地文件系统的操作:
hdfs fs -ls /
合并远程文件目录到本地文件,一般用于聚合 reduce 任务结果
hadoop fs -getmerge 目录 文件名
另外了解到有一个新的数据分析引擎 ClickHouse 据说非常之快,并且不依赖于 Hadoop,有兴趣的可以了解下。
参考文献:
- 谷歌三大论文(看了几遍,每遍都有新收获)
- Streaming Systems 做流计算的可以看下
- ClickHouse DBMS