Copyright notice: please credit the source when reposting.
Source: 大数据随笔
1. Compression formats and implementing classes
Compressing and decompressing files on HDFS is not as convenient as doing it locally, so I wrote a small tool that can do both. When using it, you just specify the codec class for the compression or decompression you want. First, here are the implementing classes for the various compression formats available on HDFS:
Compression format | Implementing class
--- | ---
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec
gzip | org.apache.hadoop.io.compress.GzipCodec
bzip2 | org.apache.hadoop.io.compress.BZip2Codec
lzo | com.hadoop.compression.lzo.LzopCodec
lz4 | org.apache.hadoop.io.compress.Lz4Codec
snappy | org.apache.hadoop.io.compress.SnappyCodec
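The tool below drives these codec classes through Hadoop's stream-wrapping API: wrap a raw stream in a codec stream, then copy bytes through it. The same pattern can be tried without any Hadoop dependency using the JDK's own GZIP streams; this standalone sketch (the class name `GzipRoundTrip` is my own, purely for illustration) mirrors that wrap-and-copy flow:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRoundTrip {
    // Compress then decompress a byte array, returning the restored bytes.
    public static byte[] roundTrip(byte[] original) throws IOException {
        // Compress: wrap the raw output stream in the codec stream, then write.
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gzOut = new GZIPOutputStream(compressed)) {
            gzOut.write(original);
        }
        // Decompress: wrap the raw input stream in the codec stream, then read.
        ByteArrayOutputStream restored = new ByteArrayOutputStream();
        try (GZIPInputStream gzIn = new GZIPInputStream(
                new ByteArrayInputStream(compressed.toByteArray()))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = gzIn.read(buf)) != -1) {
                restored.write(buf, 0, n);
            }
        }
        return restored.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "hello hdfs compression".getBytes("UTF-8");
        System.out.println(new String(roundTrip(data), "UTF-8"));
    }
}
```

Hadoop's `CompressionCodec.createOutputStream` / `createInputStream` play the same role as the `GZIPOutputStream` / `GZIPInputStream` wrappers here.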
2. Compression and decompression factory
```java
package net.bigdataer.hdfs.compress;

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

public class CompressFactory {

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] uargs = new GenericOptionsParser(conf, args).getRemainingArgs();
        String opt_type = uargs[0];    // "compress" or "uncompress"
        String code_class = uargs[1];  // fully qualified codec class name
        String source_dir = uargs[2];  // source file path
        String goal_dir = uargs[3];    // target file path
        if ("compress".equals(opt_type)) {
            compress(code_class, source_dir, goal_dir, conf);
        } else if ("uncompress".equals(opt_type)) {
            uncompress(code_class, source_dir, goal_dir, conf);
        }
    }

    /**
     * Compress
     * @param code_class fully qualified name of the codec class
     * @param source_dir source file path
     * @param goal_dir target file path
     * @param conf Hadoop configuration
     * @throws ClassNotFoundException
     * @throws IOException
     */
    public static void compress(String code_class, String source_dir, String goal_dir,
            Configuration conf) throws ClassNotFoundException, IOException {
        Class<?> codeClass = Class.forName(code_class);
        FileSystem fs = FileSystem.get(conf);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codeClass, conf);
        // Print a marker whenever the write makes progress.
        FSDataOutputStream output = fs.create(new Path(goal_dir), new Progressable() {
            @Override
            public void progress() {
                System.out.println("*");
            }
        });
        FSDataInputStream input = fs.open(new Path(source_dir));
        // Wrap the raw output stream in the codec's compressing stream, then copy bytes.
        CompressionOutputStream compress_out = codec.createOutputStream(output);
        IOUtils.copyBytes(input, compress_out, conf);
        IOUtils.closeStream(input);
        IOUtils.closeStream(compress_out);
    }

    /**
     * Decompress
     * @param decode_class fully qualified name of the codec class
     * @param source_dir source file path
     * @param goal_dir target file path
     * @param conf Hadoop configuration
     * @throws ClassNotFoundException
     * @throws IOException
     */
    public static void uncompress(String decode_class, String source_dir, String goal_dir,
            Configuration conf) throws ClassNotFoundException, IOException {
        Class<?> decodeClass = Class.forName(decode_class);
        FileSystem fs = FileSystem.get(conf);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(decodeClass, conf);
        System.out.println("loaded codec class: " + decodeClass.getName());
        FSDataInputStream input = fs.open(new Path(source_dir));
        // Wrap the raw input stream in the codec's decompressing stream, then copy bytes.
        CompressionInputStream codec_input = codec.createInputStream(input);
        OutputStream output = fs.create(new Path(goal_dir));
        IOUtils.copyBytes(codec_input, output, conf);
        System.out.println("uncompress completed");
        IOUtils.closeStream(codec_input);
        IOUtils.closeStream(output);
    }
}
```
3. Usage
Package the code above into a jar and put it on the cluster, then run a command like the following:
hadoop jar compress.jar uncompress com.hadoop.compression.lzo.LzopCodec /hive/warehouse/ao/direction_show_log/dt=2016-07-16.m6/2016-07-16.tar.lzo /hive/warehouse/ao/ectech/tmp_disp/dt=2016-07-16/m6.log
The first parameter is either compress or uncompress, selecting compression or decompression respectively.
The second parameter is the implementing codec class chosen for the file type.
The third parameter is the source file path.
The fourth parameter is the target file path.
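Typing the fully qualified codec class name on every invocation is error-prone. As a small optional convenience (the `CodecLookup` class and its `resolve` method are a hypothetical helper of mine, not part of the tool above), the short format names from the table in section 1 could be mapped to class names before calling the jar:

```java
import java.util.HashMap;
import java.util.Map;

public class CodecLookup {
    // Short format name -> implementing codec class, per the table in section 1.
    static final Map<String, String> CODECS = new HashMap<>();
    static {
        CODECS.put("deflate", "org.apache.hadoop.io.compress.DefaultCodec");
        CODECS.put("gzip", "org.apache.hadoop.io.compress.GzipCodec");
        CODECS.put("bzip2", "org.apache.hadoop.io.compress.BZip2Codec");
        CODECS.put("lzo", "com.hadoop.compression.lzo.LzopCodec");
        CODECS.put("lz4", "org.apache.hadoop.io.compress.Lz4Codec");
        CODECS.put("snappy", "org.apache.hadoop.io.compress.SnappyCodec");
    }

    // Translate a short format name into the class-name argument, or fail loudly.
    public static String resolve(String format) {
        String cls = CODECS.get(format.toLowerCase());
        if (cls == null) {
            throw new IllegalArgumentException("unknown compression format: " + format);
        }
        return cls;
    }

    public static void main(String[] args) {
        System.out.println(resolve("lzo"));
    }
}
```

With this in place, the second command-line argument could be built as `CodecLookup.resolve("lzo")` instead of being typed out by hand.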