hive join遇到的问题及解决方法

这几天遇到一个问题,hive在执行join操作时报错,return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask,又没有特别有用的信息,经过一番分析和搜索找到解决方法。在这里记录一下。

问题

在做大小表的join时出现的,错误信息如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:374)
at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:180)
at org.apache.hive.service.cli.operation.SQLOperation.access$100(SQLOperation.java:72)
at org.apache.hive.service.cli.operation.SQLOperation$2$1.run(SQLOperation.java:232)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
at org.apache.hive.service.cli.operation.SQLOperation$2.run(SQLOperation.java:245)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

解决

首先注意到是MapredLocalTask,开启了本地模式。那关闭本地模式试一下,

1
set hive.exec.mode.local.auto=false;

结果还是报相同错误,只能从其他方面考虑。

在默认情况下,hive的join策略是进行reduce side join,但是当两个表中有一个是小表的时候,就会使用map join,把其中较小的一个表复制到所有节点,这样另一个表在每个节点上面的分片就可以跟这个完整的表join。
分析这个问题,job使用map join时可能造成内存溢出,关闭其自动装换尝试一下。

1
set hive.auto.convert.join = false;

果然ok了。

知识

hive本地模式

0.7版本后Hive开始支持任务执行选择本地模式(local mode)。大多数的Hadoop job是需要hadoop提供的完整的可扩展性来处理大数据的。不过,有时hive的输入数据量是非常小的。在这种情况下,为查询出发执行任务的时间消耗可能会比实际job的执行时间要多的多。对于大多数这种情况,hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间会明显被缩短。
如此一来,对数据量比较小的操作,就可以在本地执行,这样要比提交任务到集群执行效率要快很多。

开启hive本地模式需要主动配置如下参数:

1
hive> set hive.exec.mode.local.auto=true; ## 默认为false

另外,开启本地模式后,也需要job满足一定条件才能真正使用本地模式:

  1. job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max,默认128MB
  2. job的map数必须小于参数:hive.exec.mode.local.auto.tasks.max,默认为4
  3. job的reduce数必须为0或者1

hive join算法

处理分布式join,一般有两种方法:

  • replication join:把其中一个表复制到所有节点,这样另一个表在每个节点上面的分片就可以跟这个完整的表join了;在M/R job中对应map side join。
  • repartition join:把两份数据按照join key进行hash重分布,让每个节点处理hash值相同的join key数据,也就是做局部的join,在M/R job中对应reduce side join。

reduce side join

在默认情况下,hive的join策略是进行reduce side join。

map side join

当大小表进行join时,可以考虑map side join,因为小表复制的代价会好过大表Shuffle的代价。使用map side join,有两种方式:

  1. 直接在sql中写hint,语法是/*+MapJOIN (tbl)*/,其中tbl就是你想要做replication的表。
  2. 设置hive.auto.convert.join = true,这样Hive会自动判断当前的join操作是否合适做map join,主要是找join的两个表中有没有小表。至于多大的表算小表,则是由hive.smalltable.filesize决定,默认25MB。

bucket map join

当没有一个表足够小到能够放进内存,但是还是想用map join怎么办?这个时候就要用到bucket map join。其方法是两个join表在join key上都做hash bucket,并且把你打算复制的那个(相对)小表的bucket数设置为大表的倍数。这样数据就会按照join key做hash bucket。小表依然复制到所有节点,Map join的时候,小表的每一组bucket加载成hashtable,与对应的一个大表bucket做局部join,这样每次只需要加载部分hashtable就可以了。

sort merge bucket map join

在两个表的join key都具有唯一性的时候(也就是可做主键),还可以进一步做sort merge bucket map join。做法还是两边要做hash bucket,而且每个bucket内部要进行排序。这样一来当两边bucket要做局部join的时候,只需要用类似merge Sort算法中的merge操作一样把两个bucket顺序遍历一遍即可完成,这样甚至都不用把一个bucket完整的加载成hashtable,这对性能的提升会有很大帮助。

参考

  1. hive join遇到问题
  2. 深入浅出数据仓库中SQL性能优化之Hive篇
  3. Hive Setting调优