mongo-spark-connector Usage Guide

Usage notes for MongoDB's Spark connector, using MongoDB 2.6.12 and Spark 1.6.0. Official documentation: https://docs.mongodb.com/spark-connector/

Getting Started

https://docs.mongodb.com/spark-connector/getting-started/

Starting spark-shell

spark-shell --conf "spark.mongodb.input.uri=mongodb://192.168.2.12/fs-form.form?readPreference=primaryPreferred" \
--conf "spark.mongodb.output.uri=mongodb://192.168.2.12/mydb.test4" \
--packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0

--packages automatically downloads the dependency packages.
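
The same settings can also be made programmatically when running a standalone application instead of spark-shell. A minimal sketch, assuming an arbitrary application name; the spark.mongodb.* keys are the same ones passed with --conf above:

import org.apache.spark.{SparkConf, SparkContext}

// Carry the connector's input/output URIs in the SparkConf,
// equivalent to the --conf flags passed to spark-shell above.
val conf = new SparkConf()
  .setAppName("MongoSparkConnectorDemo") // hypothetical application name
  .set("spark.mongodb.input.uri", "mongodb://192.168.2.12/fs-form.form?readPreference=primaryPreferred")
  .set("spark.mongodb.output.uri", "mongodb://192.168.2.12/mydb.test4")
val sc = new SparkContext(conf)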

Writing Data to MongoDB

See https://docs.mongodb.com/spark-connector/getting-started/#write-to-mongodb

scala> import com.mongodb.spark._ //import MongoDB connector package
scala> import com.mongodb.spark.config._
scala> import org.bson.Document //import bson
scala> val sparkDocuments = sc.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}")))
scala> sparkDocuments.saveToMongoDB(WriteConfig(Map("uri" -> "mongodb://192.168.2.12/mydb.test4"))) // Uses the WriteConfig

Saving can also be done in another form:

scala> val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(sc)))
scala> MongoSpark.save(sparkDocuments, writeConfig)

Check the data in MongoDB:

StandAlone:PRIMARY> db.test4.find()
{ "_id" : ObjectId("58086a308ad6b14f7ab0584e"), "spark" : 6 }
{ "_id" : ObjectId("58086a308ad6b14f7ab0584f"), "spark" : 7 }
{ "_id" : ObjectId("58086a308ad6b14f64d99533"), "spark" : 1 }
{ "_id" : ObjectId("58086a308ad6b14f7ab05850"), "spark" : 8 }
{ "_id" : ObjectId("58086a308ad6b14f64d99534"), "spark" : 2 }
{ "_id" : ObjectId("58086a308ad6b14f7ab05851"), "spark" : 9 }
{ "_id" : ObjectId("58086a308ad6b14f64d99535"), "spark" : 3 }
{ "_id" : ObjectId("58086a308ad6b14f7ab05852"), "spark" : 10 }
{ "_id" : ObjectId("58086a308ad6b14f64d99536"), "spark" : 4 }
{ "_id" : ObjectId("58086a308ad6b14f64d99537"), "spark" : 5 }

Reading and Analyzing Data from MongoDB

If the MongoDB version is lower than 3.2, calling MongoSpark.load(sc) directly fails with ERROR partitioner.DefaultMongoPartitioner. You need to use a ReadConfig and specify the partitioner parameter.

scala> val readConfig = ReadConfig(Map("database" -> "mydb", "collection" -> "test4", "partitioner" -> "MongoShardedPartitioner"), Some(ReadConfig(sc)))
readConfig: com.mongodb.spark.config.ReadConfig = ReadConfig(mydb,test4,Some(mongodb://192.168.2.12/fs-form.form?readPreference=primaryPreferred),1000,com.mongodb.spark.rdd.partitioner.MongoShardedPartitioner@368e1bf3,Map(),15,ReadPreferenceConfig(primaryPreferred,None),ReadConcernConfig(None),false)
scala> val customRdd = MongoSpark.load(sc, readConfig)
scala> println(customRdd.first.toJson)
{ "_id" : { "$oid" : "58086a308ad6b14f64d99533" }, "spark" : 1 }

Or use sc.loadFromMongoDB(ReadConfig) directly:

scala> val rdd = sc.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://192.168.2.12/mydb.test4", "partitioner" -> "MongoShardedPartitioner")))
scala> println(rdd.first.toJson)
{ "_id" : { "$oid" : "58086a308ad6b14f64d99533" }, "spark" : 1 }

Spark SQL

https://docs.mongodb.com/spark-connector/spark-sql/

We are using Spark 1.6.0, while SparkSession was only introduced in Spark 2.0, so the examples on the master branch do not apply. Refer to an earlier version instead: https://github.com/mongodb/mongo-spark/blob/1.x/examples/src/test/scala/tour/SparkSQL.scala
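
For comparison only, on Spark 2.0+ the entry point would be a SparkSession built roughly as sketched below; this does not apply to the Spark 1.6.0 / connector 1.1.0 setup used here:

import org.apache.spark.sql.SparkSession

// Spark 2.0+ only: SparkSession replaces SQLContext as the entry point.
val spark = SparkSession.builder()
  .appName("MongoSparkConnectorTour") // hypothetical application name
  .config("spark.mongodb.input.uri", "mongodb://192.168.2.12/mydb.test4")
  .config("spark.mongodb.output.uri", "mongodb://192.168.2.12/mydb.test4")
  .getOrCreate()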

Using DataFrames

scala> import org.apache.spark.sql.SQLContext
scala> import com.mongodb.spark._
scala> import com.mongodb.spark.config._
scala> import com.mongodb.spark.sql._
scala> import org.bson.Document
scala> val sqlContext = SQLContext.getOrCreate(sc)
scala> val df = sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://192.168.2.12/mydb.test4", "partitioner" -> "MongoShardedPartitioner")))
scala> df.filter(df("age") < 100).show()
+--------------------+---+-------------+
| _id|age| name|
+--------------------+---+-------------+
|[58087ec39a1ca7ec...| 50|Bilbo Baggins|
|[58087ec39a1ca7ec...| 77| Kíli|
|[58087ec49a1ca7ec...| 82| Fíli|
+--------------------+---+-------------+

Using DataFrames and Datasets Explicitly

Load the data as a DataFrame directly when reading from MongoDB, with the schema declared by a case class:

scala> case class Character(name: String, age: Int)
scala> val sqlContext = SQLContext.getOrCreate(sc)
scala> val df = sqlContext.loadFromMongoDB[Character](ReadConfig(Map("uri" -> "mongodb://192.168.2.12/mydb.test4", "partitioner" -> "MongoShardedPartitioner")))
scala> df.printSchema()
root
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
scala> val dataset = df.as[Character]

Or convert an RDD into a DataFrame or Dataset:

scala> val rdd = sc.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://192.168.2.12/mydb.test4", "partitioner" -> "MongoShardedPartitioner")))
scala> val dfInferredSchema = rdd.toDF()
scala> val dfExplicitSchema = rdd.toDF[Character]()
scala> val ds = rdd.toDS[Character]()

Using Spark SQL

scala> val sqlContext = SQLContext.getOrCreate(sc)
scala> val df = sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://192.168.2.12/mydb.test4", "partitioner" -> "MongoShardedPartitioner")))
scala> df.registerTempTable("characters")
scala> sqlContext.sql("SELECT * FROM characters WHERE age >= 100 and age < 170").show()
+------+---+
| name|age|
+------+---+
|Dwalin|169|
| Óin|167|
| Glóin|158|
+------+---+

Exceptions

ERROR partitioner.DefaultMongoPartitioner

When the MongoDB version is lower than 3.2, if the partitioner is not specified in the ReadConfig when reading data, the default DefaultMongoPartitioner is used and the following error is raised.

16/10/20 15:12:39 ERROR partitioner.DefaultMongoPartitioner:
----------------------------------------
WARNING: MongoDB version < 3.2 detected.
----------------------------------------
With legacy MongoDB installations you will need to explicitly configure the Spark Connector with a partitioner.
This can be done by:
* Setting a "spark.mongodb.input.partitioner" in SparkConf.
* Setting in the "partitioner" parameter in ReadConfig.
* Passing the "partitioner" option to the DataFrameReader.
The following Partitioners are available:
* MongoShardedPartitioner - for sharded clusters, requires read access to the config database.
* MongoSplitVectorPartitioner - for single nodes or replicaSets. Utilises the SplitVector command on the primary.
* MongoPaginateByCountPartitioner - creates a specific number of partitions. Slow as requires a query for every partition.
* MongoPaginateBySizePartitioner - creates partitions based on data size. Slow as requires a query for every partition.

The error message is quite explicit: DefaultMongoPartitioner only works with MongoDB 3.2 and later, so one of the other partitioners has to be specified instead.
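
Besides the ReadConfig form used earlier, the error message also mentions setting the partitioner in SparkConf or passing it as a DataFrameReader option. A sketch of those two variants; the data source class name com.mongodb.spark.sql.DefaultSource is assumed to be what the 1.x connector registers:

import org.apache.spark.SparkConf

// Variant 1: set the partitioner globally in SparkConf
// (the key comes straight from the error message above).
val conf = new SparkConf()
  .set("spark.mongodb.input.uri", "mongodb://192.168.2.12/mydb.test4")
  .set("spark.mongodb.input.partitioner", "MongoShardedPartitioner")

// Variant 2: pass the "partitioner" option to the DataFrameReader.
val df = sqlContext.read
  .format("com.mongodb.spark.sql.DefaultSource") // assumed data source class for the 1.x connector
  .option("uri", "mongodb://192.168.2.12/mydb.test4")
  .option("partitioner", "MongoShardedPartitioner")
  .load()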

Installing MongoDB

See https://docs.mongodb.com/manual/tutorial/install-mongodb-on-red-hat/

Add the MongoDB 2.6 repo

cat > /etc/yum.repos.d/mongodb-org-2.6.repo <<'EOF'
[mongodb-org-2.6]
name=MongoDB 2.6 Repository
baseurl=http://downloads-distro.mongodb.org/repo/redhat/os/x86_64/
gpgcheck=0
enabled=1
EOF

Install

  • Install all packages

    sudo yum install -y mongodb-org
  • Install only the shell

    sudo yum install -y mongodb-org-shell

References

  1. Common mongo shell commands
  2. MongoDB Connector for Spark
  3. Differences between RDD, DataFrame, and DataSet