Flume SpoolDirectorySource truncates files at emoji

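While shipping data to HDFS with Flume recently, we found that some files were being cut off partway through. Investigation showed that this happens whenever the data contains an emoji, e.g. "上海👃". This post describes how the problem was located and fixed. All code below is based on org.apache.flume:flume-ng-core:1.6.0.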

Start with SpoolDirectorySource.java, the entry point of the whole Spooling Directory Source.
SpoolDirectorySource.start excerpt:

```java
@Override
public synchronized void start() {
  ...
  try {
    reader = new ReliableSpoolingFileEventReader.Builder()
        .spoolDirectory(directory)
        .completedSuffix(completedSuffix)
        .ignorePattern(ignorePattern)
        .trackerDirPath(trackerDirPath)
        .annotateFileName(fileHeader)
        .fileNameHeader(fileHeaderKey)
        .annotateBaseName(basenameHeader)
        .baseNameHeader(basenameHeaderKey)
        .deserializerType(deserializerType)
        .deserializerContext(deserializerContext)
        .deletePolicy(deletePolicy)
        .inputCharset(inputCharset)
        .decodeErrorPolicy(decodeErrorPolicy)
        .consumeOrder(consumeOrder)
        .build();
  } catch (IOException ioe) {
    throw new FlumeException("Error instantiating spooling event parser", ioe);
  }
  ...
}
```

Excerpt of SpoolDirectoryRunnable.run (an inner class of SpoolDirectorySource):

```java
@Override
public void run() {
  ...
  while (!Thread.interrupted()) {
    List<Event> events = reader.readEvents(batchSize);
    if (events.isEmpty()) {
      break;
    }
    sourceCounter.addToEventReceivedCount(events.size());
    sourceCounter.incrementAppendBatchReceivedCount();
    ...
  }
}
```

The run loop calls readEvents on ReliableSpoolingFileEventReader to read data from the file. Next, look at how that method is implemented.

ReliableSpoolingFileEventReader.readEvents excerpt:

```java
public List<Event> readEvents(int numEvents) throws IOException {
  ...
  while (events.isEmpty()) {
    logger.info("Last read took us just up to a file boundary. Rolling to the next file, if there is one.");
    retireCurrentFile();
    currentFile = getNextFile();
    if (!currentFile.isPresent()) {
      return Collections.emptyList();
    }
    events = currentFile.get().getDeserializer().readEvents(numEvents);
  }
  ...
}
```

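The code around this part mostly checks state and conditions. The data itself is fetched by currentFile.get().getDeserializer().readEvents(numEvents). When the deserializer returns an empty list, the reader assumes it has reached the end of the current file, so it retires the file and rolls to the next one. To go further, we need to find out which EventDeserializer is in use.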

ReliableSpoolingFileEventReader.openFile excerpt:

```java
/**
 * Opens a file for consuming
 * @param file
 * @return {@link #FileInfo} for the file to consume or absent option if the
 * file does not exists or readable.
 */
private Optional<FileInfo> openFile(File file) {
  try {
    // roll the meta file, if needed
    String nextPath = file.getPath();
    PositionTracker tracker =
        DurablePositionTracker.getInstance(metaFile, nextPath);
    if (!tracker.getTarget().equals(nextPath)) {
      tracker.close();
      deleteMetaFile();
      tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
    }

    // sanity check
    Preconditions.checkState(tracker.getTarget().equals(nextPath),
        "Tracker target %s does not equal expected filename %s",
        tracker.getTarget(), nextPath);

    ResettableInputStream in =
        new ResettableFileInputStream(file, tracker,
            ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
            decodeErrorPolicy);
    EventDeserializer deserializer = EventDeserializerFactory.getInstance
        (deserializerType, deserializerContext, in);

    return Optional.of(new FileInfo(file, deserializer));
  } catch (FileNotFoundException e) {
    // File could have been deleted in the interim
    logger.warn("Could not find file: " + file, e);
    return Optional.absent();
  } catch (IOException e) {
    logger.error("Exception opening file: " + file, e);
    return Optional.absent();
  }
}
```

This method determines which EventDeserializer is used to read the file's data. SpoolDirectorySourceConfigurationConstants sets the default deserializerType to LINE, and EventDeserializerFactory maps that value to LineDeserializer.

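The method also creates the ResettableInputStream, which is the class that actually reads the file later on, and it is where the problem lies. Note the two parameters inputCharset and decodeErrorPolicy, whose defaults are UTF-8 and FAIL respectively. decodeErrorPolicy defines what to do when input cannot be decoded. The default, FAIL, throws an exception, and the whole Flume process gets stuck at that point. The other options are IGNORE and REPLACE: IGNORE skips the offending input, and REPLACE substitutes a replacement character for it (typically U+FFFD).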
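To make the three policies concrete, here is a minimal sketch that uses only the JDK's CharsetDecoder. It does not show Flume's own code. It assumes that FAIL, IGNORE and REPLACE correspond to CodingErrorAction.REPORT, IGNORE and REPLACE. The class DecodePolicyDemo and its Policy enum are hypothetical. The input contains the byte 0xFF, which never occurs in valid UTF-8.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

class DecodePolicyDemo {
    // Hypothetical stand-in for Flume's decodeErrorPolicy values.
    enum Policy { FAIL, IGNORE, REPLACE }

    // Assumed mapping from a policy to the JDK's CodingErrorAction.
    static CodingErrorAction toAction(Policy policy) {
        switch (policy) {
            case FAIL:
                return CodingErrorAction.REPORT;  // report the error, decode() throws
            case IGNORE:
                return CodingErrorAction.IGNORE;  // drop the bad input
            default:
                return CodingErrorAction.REPLACE; // substitute U+FFFD
        }
    }

    // Decodes UTF-8 bytes under the given policy; FAIL surfaces as an exception.
    static String decode(byte[] bytes, Policy policy) throws CharacterCodingException {
        CodingErrorAction action = toAction(policy);
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(action)
                .onUnmappableCharacter(action);
        return decoder.decode(ByteBuffer.wrap(bytes)).toString();
    }

    public static void main(String[] args) throws CharacterCodingException {
        byte[] bad = {'a', (byte) 0xFF, 'b'}; // 0xFF never occurs in valid UTF-8
        System.out.println(decode(bad, Policy.IGNORE));           // ab
        System.out.println(decode(bad, Policy.REPLACE).length()); // 3: 'a', U+FFFD, 'b'
        try {
            decode(bad, Policy.FAIL);
        } catch (CharacterCodingException e) {
            System.out.println("FAIL threw " + e.getClass().getSimpleName()); // MalformedInputException
        }
    }
}
```

Each policy only takes effect for malformed or unmappable input. An emoji is perfectly valid UTF-8, so none of these settings comes into play in the truncation problem described below.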

Excerpt from SpoolDirectorySourceConfigurationConstants:

```java
/** Deserializer to use to parse the file data into Flume Events */
public static final String DESERIALIZER = "deserializer";
public static final String DEFAULT_DESERIALIZER = "LINE";

/** Character set used when reading the input. */
public static final String INPUT_CHARSET = "inputCharset";
public static final String DEFAULT_INPUT_CHARSET = "UTF-8";

/** What to do when there is a character set decoding error. */
public static final String DECODE_ERROR_POLICY = "decodeErrorPolicy";
public static final String DEFAULT_DECODE_ERROR_POLICY = DecodeErrorPolicy.FAIL.name();
```

Back to readEvents: we only look at the implementation in the default LineDeserializer.

Excerpts of LineDeserializer.readEvents, LineDeserializer.readEvent and LineDeserializer.readLine:

```java
@Override
public List<Event> readEvents(int numEvents) throws IOException {
  ensureOpen();
  List<Event> events = Lists.newLinkedList();
  for (int i = 0; i < numEvents; i++) {
    Event event = readEvent();
    if (event != null) {
      events.add(event);
    } else {
      break;
    }
  }
  return events;
}

@Override
public Event readEvent() throws IOException {
  ensureOpen();
  String line = readLine();
  if (line == null) {
    return null;
  } else {
    return EventBuilder.withBody(line, outputCharset);
  }
}

// TODO: consider not returning a final character that is a high surrogate
// when truncating
private String readLine() throws IOException {
  StringBuilder sb = new StringBuilder();
  int c;
  int readChars = 0;
  while ((c = in.readChar()) != -1) {
    readChars++;

    // FIXME: support \r\n
    if (c == '\n') {
      break;
    }

    sb.append((char)c);

    if (readChars >= maxLineLength) {
      logger.warn("Line length exceeds max ({}), truncating line!",
          maxLineLength);
      break;
    }
  }

  if (readChars > 0) {
    return sb.toString();
  } else {
    return null;
  }
}
```

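readEvents and readEvent above are both simple, and both end up calling readLine. The in object in readLine is the ResettableInputStream mentioned earlier. readLine considers a line finished as soon as in.readChar() returns -1 or \n. If -1 arrives before any character has been read, the chain unwinds as follows:

- readLine returns null.
- readEvent returns null.
- readEvents returns an empty list.

As ReliableSpoolingFileEventReader.readEvents showed, that empty list makes the reader retire the file. So the next place to look is the implementation of readChar behind ResettableInputStream.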
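The consequence of treating -1 as end of file can be seen in a small sketch. It keeps the loop shape of readLine but reads from a java.util.function.IntSupplier instead of ResettableInputStream. The class ReadLineDemo and the helper stuckAfter are hypothetical. stuckAfter serves the first few chars of a string and then returns -1 forever, standing in for a readChar() that cannot get past a character.

```java
import java.util.function.IntSupplier;

class ReadLineDemo {
    // Same loop shape as LineDeserializer.readLine in 1.6.0, but reading from an
    // IntSupplier instead of ResettableInputStream.readChar().
    static String readLine(IntSupplier readChar, int maxLineLength) {
        StringBuilder sb = new StringBuilder();
        int c;
        int readChars = 0;
        while ((c = readChar.getAsInt()) != -1) {
            readChars++;
            if (c == '\n') {
                break;
            }
            sb.append((char) c);
            if (readChars >= maxLineLength) {
                break; // the original logs a truncation warning here
            }
        }
        return readChars > 0 ? sb.toString() : null; // null means "no more lines"
    }

    // Hypothetical source: serves the first stuckAt chars of data, then
    // returns -1 forever, like a readChar() stuck on one character.
    static IntSupplier stuckAfter(String data, int stuckAt) {
        int[] pos = {0};
        return () -> pos[0] < Math.min(stuckAt, data.length()) ? (int) data.charAt(pos[0]++) : -1;
    }

    public static void main(String[] args) {
        IntSupplier in = stuckAfter("ab\ncd\nef\n", 4);
        System.out.println(readLine(in, 100)); // ab
        System.out.println(readLine(in, 100)); // c (a partial line)
        System.out.println(readLine(in, 100)); // null: looks like end of file
    }
}
```

Nothing in this loop can tell a spurious -1 apart from a real end of file. The partial line is emitted as if it were complete, and the null that follows makes the caller move on while the remaining "d", "ef" lines are never delivered.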

ResettableFileInputStream.readChar excerpt (the ResettableInputStream implementation that openFile creates):

```java
@Override
public synchronized int readChar() throws IOException {
  // The decoder can have issues with multi-byte characters.
  // This check ensures that there are at least maxCharWidth bytes in the buffer
  // before reaching EOF.
  if (buf.remaining() < maxCharWidth) {
    buf.clear();
    buf.flip();
    refillBuf();
  }

  int start = buf.position();
  charBuf.clear();

  boolean isEndOfInput = false;
  if (position >= fileSize) {
    isEndOfInput = true;
  }

  CoderResult res = decoder.decode(buf, charBuf, isEndOfInput); // ------- 1
  if (res.isMalformed() || res.isUnmappable()) {                // ------- 2
    res.throwException();
  }

  int delta = buf.position() - start;

  charBuf.flip();                                               // ------- 3
  if (charBuf.hasRemaining()) {                                 // ------- 4
    char c = charBuf.get();
    // don't increment the persisted location if we are in between a
    // surrogate pair, otherwise we may never recover if we seek() to this
    // location!
    incrPosition(delta, !Character.isHighSurrogate(c));
    return c;

  // there may be a partial character in the decoder buffer
  } else {
    incrPosition(delta, false);
    return -1;
  }
}
```

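Note the first comment in this method: "The decoder can have issues with multi-byte characters." Whoever wrote the method was evidently aware of the weak spot: when a character takes more bytes than expected, the decoder can go wrong. As mentioned above, the inputCharset parameter passed when the ResettableInputStream is created determines which decoder is used. The default is UTF-8, which here means the decoder of the JDK's UTF_8 charset class (sun.nio.cs.UTF_8).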
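A quick JDK-only check shows why the 1.6.0 width estimate falls short for emoji. It assumes the JDK's UTF-8 encoder reports maxBytesPerChar() as 3.0, which matches the value of 3 quoted for 1.6.0. CharWidthDemo and legacyMaxCharWidth are hypothetical names. legacyMaxCharWidth reproduces the 1.6.0 formula (int) Math.ceil(charset.newEncoder().maxBytesPerChar()).

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

class CharWidthDemo {
    // U+1F443 (the nose emoji from the example) as a UTF-16 surrogate pair.
    static final String NOSE = "\uD83D\uDC43";

    // The pre-1.7.0 estimate of the widest character, in bytes.
    static int legacyMaxCharWidth(Charset charset) {
        return (int) Math.ceil(charset.newEncoder().maxBytesPerChar());
    }

    public static void main(String[] args) {
        System.out.println(legacyMaxCharWidth(StandardCharsets.UTF_8)); // 3
        System.out.println(NOSE.getBytes(StandardCharsets.UTF_8).length); // 4 bytes
        System.out.println(NOSE.length()); // 2 chars
        System.out.println(NOSE.codePointCount(0, NOSE.length())); // 1 code point
    }
}
```

maxBytesPerChar() counts bytes per Java char, and every char in the Basic Multilingual Plane fits in 3 UTF-8 bytes. 👃 lies outside that plane, so it needs 4 bytes and two chars: more bytes than maxCharWidth guarantees, and more chars than a one-char buffer can hold.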

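Here is the overall picture; the details of the decode process are left for you to explore.
Suppose a line contains 上海👃你好. The failure unfolds in four steps, keyed to the numbered markers in the code above:

1. After 海 has been read, the call for the next char overflows: res.isOverflow() is true. 👃 decodes to two chars (a surrogate pair) while charBuf holds only one, so the decoder writes nothing and consumes no bytes.
2. Check 2 tests only for malformed and unmappable input, not for overflow, so execution continues.
3. After the flip, charBuf is empty.
4. The else branch runs and returns -1.

The upper layers then believe the file has been read completely, so everything after that point is lost, and no exception is thrown.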
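The overflow can be reproduced with nothing but java.nio. The sketch below is a simplification, not Flume's code. The hypothetical method probe decodes the emoji's 4 bytes into a CharBuffer of a given size. The hypothetical method readLikeFlume16 imitates steps 1 to 4 of the 1.6.0 readChar over an in-memory buffer and collects chars until a call yields none. It has no refill and no position tracking, and it always passes endOfInput=false, as happens mid-file.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;

class OverflowDemo {
    static final String NOSE = "\uD83D\uDC43"; // U+1F443
    // The sample line (Shanghai + nose emoji + nihao), written with escapes.
    static final String SAMPLE = "\u4E0A\u6D77" + NOSE + "\u4F60\u597D\n";

    // Decodes the emoji's 4 UTF-8 bytes into a CharBuffer of the given size and
    // reports the result kind, bytes consumed and chars produced.
    static String probe(int charBufSize) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
        ByteBuffer buf = ByteBuffer.wrap(NOSE.getBytes(StandardCharsets.UTF_8));
        CharBuffer charBuf = CharBuffer.allocate(charBufSize);
        CoderResult res = decoder.decode(buf, charBuf, false);
        charBuf.flip();
        String kind = res.isOverflow() ? "OVERFLOW" : res.isUnderflow() ? "UNDERFLOW" : "ERROR";
        return kind + " consumed=" + buf.position() + " chars=" + charBuf.remaining();
    }

    // Imitates steps 1-4 of the 1.6.0 readChar with a one-char buffer and
    // returns everything read before the first call that would return -1.
    static String readLikeFlume16(String text) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
        ByteBuffer buf = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
        CharBuffer charBuf = CharBuffer.allocate(1);
        StringBuilder out = new StringBuilder();
        while (true) {
            charBuf.clear();
            CoderResult res = decoder.decode(buf, charBuf, false); // 1
            if (res.isMalformed() || res.isUnmappable()) {         // 2: overflow not checked
                throw new IllegalStateException("decode error: " + res);
            }
            charBuf.flip();                                        // 3
            if (!charBuf.hasRemaining()) {                         // 4: the -1 branch
                return out.toString();
            }
            out.append(charBuf.get());
        }
    }

    public static void main(String[] args) {
        System.out.println(probe(1)); // OVERFLOW consumed=0 chars=0
        System.out.println(probe(2)); // UNDERFLOW consumed=4 chars=2
        System.out.println(readLikeFlume16(SAMPLE).length()); // 2: only the first two chars survive
    }
}
```

probe(1) is exactly the state that reaches the -1 branch. The decoder makes no progress, so every later call returns -1 as well. With room for two chars, the same bytes decode cleanly into the surrogate pair.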

This problem has in fact already been fixed in flume-1.7.0; see https://github.com/apache/flume/commit/344e0accae5675fd3d14b8414531528607865aae#diff-2abdc3f807bae84ca2b41578c4e66ca7

Main changes:

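  1. Before the change, maxCharWidth = (int)Math.ceil(charset.newEncoder().maxBytesPerChar()); evaluates to 3 when the charset is "utf8". After the change, "UTF-8", "UTF-16" and "UTF-32" each get their own value.
  2. charBuf grows from a single char to two chars.
  3. After the change, readChar checks buf as well as charBuf:
     - If charBuf.hasRemaining() is true, a single char was decoded and is returned directly.
     - If charBuf.hasRemaining() is false but buf.hasRemaining() is true, the next character needs two chars. The bytes are decoded again into the two-char buffer, and the resulting chars are stored in highSurrogate and lowSurrogate. The method returns highSurrogate and sets hasLowSurrogate to true.
     - On the next call, the method first checks hasLowSurrogate. If it is true, it returns lowSurrogate directly and resets hasLowSurrogate to false.

     This way both chars of the pair are read.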
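Putting these changes together, here is a simplified, in-memory sketch of the approach described above. It tries to decode one char first. If nothing comes out while bytes remain, it decodes again into a two-char buffer and caches the low surrogate for the next call. This is not the actual 1.7.0 ResettableFileInputStream. The class SurrogateAwareReader and its readAll helper are hypothetical. Buffer refilling, position tracking and decodeErrorPolicy handling are omitted, and decode errors simply throw.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;

class SurrogateAwareReader {
    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
    private final ByteBuffer buf;
    private final CharBuffer charBuf = CharBuffer.allocate(2); // room for one surrogate pair
    private boolean hasLowSurrogate = false;
    private char lowSurrogate;

    SurrogateAwareReader(byte[] bytes) {
        this.buf = ByteBuffer.wrap(bytes);
    }

    // Returns the next UTF-16 char, or -1 once no more chars can be decoded.
    int readChar() {
        // Second half of a pair decoded by the previous call.
        if (hasLowSurrogate) {
            hasLowSurrogate = false;
            return lowSurrogate;
        }
        // Try to decode exactly one char.
        charBuf.clear();
        charBuf.limit(1);
        check(decoder.decode(buf, charBuf, false));
        charBuf.flip();
        if (charBuf.hasRemaining()) {
            return charBuf.get();
        }
        // Nothing decoded although bytes remain: the next code point needs two
        // chars, so decode again with the full two-char capacity.
        if (buf.hasRemaining()) {
            charBuf.clear(); // limit back to capacity (2)
            check(decoder.decode(buf, charBuf, false));
            charBuf.flip();
            if (charBuf.remaining() == 2) {
                char highSurrogate = charBuf.get();
                lowSurrogate = charBuf.get();
                hasLowSurrogate = true;
                return highSurrogate;
            }
        }
        return -1; // end of input (or an incomplete trailing byte sequence)
    }

    private static void check(CoderResult res) {
        if (res.isMalformed() || res.isUnmappable()) {
            throw new IllegalStateException("decode error: " + res);
        }
    }

    // Hypothetical helper: reads chars until -1 and returns them as a string.
    static String readAll(String text) {
        SurrogateAwareReader reader = new SurrogateAwareReader(text.getBytes(StandardCharsets.UTF_8));
        StringBuilder out = new StringBuilder();
        int c;
        while ((c = reader.readChar()) != -1) {
            out.append((char) c);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String sample = "\u4E0A\u6D77\uD83D\uDC43\u4F60\u597D\n"; // Shanghai + nose emoji + nihao
        System.out.println(readAll(sample).equals(sample)); // true
    }
}
```

Returning the pair one char per call keeps readChar's contract of one UTF-16 char per call, so callers such as LineDeserializer.readLine need no changes. When the sample line is read through this sketch, the emoji and the text after it all come through.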