Hadoop LZO: introduction and usage

This post pulls together some background on LZO and a few ways to use it.
Code for LZO-compressing the logs under a given directory: https://github.com/wzktravel/hadoop-codec

LZO is a compression codec which gives faster compression and decompression than gzip, as well as the capability to split compressed files. LZO allows this because it's composed of many smaller (~256K) blocks of compressed data, allowing jobs to be split along block boundaries, as opposed to gzip, where the dictionary for the whole file is written at the top.

When you specify mapred.output.compression.codec as LzoCodec, Hadoop will generate .lzo_deflate files. These contain the raw compressed data without any header and cannot be decompressed with the lzop -d command. Hadoop can read these files in the map phase, but this makes your life hard.

When you specify LzopCodec as the compression codec, Hadoop will generate .lzo files. These contain the header and can be decompressed with lzop -d.

However, neither .lzo nor .lzo_deflate files are splittable by default. This is where LzoIndexer comes into play: it generates an index file which records where the block boundaries are, so that multiple map tasks can process the same file.
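A minimal sketch of the two choices on a job configuration (my own illustration, assuming the hadoop-lzo jar and its native libraries are installed; the property names are the classic mapred.* ones quoted above):

import org.apache.hadoop.conf.Configuration;

// LzopCodec writes part-*.lzo (with headers, decompressable with lzop -d and indexable);
// swapping in LzoCodec would write raw, headerless part-*.lzo_deflate instead.
public static Configuration lzopOutputConf() {
    Configuration conf = new Configuration();
    conf.setBoolean("mapred.output.compress", true);
    conf.set("mapred.output.compression.codec",
             "com.hadoop.compression.lzo.LzopCodec");
    return conf;
}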

See the Cloudera blog post on splittable LZO (reference 1 below) and LzoIndexer for more info.

Creating an index

The LZO format is not splittable by default. An index file must be created for each LZO file before multiple map tasks can process it in parallel.

Creating the index when writing MapReduce output

  1. Use the LzoIndexer:

    // Use the LzoIndexer to index the job output
    LzoIndexer lzoIndexer = new LzoIndexer(conf);
    lzoIndexer.index(new Path(outputPath));
  2. Or use the distributed indexer, which runs the indexing as a MapReduce job:

    DistributedLzoIndexer lzoIndexer = new DistributedLzoIndexer();
    lzoIndexer.setConf(conf);
    lzoIndexer.run(new String[]{outputPath});

Indexing files that are already LZO-compressed

## Standalone version
$ hadoop jar /opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/hadoop-lzo.jar com.hadoop.compression.lzo.LzoIndexer /path/to/lzo/part-00000.lzo
## Distributed version (runs as a MapReduce job)
$ hadoop jar /opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer /path/to/lzo/part-00000.lzo

The index file is written next to the source file in the same directory (e.g. part-00000.lzo.index).

Benchmark

Word count with MapReduce, plain-text input vs. LZO input:

| Input | Input size | Output size | CPU | Memory | Map time | Reduce time | Total time |
| ----- | ---------- | ----------- | --- | ------ | -------- | ----------- | ---------- |
| Text  | 70G        | 55G         | 190 | 389G   | 2m4s     | 10m15s      | 12m24s     |
| LZO   | 4.3G       | 3.1G        | 36  | 78G    | 1m55s    | 6m11s       | 8m10s      |
| Ratio (LZO/Text) | 6.14% | 5.64% | 18.95% | 20.05% | 92.74% | 60.33% | 65.86% |

How to use LZO

Java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public static void compress(String codecClassName) throws Exception {
    Class<?> codecClass = Class.forName(codecClassName);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
    // Path of the compressed file to create
    FSDataOutputStream outputStream = fs.create(new Path("/user/hadoop/text.gz"));
    // Path of the source file to compress
    FSDataInputStream in = fs.open(new Path("/user/hadoop/aa.txt"));
    // Wrap the raw stream in a compressing output stream
    CompressionOutputStream out = codec.createOutputStream(outputStream);
    IOUtils.copyBytes(in, out, conf);
    IOUtils.closeStream(in);
    IOUtils.closeStream(out);
}
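For the reverse direction, here is a companion sketch (not in the original post) that lets CompressionCodecFactory pick the codec from the file extension, assuming the LZO codecs are registered in io.compression.codecs; the same method then handles .gz and .lzo alike:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;

public static void decompress(String inputPath) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path in = new Path(inputPath);
    // Infer the codec from the file suffix (.lzo -> LzopCodec, .gz -> GzipCodec, ...)
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(in);
    // Strip the codec suffix to form the decompressed output path
    String outputPath = CompressionCodecFactory.removeSuffix(inputPath, codec.getDefaultExtension());
    CompressionInputStream cin = codec.createInputStream(fs.open(in));
    FSDataOutputStream out = fs.create(new Path(outputPath));
    IOUtils.copyBytes(cin, out, conf);
    IOUtils.closeStream(cin);
    IOUtils.closeStream(out);
}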

MapReduce

Reading LZO files

job.setInputFormatClass(LzoTextInputFormat.class);
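A slightly fuller, hypothetical driver fragment for the input side (LzoTextInputFormat lives in the com.hadoop.mapreduce package of hadoop-lzo; inputDir is a placeholder):

import com.hadoop.mapreduce.LzoTextInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public static Job newLzoInputJob(Configuration conf, String inputDir) throws Exception {
    Job job = Job.getInstance(conf, "lzo-input-example");
    // LzoTextInputFormat consults the .index files (when present) and
    // splits each .lzo file along the indexed block boundaries
    job.setInputFormatClass(LzoTextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(inputDir));
    return job;
}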

Compressing intermediate map output with LZO

conf.set("mapreduce.map.output.compress", "true");
conf.set("mapreduce.map.output.compress.codec", "com.hadoop.compression.lzo.LzoCodec");

Writing LZO output

FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
int result = job.waitForCompletion(true) ? 0 : 1;
// Once the statements above complete, the final output files exist;
// the LZO index still has to be added on top of them.
// Use the LzoIndexer
LzoIndexer lzoIndexer = new LzoIndexer(conf);
lzoIndexer.index(new Path(outputPath));
// Or use the distributed indexer
// DistributedLzoIndexer lzoIndexer = new DistributedLzoIndexer();
// lzoIndexer.setConf(conf);
// lzoIndexer.run(new String[]{outputPath});

Spark

Reading LZO files

Spark can read LZO files directly, as long as the hadoop-lzo codec is configured on the cluster.

scala> val lzoFile = sc.textFile("/path/to/lzo/*.lzo")
scala> val lzoWordCounts = lzoFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b)
scala> lzoWordCounts.collect()

Writing LZO output

When saving, specify the output codec: classOf[com.hadoop.compression.lzo.LzopCodec]

val textFile = sc.textFile("/xxx/in")
textFile.saveAsTextFile("/xxx/out/", classOf[com.hadoop.compression.lzo.LzopCodec])

Hive

Specifying LZO storage when creating a table

CREATE EXTERNAL TABLE foo (
  columnA string,
  columnB string
) PARTITIONED BY (date string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t"
STORED AS
INPUTFORMAT "com.hadoop.mapred.DeprecatedLzoTextInputFormat"
OUTPUTFORMAT "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
LOCATION '/path/to/hive/tables/foo';

Converting an existing table to LZO storage

For a table that already exists, use an ALTER statement to switch it to the LZO storage format:

ALTER TABLE foo
SET FILEFORMAT
INPUTFORMAT "com.hadoop.mapred.DeprecatedLzoTextInputFormat"
OUTPUTFORMAT "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";

Inserting data

Set the following two parameters so that the inserted data is written as .lzo files:

SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec;

Hadoop compression recommendations

  1. For plain HDFS files, LZO is recommended: it strikes a good balance between (de)compression speed and compression ratio, and you can still view file contents directly with hadoop fs -text xx.log.
  2. For Hive, ORCFile is recommended.
  3. For HBase, Snappy compression is recommended.
  4. For Spark SQL and Impala, Parquet is recommended.

References

  1. Hadoop at Twitter (part 1): Splittable LZO Compression
  2. Do we need to create an index file (with lzop) if compression type is RECORD instead of block?
  3. What’s the difference between the LzoCodec and the LzopCodec in Hadoop-LZO?
  4. HDFS中文件的压缩与解压
  5. Hadoop, how to compress mapper output but not the reducer output
  6. mapreduce中的压缩
  7. mapred-default.xml
  8. Hadoop列式存储引擎Parquet/ORC和snappy压缩
  9. IBM Developerworks: Hadoop 压缩实现分析