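While using Flume to ship data to HDFS, we recently found that some files were cut off partway through. Investigation showed that this happens when a line contains an emoji, e.g. "上海👃". This post describes how the problem was located and fixed. All code below is based on org.apache.flume:flume-ng-core:1.6.0.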
First, look at SpoolDirectorySource.java, the entry point of the whole Spooling Directory Source. Snippet of SpoolDirectorySource.start:
@Override
public synchronized void start() {
  ...
  try {
    reader = new ReliableSpoolingFileEventReader.Builder()
        .spoolDirectory(directory)
        .completedSuffix(completedSuffix)
        .ignorePattern(ignorePattern)
        .trackerDirPath(trackerDirPath)
        .annotateFileName(fileHeader)
        .fileNameHeader(fileHeaderKey)
        .annotateBaseName(basenameHeader)
        .baseNameHeader(basenameHeaderKey)
        .deserializerType(deserializerType)
        .deserializerContext(deserializerContext)
        .deletePolicy(deletePolicy)
        .inputCharset(inputCharset)
        .decodeErrorPolicy(decodeErrorPolicy)
        .consumeOrder(consumeOrder)
        .build();
  } catch (IOException ioe) {
    throw new FlumeException("Error instantiating spooling event parser", ioe);
  }
  ...
}
And a snippet of SpoolDirectoryRunnable.run, an inner class of SpoolDirectorySource:
@Override
public void run() {
  ...
  while (!Thread.interrupted()) {
    List<Event> events = reader.readEvents(batchSize);
    if (events.isEmpty()) {
      break;
    }
    sourceCounter.addToEventReceivedCount(events.size());
    sourceCounter.incrementAppendBatchReceivedCount();
    ...
  }
}
As you can see, it calls the readEvents method of ReliableSpoolingFileEventReader to read data from the file, so next we look at how that method is implemented.
Snippet of ReliableSpoolingFileEventReader.readEvents:
public List<Event> readEvents(int numEvents) throws IOException {
  ...
  while (events.isEmpty()) {
    logger.info("Last read took us just up to a file boundary. Rolling to the next file, if there is one.");
    retireCurrentFile();
    currentFile = getNextFile();
    if (!currentFile.isPresent()) {
      return Collections.emptyList();
    }
    events = currentFile.get().getDeserializer().readEvents(numEvents);
  }
  ...
}
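The code before and after this snippet mainly checks state and conditions; the data itself is fetched by currentFile.get().getDeserializer().readEvents(numEvents), so we need to determine which EventDeserializer is in use.
Snippet of ReliableSpoolingFileEventReader.openFile: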
/**
 * Opens a file for consuming
 * @param file
 * @return {@link #FileInfo} for the file to consume or absent option if the
 * file does not exists or readable.
 */
private Optional<FileInfo> openFile(File file) {
  try {
    String nextPath = file.getPath();
    PositionTracker tracker =
        DurablePositionTracker.getInstance(metaFile, nextPath);
    if (!tracker.getTarget().equals(nextPath)) {
      tracker.close();
      deleteMetaFile();
      tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
    }
    Preconditions.checkState(tracker.getTarget().equals(nextPath),
        "Tracker target %s does not equal expected filename %s",
        tracker.getTarget(), nextPath);
    ResettableInputStream in =
        new ResettableFileInputStream(file, tracker,
            ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
            decodeErrorPolicy);
    EventDeserializer deserializer = EventDeserializerFactory.getInstance
        (deserializerType, deserializerContext, in);
    return Optional.of(new FileInfo(file, deserializer));
  } catch (FileNotFoundException e) {
    logger.warn("Could not find file: " + file, e);
    return Optional.absent();
  } catch (IOException e) {
    logger.error("Exception opening file: " + file, e);
    return Optional.absent();
  }
}
This method determines which EventDeserializer is used to read the file's data. In SpoolDirectorySourceConfigurationConstants the default value of deserializerType is LINE, which EventDeserializerFactory maps to LineDeserializer.
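The method also creates the ResettableInputStream, the class that actually reads the file from here on, and it is where the problem lies. Note the two fields inputCharset and decodeErrorPolicy, whose defaults are UTF-8 and FAIL respectively. decodeErrorPolicy defines what to do when a character cannot be decoded. The default, FAIL, throws an exception and Flume gets stuck at that point; IGNORE drops the character; REPLACE substitutes a replacement character for it.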
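The three policies line up with the JDK's CodingErrorAction values REPORT, IGNORE and REPLACE; that Flume wires them up this way is an assumption here, not something shown in the snippets. The sketch below uses only java.nio.charset to show what each action does with a single invalid byte (0xFF never appears in valid UTF-8). DecodePolicyDemo is an illustrative name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

class DecodePolicyDemo {
    // Decodes the bytes as UTF-8, applying the same action to malformed and unmappable input.
    static String decode(byte[] bytes, CodingErrorAction action) throws CharacterCodingException {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(action)
                .onUnmappableCharacter(action);
        return decoder.decode(ByteBuffer.wrap(bytes)).toString();
    }

    public static void main(String[] args) throws CharacterCodingException {
        byte[] bad = {'A', (byte) 0xFF, 'B'}; // 0xFF is never valid in UTF-8
        System.out.println(decode(bad, CodingErrorAction.IGNORE));  // AB: the bad byte is dropped
        System.out.println(decode(bad, CodingErrorAction.REPLACE)); // A, U+FFFD, B: the bad byte is replaced
        try {
            decode(bad, CodingErrorAction.REPORT);                   // comparable to FAIL
        } catch (CharacterCodingException e) {
            System.out.println("REPORT threw " + e);
        }
    }
}
```

With REPORT the decoder stops at the first bad byte, which matches the "stuck" behaviour described for FAIL.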
Snippet from SpoolDirectorySourceConfigurationConstants:
public static final String DESERIALIZER = "deserializer";
public static final String DEFAULT_DESERIALIZER = "LINE";
public static final String INPUT_CHARSET = "inputCharset";
public static final String DEFAULT_INPUT_CHARSET = "UTF-8";
public static final String DECODE_ERROR_POLICY = "decodeErrorPolicy";
public static final String DEFAULT_DECODE_ERROR_POLICY = DecodeErrorPolicy.FAIL.name();
Back to readEvents: we only look at the implementation in the default LineDeserializer.
Snippets of LineDeserializer.readEvents, LineDeserializer.readEvent and LineDeserializer.readLine:
@Override
public List<Event> readEvents(int numEvents) throws IOException {
  ensureOpen();
  List<Event> events = Lists.newLinkedList();
  for (int i = 0; i < numEvents; i++) {
    Event event = readEvent();
    if (event != null) {
      events.add(event);
    } else {
      break;
    }
  }
  return events;
}

@Override
public Event readEvent() throws IOException {
  ensureOpen();
  String line = readLine();
  if (line == null) {
    return null;
  } else {
    return EventBuilder.withBody(line, outputCharset);
  }
}

private String readLine() throws IOException {
  StringBuilder sb = new StringBuilder();
  int c;
  int readChars = 0;
  while ((c = in.readChar()) != -1) {
    readChars++;
    if (c == '\n') {
      break;
    }
    sb.append((char) c);
    if (readChars >= maxLineLength) {
      logger.warn("Line length exceeds max ({}), truncating line!",
          maxLineLength);
      break;
    }
  }
  if (readChars > 0) {
    return sb.toString();
  } else {
    return null;
  }
}
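readEvents and readEvent above are both straightforward and end up calling readLine. The in object in readLine is the ResettableInputStream described earlier, and readLine treats the line as finished as soon as in.readChar() returns -1 or \n. If readChar returns -1 before anything has been read, readLine returns null and the batch ends; as the ReliableSpoolingFileEventReader.readEvents snippet shows, an empty batch makes the reader retire the current file and roll to the next one. So the next thing to examine is readChar, implemented in ResettableFileInputStream, the concrete ResettableInputStream created in openFile.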
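To make that control flow concrete, here is a self-contained sketch of the readLine loop above, driven by a hypothetical CharSource (a stand-in for ResettableInputStream) that starts returning -1 at a chosen index, which is how a readChar that stops early would look to the caller. CharSource and stoppingAt are illustrative names, not Flume APIs.

```java
import java.io.IOException;

class ReadLineDemo {
    // Hypothetical stand-in for ResettableInputStream.readChar(): a char, or -1 for "no more data".
    interface CharSource {
        int readChar() throws IOException;
    }

    // Same loop as LineDeserializer.readLine above, minus the logging.
    static String readLine(CharSource in, int maxLineLength) throws IOException {
        StringBuilder sb = new StringBuilder();
        int c;
        int readChars = 0;
        while ((c = in.readChar()) != -1) {
            readChars++;
            if (c == '\n') {
                break;
            }
            sb.append((char) c);
            if (readChars >= maxLineLength) {
                break;
            }
        }
        return readChars > 0 ? sb.toString() : null;
    }

    // Hypothetical source that serves `text` but reports -1 from index `stopAt` onward.
    static CharSource stoppingAt(String text, int stopAt) {
        int[] pos = {0};
        int limit = Math.min(stopAt, text.length());
        return () -> pos[0] < limit ? (int) text.charAt(pos[0]++) : -1;
    }

    public static void main(String[] args) throws IOException {
        // 上海👃你好, with -1 reported where the emoji starts (index 2).
        CharSource src = stoppingAt("\u4E0A\u6D77\uD83D\uDC43\u4F60\u597D", 2);
        System.out.println(readLine(src, 2048)); // the line comes back cut at the emoji
        System.out.println(readLine(src, 2048)); // null: no more events from this file
    }
}
```

A single premature -1 thus produces a truncated event followed by null, and an empty batch is what tells the reader the file is done.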
Snippet of ResettableFileInputStream.readChar:
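@Override
public synchronized int readChar() throws IOException {
  // The decoder can have issues with multi-byte characters.
  if (buf.remaining() < maxCharWidth) {
    buf.clear();
    buf.flip();
    refillBuf();
  }
  int start = buf.position();
  charBuf.clear();
  boolean isEndOfInput = false;
  if (position >= fileSize) {
    isEndOfInput = true;
  }
  // (1) with a one-char charBuf, a supplementary character such as 👃 yields an overflow here
  CoderResult res = decoder.decode(buf, charBuf, isEndOfInput);
  // (2) only malformed/unmappable results are checked; overflow falls through
  if (res.isMalformed() || res.isUnmappable()) {
    res.throwException();
  }
  int delta = buf.position() - start;
  // (3) after an overflow nothing was written, so charBuf is empty after flip()
  charBuf.flip();
  // (4) an empty charBuf takes the else branch and returns -1
  if (charBuf.hasRemaining()) {
    char c = charBuf.get();
    incrPosition(delta, !Character.isHighSurrogate(c));
    return c;
  } else {
    incrPosition(delta, false);
    return -1;
  }
}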
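The first line of this method in the source is the comment "The decoder can have issues with multi-byte characters.", so its authors were aware of the weak spot: the decoder can misbehave when one character is wider than a single unit. As mentioned earlier, the inputCharset argument passed when the ResettableFileInputStream is constructed determines which decoder is used; the default is UTF-8, which here means the JDK's UTF_8 charset class.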
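For a sense of the numbers involved, a small JDK-only sketch: the pre-1.7.0 maxCharWidth formula (quoted in the fix discussion of this post) gives 3 for UTF-8, because the JDK's UTF-8 encoder reports at most 3 bytes per Java char. The emoji from the example, U+1F443, is one code point, two Java chars and four UTF-8 bytes. MaxCharWidthDemo is an illustrative name.

```java
import java.nio.charset.StandardCharsets;

class MaxCharWidthDemo {
    // Pre-1.7.0 formula: ceil(maxBytesPerChar) of the charset's encoder.
    static int oldMaxCharWidth() {
        return (int) Math.ceil(StandardCharsets.UTF_8.newEncoder().maxBytesPerChar());
    }

    public static void main(String[] args) {
        String nose = "\uD83D\uDC43"; // U+1F443, written as its UTF-16 surrogate pair
        System.out.println("maxCharWidth (old formula): " + oldMaxCharWidth());
        System.out.println("UTF-8 bytes for the emoji:  " + nose.getBytes(StandardCharsets.UTF_8).length);
        System.out.println("Java chars for the emoji:   " + nose.length());
        System.out.println("code points:                " + nose.codePointCount(0, nose.length()));
    }
}
```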
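Here is the overall picture; the details of the decode process are left for you to explore. Suppose a line contains 上海👃你好. After 海 has been read, the next readChar call reaches the decoder.decode(buf, charBuf, isEndOfInput) call (position 1). 👃 is a supplementary character: it takes 4 bytes in UTF-8 and two Java chars (a surrogate pair), but charBuf only has room for one char, so the decoder reports an overflow, i.e. res.isOverflow() is true, without writing a char or consuming any bytes. The check that follows (position 2) only looks at isMalformed() and isUnmappable(), not at overflow, so execution continues. After charBuf.flip() (position 3) charBuf is empty, so the if/else (position 4) takes the else branch and returns -1. The layers above take this to mean that the file has been fully read, so all data after that point is lost, and no exception is reported.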
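The failure can be reproduced with nothing but java.nio. The sketch below is a simplification, not Flume code: it drops buffer refilling and position tracking and keeps only the parts that matter here, namely a one-char charBuf, no overflow check, and "empty charBuf means -1". OverflowDemo is an illustrative name.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;

class OverflowDemo {
    // Mimics the 1.6.0 readChar loop: decode into a ONE-char buffer and
    // stop as soon as the buffer comes back empty (readChar would return -1).
    static String decodeOneCharAtATime(String text) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
        ByteBuffer buf = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
        CharBuffer charBuf = CharBuffer.allocate(1);
        StringBuilder sb = new StringBuilder();
        while (true) {
            charBuf.clear();
            CoderResult res = decoder.decode(buf, charBuf, false);
            // Like the original, only malformed/unmappable are checked; overflow is ignored.
            if (res.isMalformed() || res.isUnmappable()) {
                throw new IllegalStateException(res.toString());
            }
            charBuf.flip();
            if (!charBuf.hasRemaining()) {
                break; // treated as end of file
            }
            sb.append(charBuf.get());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String line = "\u4E0A\u6D77\uD83D\uDC43\u4F60\u597D"; // 上海👃你好
        System.out.println(decodeOneCharAtATime(line)); // only 上海 survives

        // Decoding just the emoji into a one-char buffer: overflow, and no bytes consumed.
        ByteBuffer emoji = ByteBuffer.wrap("\uD83D\uDC43".getBytes(StandardCharsets.UTF_8));
        CoderResult res = StandardCharsets.UTF_8.newDecoder()
                .decode(emoji, CharBuffer.allocate(1), false);
        System.out.println("overflow=" + res.isOverflow() + ", bytes consumed=" + emoji.position());
    }
}
```

Because no bytes are consumed, every later call hits the same overflow, so the "-1" is stable and the rest of the file is never read.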
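This problem has already been fixed in flume-1.7.0; see https://github.com/apache/flume/commit/344e0accae5675fd3d14b8414531528607865aae#diff-2abdc3f807bae84ca2b41578c4e66ca7
The main changes:
Before the change, maxCharWidth = (int)Math.ceil(charset.newEncoder().maxBytesPerChar());, which evaluates to 3 for UTF-8. After the change, "UTF-8", "UTF-16" and "UTF-32" each get their own value.
The size of charBuf was changed from a single char to two chars.
After the change, readChar checks not only charBuf but also buf. If charBuf.hasRemaining() is true, a single char was decoded and is returned directly. If it is false but buf.hasRemaining() is true, the method decodes again with room for two chars, stores them in highSurrogate and lowSurrogate, returns highSurrogate and sets hasLowSurrogate to true. On the next call, the method first checks hasLowSurrogate; if it is true, it returns lowSurrogate directly and resets hasLowSurrogate to false. This way both chars of the pair are read.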
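The fixed logic can be sketched over an in-memory byte array as follows. This is a simplified illustration of the approach described above, not the 1.7.0 source: there is no buffer refill, no position tracking and no end-of-input flag, and SurrogateAwareReader and readAll are illustrative names.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;

class SurrogateAwareReader {
    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
    private final ByteBuffer buf;
    private final CharBuffer charBuf = CharBuffer.allocate(2); // room for a surrogate pair
    private boolean hasLowSurrogate = false;
    private char lowSurrogate;

    SurrogateAwareReader(byte[] bytes) {
        this.buf = ByteBuffer.wrap(bytes);
    }

    int readChar() throws CharacterCodingException {
        // Return the second half of a pair decoded on the previous call.
        if (hasLowSurrogate) {
            hasLowSurrogate = false;
            return lowSurrogate;
        }
        // First try to decode exactly one char.
        charBuf.clear();
        charBuf.limit(1);
        CoderResult res = decoder.decode(buf, charBuf, false);
        if (res.isMalformed() || res.isUnmappable()) {
            res.throwException();
        }
        charBuf.flip();
        if (charBuf.hasRemaining()) {
            return charBuf.get(); // a single char
        }
        // Nothing decoded but bytes remain: the next code point needs two chars.
        if (buf.hasRemaining()) {
            charBuf.clear(); // limit back to the full capacity of 2
            res = decoder.decode(buf, charBuf, false);
            if (res.isMalformed() || res.isUnmappable()) {
                res.throwException();
            }
            charBuf.flip();
            if (charBuf.remaining() == 2) {
                char highSurrogate = charBuf.get();
                lowSurrogate = charBuf.get(); // kept for the next call
                hasLowSurrogate = true;
                return highSurrogate;
            }
        }
        return -1; // out of data
    }

    // Reads chars until -1 and collects them.
    static String readAll(String text) throws CharacterCodingException {
        SurrogateAwareReader r = new SurrogateAwareReader(text.getBytes(StandardCharsets.UTF_8));
        StringBuilder sb = new StringBuilder();
        int c;
        while ((c = r.readChar()) != -1) {
            sb.append((char) c);
        }
        return sb.toString();
    }

    public static void main(String[] args) throws CharacterCodingException {
        String line = "\u4E0A\u6D77\uD83D\uDC43\u4F60\u597D"; // 上海👃你好
        System.out.println(readAll(line).equals(line)); // the whole line survives
    }
}
```

Trying a one-char decode first keeps the common path identical to before; the two-char retry only kicks in when a supplementary character is next.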