绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
本地及MapReduce作业读取SquenceFile
2020-05-13 13:46:19

版权申明:转载请注明出处。
文章来源:大数据随笔
排版乱?请移步原文获得更好阅读体验

1.SquenceFile简介

(1)SequenceFile是一个由二进制序列化过的key/value的字节流组成的存储文件。
(2)SequenceFile可通过fileWriter.append(key,value)来完成新记录的添加操作。
(3)在存储结构上,SequenceFile主要由一个Header后跟多条Record组成。
(4) Header主要包含了Key classname,Value classname,存储压缩算法,用户自定义元数据等信息,此外,还包含了一些同步标识,用于快速定位到记录的边界。
(5)每条Record以键值对的方式进行存储,用来表示它的字符数组可依次解析成:记录的长度、Key的长度、Key值和Value值,并且Value值的结构取决于该记录是否被压缩。


说到SquenceFile就不得不提MapFile,MapFile是一种排序了的SquenceFile,其由data和index两部分组成。. index作为文件的数据索引,主要记录了每个Record的key值,以及该Record在文件中的偏移位置。在MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件位置,因此,相对SequenceFile而言,MapFile的检索效率是高效的,缺点是会消耗一部分内存来存储index数据。


2.本地读取SequenceFile


这种读取方式一般不太常见,偶尔用于数据抽样查看。抽取少量数据在本地环境下分析。下面的例子中,假如存储的SequenceFile的key的类型为NullWritable,value的类型为ByteWritable,且value是对象Feature序列化后的结果。下面的代码演示从这样一个sequencefile中反序列化出Feature。核心代码段如下:


//设置本地运行的参数
Configuration conf=new Configuration();  
conf.set("mapred.job.tracker", "local");  
conf.set("fs.default.name", "file:///");  
//声明一个SequenceFile.Reader
SequenceFile.Reader reader = null;
FileSystem fs = FileSystem.get(conf);
Path path = new Path(filename);
reader = new SequenceFile.Reader(fs,path,conf);
//获取key,value对象
NullWritable key=(NullWritable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
BytesWritable value=(BytesWritable)ReflectionUtils.newInstance(reader.getValueClass(), conf);
//读取。这里key的存储为空,只解析value的内容
while(reader.next(key, value)){
    byte[] bytes = value.copyBytes();
    ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
    ObjectInputStream ois = new ObjectInputStream(bis);
    Feature[] f = (Feature[])ois.readObject();
}



3.MapReduce作业中读取SequenceFile


SequenceFile就是为Hadoop设计的一种平面结构的存储形式,所以MapReduce作业中天然的支持读取这种结构。与读入文本形式存储的文件不同的是作业的输入类型需指定为SequenceFileInputFormat.class,除此之外几乎无差别。


package test0820;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SFInput {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SFinput.class);

        job.setMapperClass(SFMapper.class);
        job.setReducerClass(SFReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(VLongWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(VLongWritable.class);

        job.setInputFormatClass(SequenceFileInputFormat.class);

        SequenceFileInputFormat.addInputPath(job, new Path(args[]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
    public static class SFMapper extends Mapper<Text, VLongWritable,Text, VLongWritable> {
        public void map(Text key, VLongWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }

    }
    public static class SFReducer extends Reducer<Text, VLongWritable,Text, VLongWritable>{
        @Override
        protected void reduce(Text key, Iterable<VLongWritable> v2s,Context context)
                throws IOException, InterruptedException {
            for(VLongWritable vl : v2s){
                context.write(key, vl);
            }
        }
    }
}

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Apache Spark技术专区
创建时间:2020-05-08 17:16:40
Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎 。现在形成一个高速发展应用广泛的生态系统。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • 栈栈
    专家
戳我,来吐槽~