绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
HDFS压缩与解压工具
2020-05-13 13:46:40

版权申明:转载请注明出处。
文章来源:大数据随笔


1.压缩方式及实现类


hdfs上解压和压缩文件不像本地这么方便,所以写了一个工具。可以解压和压缩,使用的时候指定压缩或者解压使用的类就行。先列举一下hdfs各种压缩及解压方式的实现类:
压缩格式 | 实现类
--- | --
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec
gzip | org.apache.hadoop.io.compress.GzipCodec
bzip2 | org.apache.hadoop.io.compress.Bzip2Codec
lzo | com.hadoop.compression.lzo.LzopCodec
lz4 | org.apache.hadoop.io.compress.Lz4Codec
snappy | org.apache.hadoop.io.compress.SnappyCodec


2.压缩及解压工厂


package net.bigdataer.hdfs.compress;

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

public class CompressFactory {

    public static void main(String[] args) throws IOException, ClassNotFoundException {

        Configuration conf = new Configuration();
        String[] uargs = new GenericOptionsParser(conf, args).getRemainingArgs();
        String opt_type = uargs[];
        String code_class = uargs[1];
        String source_dir = uargs[2];
        String goal_dir = uargs[3];

        if("compress".equals(opt_type)){
            compress(code_class,source_dir,goal_dir,conf);
        }else if("uncompress".equals(opt_type)){
            uncompress(code_class,source_dir,goal_dir,conf);
        }

    }

    /**
     * 压缩
     * @param code_class
     * @param source_dir
     * @param goal_dir
     * @param conf
     * @throws ClassNotFoundException
     * @throws IOException
     */
    public static void compress(String code_class,String source_dir,String goal_dir,Configuration conf) throws ClassNotFoundException, IOException{
        Class<?> codeClass  = Class.forName(code_class);
        FileSystem fs = FileSystem.get(conf);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codeClass, conf);
        FSDataOutputStream output = fs.create(new Path(goal_dir), new Progressable(){
            @Override
            public void progress() {
                System.out.println("*");
            }
        });

        FSDataInputStream input = fs.open(new Path(source_dir));

        CompressionOutputStream compress_out = codec.createOutputStream(output);
        IOUtils.copyBytes(input, compress_out, conf);
        IOUtils.closeStream(input);
        IOUtils.closeStream(compress_out);
    }

    /**
     * 解压缩
     * @param decode_class
     * @param source_dir
     * @param goal_dir
     * @param conf
     * @throws ClassNotFoundException
     * @throws IOException
     */
    public static void uncompress(String decode_class,String source_dir,String goal_dir,Configuration conf) throws ClassNotFoundException, IOException{
        Class<?> decodeClass = Class.forName(decode_class);
        FileSystem fs = FileSystem.get(conf);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(decodeClass, conf);
        System.out.println("load lzo class completed: "+decodeClass.getName());
        FSDataInputStream input = fs.open(new Path(source_dir));
        CompressionInputStream codec_input = codec.createInputStream(input);
        OutputStream output = fs.create(new Path(goal_dir));
        System.out.println(codec_input.toString()+"--"+output.toString());
        IOUtils.copyBytes(codec_input, output, conf);
        System.out.println("run uncompress completed");
        IOUtils.closeStream(input);
        IOUtils.closeStream(output);
    }
}



3.使用方法


将上述代码打成jar包,放在hdfs上。运行如下命令:


hadoop jar compress.jar uncompress com.hadoop.compression.lzo.LzopCodec /hive/warehouse/ao/direction_show_log/dt=2016-07-16.m6/2016-07-16.tar.lzo /hive/warehouse/ao/ectech/tmp_disp/dt=2016-07-16/m6.log



个参数可选值有compress,uncompress,分别表示压缩和解压。
第二个参数根据不同的文件类型选择实现类。
第三个参数表示源文件路径。
第四个参数表示目标文件路径。

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Apache Spark技术专区
创建时间:2020-05-08 17:16:40
Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎 。现在形成一个高速发展应用广泛的生态系统。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • 栈栈
    专家
戳我,来吐槽~