spark自定义分区策略(partitioner)

Spark内部提供了HashPartitionerRangePartitioner两种分区策略,但有些场景下,我们希望能够根据业务需求自定义分区策略。只需要继承Partitioner,然后实现其方法即可。

1
2
3
4
5
6
7
8
/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.
*/
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}

现在遇到一个场景,从数据库同步数据到kafka中,然后使用spark从kafka获取数据进行处理。由于根据数据库操作记录进行同步,所以在数据库中同一条数据可能被更新或删除多次,这要求我们需要将相同主键的数据分区到同一个区中进行处理。

也就是说,我们需要的分区策略是,从数据中找出主键,将其分配到某一个分区中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class PrimaryKeyPartitioner(partitions: Int) extends Partitioner {
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = key match {
case null => 0
case _ =>
try {
val json = JSON.parseObject(key.toString)
val primaryKeyString = json.getString("primary_keys")
.split(",")
.map(k => json.getString(k))
.mkString("_")
Utils.nonNegativeMod(primaryKeyString.hashCode, numPartitions)
} catch {
case e: Exception =>
e.printStackTrace()
0
}
}
}

使用非常简单

1
2
3
4
5
rdd.partitionBy(new PrimaryKeyPartitioner(numPartitions)).foreachPartition(
par => {
// 处理逻辑
}
)

需要注意的是,使用partitionBy方法的rdd类型需要是kv元组,从源码中可以看出。

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Return a copy of the RDD partitioned using the specified partitioner.
*/
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
if (self.partitioner == Some(partitioner)) {
self
} else {
new ShuffledRDD[K, V, V](self, partitioner)
}
}

所以如果是从文件中读取数据,需要将其转换为元组才能使用partitionBy方法。

1
val rdd: RDD[(String, String)] = sc.textFile("sss.log").map((_, ""))