这几天遇到一个问题,hive在执行join操作时报错,return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
,又没有特别有用的信息,经过一番分析和搜索找到解决方法。在这里记录一下。
问题
在做大小表的join时出现的,错误信息如下。
|
|
解决
首先注意到是MapredLocalTask
,开启了本地模式。那关闭本地模式试一下,
|
|
结果还是报相同错误,只能从其他方面考虑。
在默认情况下,hive的join策略是进行reduce side join,但是当两个表中有一个是小表的时候,就会使用map join,把其中较小的一个表复制到所有节点,这样另一个表在每个节点上面的分片就可以跟这个完整的表join。
分析这个问题,job使用map join时可能造成内存溢出,关闭其自动装换尝试一下。
|
|
果然ok了。
知识
hive本地模式
0.7版本后Hive开始支持任务执行选择本地模式(local mode)。大多数的Hadoop job是需要hadoop提供的完整的可扩展性来处理大数据的。不过,有时hive的输入数据量是非常小的。在这种情况下,为查询出发执行任务的时间消耗可能会比实际job的执行时间要多的多。对于大多数这种情况,hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间会明显被缩短。
如此一来,对数据量比较小的操作,就可以在本地执行,这样要比提交任务到集群执行效率要快很多。
开启hive本地模式需要主动配置如下参数:
另外,开启本地模式后,也需要job满足一定条件才能真正使用本地模式:
- job的输入数据大小必须小于参数:
hive.exec.mode.local.auto.inputbytes.max
,默认128MB - job的map数必须小于参数:
hive.exec.mode.local.auto.tasks.max
,默认为4 - job的reduce数必须为0或者1
hive join算法
处理分布式join,一般有两种方法:
- replication join:把其中一个表复制到所有节点,这样另一个表在每个节点上面的分片就可以跟这个完整的表join了;在M/R job中对应map side join。
- repartition join:把两份数据按照join key进行hash重分布,让每个节点处理hash值相同的join key数据,也就是做局部的join,在M/R job中对应reduce side join。
reduce side join
在默认情况下,hive的join策略是进行reduce side join。
map side join
当大小表进行join时,可以考虑map side join,因为小表复制的代价会好过大表Shuffle的代价。使用map side join,有两种方式:
- 直接在sql中写hint,语法是
/*+MapJOIN (tbl)*/
,其中tbl就是你想要做replication的表。 - 设置
hive.auto.convert.join = true
,这样Hive会自动判断当前的join操作是否合适做map join,主要是找join的两个表中有没有小表。至于多大的表算小表,则是由hive.smalltable.filesize
决定,默认25MB。
bucket map join
当没有一个表足够小到能够放进内存,但是还是想用map join怎么办?这个时候就要用到bucket map join
。其方法是两个join表在join key上都做hash bucket,并且把你打算复制的那个(相对)小表的bucket数设置为大表的倍数。这样数据就会按照join key做hash bucket。小表依然复制到所有节点,Map join的时候,小表的每一组bucket加载成hashtable,与对应的一个大表bucket做局部join,这样每次只需要加载部分hashtable就可以了。
sort merge bucket map join
在两个表的join key都具有唯一性的时候(也就是可做主键),还可以进一步做sort merge bucket map join。做法还是两边要做hash bucket,而且每个bucket内部要进行排序。这样一来当两边bucket要做局部join的时候,只需要用类似merge Sort算法中的merge操作一样把两个bucket顺序遍历一遍即可完成,这样甚至都不用把一个bucket完整的加载成hashtable,这对性能的提升会有很大帮助。