flume-ng配置以及使用过程中遇到的问题和解决办法。重点说一下hdfs.codeC使用lzo的问题。另外附上自己实现的flume插件,https://github.com/wzktravel/flume-agent,当前有一个interceptor,两个source:
- HDFSInterceptor,在header中加入时间,文件名,ip等
- SpoolDirectoryHourlySource,收集按小时进行切片的日志
- DirTailPollableSource2, 动态tail目录下最后修改的文件
以下都基于flume ng 1.6.0版本,最后附上flume上报到hdfs的配置。
引用第三方jar包
不必放在flume的lib目录下,参考http://flume.apache.org/FlumeUserGuide.html#installing-third-party-plugins
在flume目录下新建plugins.d
目录,此目录下每个插件单独一个目录,每个插件目录下可以有lib,libext,native。
- lib: 放置插件jar包
- libext: 放置插件引用的jar包
- native: 放置所需的native库, 比如
.so
文件
如我有hdfs插件和自己实现的flume interceptor/source以及lzo,目录格式如下:
hdfs-sink
对于flume和hadoop在同一台机器上的,配置很简单,直接按照flume官网配置即可,但如果在没有hadoop的机器上使用hdfs-sink,需要一些额外的jar包,不像kafka-sink一样,flume/lib下已经有kafka相关的jar包了。
hdfs-sink所需jar包
最少需要这几个jar包,如果想使用lzo压缩,还需其他设置,看下文。
hdfs.codeC使用lzo
先假定你的集群已经支持lzo了,如果不支持,可以参考Cloudera中配置hadoop_lzo:
http://wzktravel.github.io/2015/12/10/hadoop-lzo/,
网上的资料一般都是flume和hadoop在同一台机器上的,这时候可以直接将hdfs.codeC
设置为lzo
,因为能找到对应的jar包和配置。
在纯净的机器上,需要下面几个配置:
- 手工编译lzo和hadoop-lzo的,直接将jar包放在plugins.d下即可。使用Cloudera安装hadoop-lzo parcel的,要将jar包和native下链接都放在plugins.d下。为何这样可以参考 hadoop-lzo.jar和hadoop-gpl-compression.jar区别:http://guoyunsky.iteye.com/blog/1289475
从hadoop集群上拉取core-site.xml放在flume/conf下,其实主要使用
1234<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.Lz4Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value></property>在flume配置
12a1.sinks.hdfs-sink.hdfs.fileType = CompressedStreama1.sinks.hdfs-sink.hdfs.codeC = com.hadoop.compression.lzo.LzopCodec
这里要说明一下,为什么只放置jar包不行呢?
因为在flume源码flume/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java getCodec()
方法中使用hadoop了的CompressionCodecFactory.getCodecClasses(conf)
。
再去hadoop源码中看CompressionCodecFactory.getCodecClasses(conf)
时发现
而IO_COMPRESSION_CODECS_KEY
就是io.compression.codecs
。
所以需要core-site.xml来指定io.compression.codecs
。
HA(High Availability)
由于HDFS集群的HA机制,在flume上传时指定namenode的做法就不太好了,当hdfs集群的namenode状态发生变化时,flume上报时会报出Exception, Operation category READ(WRITE) is not supported in state standby
,因为standby namenode是不对提供服务的。那么此时flume就处于不可用状态,必须手工修改配置文件然后重启flume才能解决。当要收集日志的服务器很多时,会增加很多人力成本;另外,日志上报状态监控没有做好的话,也许用到这个日志的时候才会发现flume出现问题。
解决这个问题也比较简单,就是将集群中hdfs-site.xml
复制一份到flume conf目录下即可,当namenode状态切换时,flume也能正确将日志上报到hdfs中。
此时,hdfs.path配置也可以省略域名。
定制
参考http://flume.apache.org/FlumeDeveloperGuide.html
对source,interceptor,参考flume中的源码写即可。
需要引用jar包flume-ng-core
:
我的flume上报到hdfs配置
|
|
其中HDFSInterceptor是自己实现的,主要添加几个参数到header中,使上报到hdfs中的文件名更符合我们的需求。
参考文档
- flume官网UserGuide: http://flume.apache.org/FlumeUserGuide.html
- Hadoop2.0的HA介绍: http://www.tuicool.com/articles/IBZFvy
- HDFS High Availability Using the Quorum Journal Manager:
http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html