flume-ng configuration and using lzo

Problems I ran into while configuring and using flume-ng, and how I solved them. The focus is on using lzo for hdfs.codeC. I also attach my own flume plugin, https://github.com/wzktravel/flume-agent, which currently contains one interceptor and two sources:

  1. HDFSInterceptor: adds the time, file name, ip, etc. to the event header
  2. SpoolDirectoryHourlySource: collects logs that are rotated by hour
  3. DirTailPollableSource2: dynamically tails the most recently modified file in a directory

Everything below is based on flume ng 1.6.0. The flume-to-hdfs configuration I use is attached at the end.

Referencing third-party jars

They do not have to go into flume's lib directory; see http://flume.apache.org/FlumeUserGuide.html#installing-third-party-plugins
Create a plugins.d directory under the flume home directory. Each plugin gets its own subdirectory there, and each plugin directory may contain lib, libext, and native:

  • lib: the plugin's jar(s)
  • libext: jars that the plugin depends on
  • native: required native libraries, e.g. .so files

For example, I have the hdfs plugin, my own flume interceptor/source, and lzo; the directory layout looks like this:

plugins.d/
plugins.d/custom/
plugins.d/custom/lib/flume-agent-1.0-SNAPSHOT.jar
plugins.d/hadoop/
plugins.d/hadoop/lib/commons-configuration-1.6.jar
plugins.d/hadoop/lib/hadoop-auth-2.6.0-cdh5.4.8.jar
plugins.d/hadoop/lib/hadoop-common-2.6.0-cdh5.4.8.jar
plugins.d/hadoop/lib/hadoop-hdfs-2.6.0-cdh5.4.8.jar
plugins.d/hadoop/lib/hadoop-nfs-2.6.0-cdh5.4.8.jar
plugins.d/hadoop/lib/htrace-core-3.0.4.jar
plugins.d/hadoop-lzo/
plugins.d/hadoop-lzo/lib/hadoop-lzo-cdh4-0.4.15-gplextras.jar
plugins.d/hadoop-lzo/native/libgplcompression.a
plugins.d/hadoop-lzo/native/libgplcompression.la
plugins.d/hadoop-lzo/native/libgplcompression.so
plugins.d/hadoop-lzo/native/libgplcompression.so.0
plugins.d/hadoop-lzo/native/libgplcompression.so.0.0.0

hdfs-sink

If flume and hadoop are on the same machine, configuration is simple and you can just follow the flume website. But if you run hdfs-sink on a machine without hadoop, some extra jars are needed, whereas for kafka-sink the kafka-related jars already ship in flume/lib.

Jars required by hdfs-sink

At minimum the following jars are needed. If you also want lzo compression, extra setup is required; see below.

plugins.d/hadoop/lib/commons-configuration-1.6.jar
plugins.d/hadoop/lib/hadoop-auth-2.6.0-cdh5.4.8.jar
plugins.d/hadoop/lib/hadoop-common-2.6.0-cdh5.4.8.jar
plugins.d/hadoop/lib/hadoop-hdfs-2.6.0-cdh5.4.8.jar
plugins.d/hadoop/lib/hadoop-nfs-2.6.0-cdh5.4.8.jar
plugins.d/hadoop/lib/htrace-core-3.0.4.jar

Using lzo for hdfs.codeC

This assumes your cluster already supports lzo. If it does not, see configuring hadoop-lzo under Cloudera:
http://wzktravel.github.io/2015/12/10/hadoop-lzo/

Most write-ups online assume flume and hadoop are on the same machine; in that case you can simply set hdfs.codeC to lzo, because the matching jars and configuration can be found locally.
On a clean machine, the following is needed:

  1. If you compiled lzo and hadoop-lzo by hand, simply put the jar under plugins.d. If you installed the hadoop-lzo parcel through Cloudera, put both the jar and the links under native into plugins.d. For why this works, see the difference between hadoop-lzo.jar and hadoop-gpl-compression.jar: http://guoyunsky.iteye.com/blog/1289475
  2. Pull core-site.xml from the hadoop cluster and put it into flume/conf; what actually matters is the io.compression.codecs property (a small verification sketch follows this list):

    <property>
    <name>io.compression.codecs</name>
    <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.Lz4Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value>
    </property>
  3. Configure flume as follows:

    a1.sinks.hdfs-sink.hdfs.fileType = CompressedStream
    a1.sinks.hdfs-sink.hdfs.codeC = com.hadoop.compression.lzo.LzopCodec
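With the three steps above in place, a quick check like the sketch below (my own illustration, not part of flume; the class name is made up) confirms that the lzo codecs are discoverable through the same mechanism flume uses. It assumes hadoop-common, hadoop-hdfs, hadoop-lzo, the native libraries, and the flume conf directory (for core-site.xml) are all on the classpath / java.library.path when you run it.

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class LzoCodecCheck {
    public static void main(String[] args) {
        // new Configuration() reads core-site.xml from the classpath,
        // which is why that file has to sit in flume/conf.
        Configuration conf = new Configuration();
        List<Class<? extends CompressionCodec>> codecs =
                CompressionCodecFactory.getCodecClasses(conf);
        // If io.compression.codecs is picked up, the two lzo codecs should
        // show up in this list next to the built-in hadoop codecs.
        for (Class<? extends CompressionCodec> cls : codecs) {
            System.out.println(cls.getName());
        }
    }
}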

A note on why just dropping in the jar is not enough: in the flume source, the getCodec() method in flume/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java resolves the codec through hadoop's CompressionCodecFactory.getCodecClasses(conf).

Looking at CompressionCodecFactory.getCodecClasses(conf) in the hadoop source, it reads the codec class names from IO_COMPRESSION_CODECS_KEY, and IO_COMPRESSION_CODECS_KEY is exactly io.compression.codecs.

That is why core-site.xml is needed to supply io.compression.codecs.
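For completeness, the logic involved can be paraphrased roughly as below. This is a simplified sketch of what HDFSEventSink.getCodec() and CompressionCodecFactory.getCodecClasses() do, not the literal source: the sink builds a Configuration, collects the codec classes listed under io.compression.codecs, and matches hdfs.codeC against them, so a codec missing from that property can never be resolved.

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecResolutionSketch {
    // Simplified paraphrase of HDFSEventSink.getCodec(); not the literal flume code.
    static CompressionCodec getCodec(String codecName) {
        Configuration conf = new Configuration();  // loads core-site.xml from the classpath
        // getCodecClasses() resolves the class names configured under
        // IO_COMPRESSION_CODECS_KEY, i.e. "io.compression.codecs".
        List<Class<? extends CompressionCodec>> codecs =
                CompressionCodecFactory.getCodecClasses(conf);
        for (Class<? extends CompressionCodec> cls : codecs) {
            // hdfs.codeC may be a simple name ("lzop") or a fully qualified class name.
            if (cls.getName().equals(codecName)
                    || cls.getSimpleName().equalsIgnoreCase(codecName)) {
                try {
                    return cls.newInstance();
                } catch (InstantiationException | IllegalAccessException e) {
                    throw new IllegalStateException("Unable to instantiate " + codecName, e);
                }
            }
        }
        throw new IllegalArgumentException("Unsupported compression codec: " + codecName);
    }
}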

HA (High Availability)

Because of the HDFS cluster's HA mechanism, hard-coding a namenode in the flume upload path is not a good idea. When the namenode state of the hdfs cluster changes, flume throws an exception on upload, "Operation category READ(WRITE) is not supported in state standby", because the standby namenode does not serve requests. At that point flume is unusable, and the only fix is to edit the configuration file by hand and restart flume. When there are many servers to collect logs from, this costs a lot of manual effort; and if log-upload monitoring is not in place, you may only discover the flume problem when you actually need the logs.

Fixing this is fairly simple: copy the cluster's hdfs-site.xml into the flume conf directory, and flume will keep uploading logs to hdfs correctly when the namenode state switches.
With that in place, the hdfs.path configuration can also omit the host name.

## before
# a1.sinks.hdfs-sink.hdfs.path = hdfs://{active namenode ip}/facishare-data/app/center/web/%{year}/%{month}/%{day}/
## after
a1.sinks.hdfs-sink.hdfs.path = /facishare-data/app/center/web/%{year}/%{month}/%{day}/
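For reference, the HA-related entries flume needs from the copied hdfs-site.xml usually look like the sketch below. The nameservice and host names here are hypothetical placeholders; copy the real file from the cluster rather than writing it by hand. A scheme-less hdfs.path like the one above is resolved against fs.defaultFS in core-site.xml, which should point at the nameservice (e.g. hdfs://mycluster).

<!-- hypothetical example of the HA entries; the real values come from the cluster -->
<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
</property>
<property>
  <name>dfs.ha.namenodes.mycluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.mycluster.nn1</name>
  <value>namenode1.example.com:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.mycluster.nn2</name>
  <value>namenode2.example.com:8020</value>
</property>
<property>
  <name>dfs.client.failover.proxy.provider.mycluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>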

Customization

See http://flume.apache.org/FlumeDeveloperGuide.html

For a source or an interceptor, just write it by following the source code in flume itself.
You need a dependency on flume-ng-core:

<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>${flume.version}</version>
</dependency>
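For example, a minimal interceptor skeleton looks roughly like the sketch below (my illustration only, not the actual HDFSInterceptor from the repository above; the class and header names are made up). An interceptor implements org.apache.flume.interceptor.Interceptor together with a nested Builder, and the Builder is what a configuration line such as a1.sources.r1.interceptors.i1.type = <class>$Builder points at. Custom sources follow the same pattern against the source interfaces in flume-ng-core.

import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Minimal interceptor sketch: stamps every event with a "hostname" header.
public class HostnameInterceptor implements Interceptor {

    private String hostname;

    @Override
    public void initialize() {
        try {
            hostname = java.net.InetAddress.getLocalHost().getHostName();
        } catch (java.net.UnknownHostException e) {
            hostname = "unknown";
        }
    }

    @Override
    public Event intercept(Event event) {
        // Headers added here can be referenced by the sink, e.g. %{hostname} in hdfs.filePrefix.
        event.getHeaders().put("hostname", hostname);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event e : events) {
            intercept(e);
        }
        return events;
    }

    @Override
    public void close() {
    }

    // Flume builds interceptors through this nested Builder class.
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new HostnameInterceptor();
        }

        @Override
        public void configure(Context context) {
            // read interceptor parameters from the agent configuration here if needed
        }
    }
}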

My flume-to-hdfs configuration

# name the components on this agent
a1.sources = r1
a1.sinks = hdfs-sink
a1.channels = hdfs-channel
# define interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.firstshare.flume.interceptor.HDFSInterceptor$Builder
a1.sources.r1.interceptors.i1.hdfsinterceptor.switch = true
# describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/appStatLog/flume
a1.sources.r1.deletePolicy = immediate
a1.sources.r1.basenameHeader = true
a1.sources.r1.basenameHeaderKey = file
a1.sources.r1.ignorePattern = ^(.)*\\.tmp$
a1.sources.r1.fileSuffix = .COMPLETED
# hdfs sink
a1.sinks.hdfs-sink.type = hdfs
# hdfs path convention: /facishare-data/<product line>/<module>/<submodule>/$year/$month/$day/$filename.$host.lzo
a1.sinks.hdfs-sink.hdfs.path = /facishare-data/app/center/web/%{year}/%{month}/%{day}/
a1.sinks.hdfs-sink.hdfs.filePrefix = %{filename}.%{host}
a1.sinks.hdfs-sink.hdfs.fileSuffix = .lzo
a1.sinks.hdfs-sink.hdfs.fileType = CompressedStream
a1.sinks.hdfs-sink.hdfs.codeC = com.hadoop.compression.lzo.LzopCodec
# roll -> close current file and create a new one
# Number of seconds to wait before rolling current file (0 = never roll based on time interval)
a1.sinks.hdfs-sink.hdfs.rollInterval = 0
# File size to trigger roll, in bytes (0: never roll based on file size)
a1.sinks.hdfs-sink.hdfs.rollSize = 204800000
a1.sinks.hdfs-sink.hdfs.rollCount = 0
# Timeout after which inactive files get closed (0 = disable automatic closing of idle files)
a1.sinks.hdfs-sink.hdfs.idleTimeout = 30
a1.channels.hdfs-channel.type = file
a1.channels.hdfs-channel.checkpointDir = ./checkpointDir
a1.channels.hdfs-channel.dataDirs = ./dataDir
# bind the source and sink to the channel
a1.sources.r1.channels = hdfs-channel
a1.sinks.hdfs-sink.channel = hdfs-channel

HDFSInterceptor here is my own implementation; it mainly adds a few parameters to the event header so that the file names that end up in hdfs better match our needs.

References