Usage notes for MongoDB's Spark connector, with MongoDB 2.6.12 and Spark 1.6.0. Official documentation: https://docs.mongodb.com/spark-connector/.
Getting Started
https://docs.mongodb.com/spark-connector/getting-started/
Starting spark-shell
```bash
spark-shell --conf "spark.mongodb.input.uri=mongodb://192.168.2.12/fs-form.form?readPreference=primaryPreferred" \
  --conf "spark.mongodb.output.uri=mongodb://192.168.2.12/mydb.test4" \
  --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0
```
The `--packages` option downloads the connector and its dependencies automatically.
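When building a standalone application instead of using spark-shell, the same options can be set on the SparkConf; a minimal sketch, where the app name is illustrative and not from the original:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Equivalent of the spark-shell --conf flags above, for a standalone app.
// "MongoSparkExample" is an illustrative app name.
val conf = new SparkConf()
  .setAppName("MongoSparkExample")
  .set("spark.mongodb.input.uri", "mongodb://192.168.2.12/fs-form.form?readPreference=primaryPreferred")
  .set("spark.mongodb.output.uri", "mongodb://192.168.2.12/mydb.test4")
val sc = new SparkContext(conf)
```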
Writing data to MongoDB
See https://docs.mongodb.com/spark-connector/getting-started/#write-to-mongodb.
```scala
scala> import com.mongodb.spark._
scala> import com.mongodb.spark.config._
scala> import org.bson.Document
scala> val sparkDocuments = sc.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}")))
scala> sparkDocuments.saveToMongoDB(WriteConfig(Map("uri" -> "mongodb://192.168.2.12/mydb.test4")))
```
Saving can also be written in another form, where `WriteConfig(sc)` supplies defaults from the SparkContext configuration and the Map overrides individual options:
```scala
scala> val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(sc)))
scala> MongoSpark.save(sparkDocuments, writeConfig)
```
Check the data in MongoDB:
```
StandAlone:PRIMARY> db.test4.find()
{ "_id" : ObjectId("58086a308ad6b14f7ab0584e"), "spark" : 6 }
{ "_id" : ObjectId("58086a308ad6b14f7ab0584f"), "spark" : 7 }
{ "_id" : ObjectId("58086a308ad6b14f64d99533"), "spark" : 1 }
{ "_id" : ObjectId("58086a308ad6b14f7ab05850"), "spark" : 8 }
{ "_id" : ObjectId("58086a308ad6b14f64d99534"), "spark" : 2 }
{ "_id" : ObjectId("58086a308ad6b14f7ab05851"), "spark" : 9 }
{ "_id" : ObjectId("58086a308ad6b14f64d99535"), "spark" : 3 }
{ "_id" : ObjectId("58086a308ad6b14f7ab05852"), "spark" : 10 }
{ "_id" : ObjectId("58086a308ad6b14f64d99536"), "spark" : 4 }
{ "_id" : ObjectId("58086a308ad6b14f64d99537"), "spark" : 5 }
```
Reading and analyzing data from MongoDB
If the MongoDB version is lower than 3.2, calling `MongoSpark.load(sc)` directly fails with `ERROR partitioner.DefaultMongoPartitioner` (see Exceptions below). A `ReadConfig` with an explicit `partitioner` parameter is required.
```scala
scala> val readConfig = ReadConfig(Map("database" -> "mydb", "collection" -> "test4", "partitioner" -> "MongoShardedPartitioner"), Some(ReadConfig(sc)))
readConfig: com.mongodb.spark.config.ReadConfig = ReadConfig(mydb,test4,Some(mongodb:
scala> val customRdd = MongoSpark.load(sc, readConfig)
scala> println(customRdd.first.toJson)
{ "_id" : { "$oid" : "58086a308ad6b14f64d99533" }, "spark" : 1 }
```
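Note that `MongoShardedPartitioner` targets sharded clusters, while the deployment above appears to be a replica set (the `StandAlone:PRIMARY` prompt), so `MongoSplitVectorPartitioner` may be the better fit. A sketch, assuming the same database and collection:

```scala
// Assumption: a single node or replica set rather than a sharded cluster,
// so use MongoSplitVectorPartitioner (runs SplitVector on the primary).
scala> val svConfig = ReadConfig(Map("database" -> "mydb", "collection" -> "test4", "partitioner" -> "MongoSplitVectorPartitioner"), Some(ReadConfig(sc)))
scala> val svRdd = MongoSpark.load(sc, svConfig)
```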
Or use `sc.loadFromMongoDB(ReadConfig)` directly:
```scala
scala> val rdd = sc.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://192.168.2.12/mydb.test4", "partitioner" -> "MongoShardedPartitioner")))
scala> println(rdd.first.toJson)
{ "_id" : { "$oid" : "58086a308ad6b14f64d99533" }, "spark" : 1 }
```
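For the analysis side, a filter can be pushed down to MongoDB as an aggregation pipeline with `withPipeline` on the returned `MongoRDD`, so only matching documents are transferred. A minimal sketch over the documents written above:

```scala
// Push a $match stage down to MongoDB; with the spark:1..10 documents
// above, this leaves the five documents where spark > 5.
scala> val aggregatedRdd = rdd.withPipeline(Seq(Document.parse("{ $match: { spark : { $gt : 5 } } }")))
scala> println(aggregatedRdd.count)
```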
Spark SQL
https://docs.mongodb.com/spark-connector/spark-sql/
The Spark we use is 1.6.0, while SparkSession was only introduced in Spark 2.0, so the examples on master do not apply; refer to the 1.x branch instead: https://github.com/mongodb/mongo-spark/blob/1.x/examples/src/test/scala/tour/SparkSQL.scala
Using DataFrames
```scala
scala> import org.apache.spark.sql.SQLContext
scala> import com.mongodb.spark._
scala> import com.mongodb.spark.config._
scala> import com.mongodb.spark.sql._
scala> import org.bson.Document
scala> val sqlContext = SQLContext.getOrCreate(sc)
scala> val df = sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://192.168.2.12/mydb.test4", "partitioner" -> "MongoShardedPartitioner")))
scala> df.filter(df("age") < 100).show()
+--------------------+---+-------------+
|                 _id|age|         name|
+--------------------+---+-------------+
|[58087ec39a1ca7ec...| 50|Bilbo Baggins|
|[58087ec39a1ca7ec...| 77|         Kíli|
|[58087ec49a1ca7ec...| 82|         Fíli|
+--------------------+---+-------------+
```
Using DataFrames and Datasets explicitly
When reading directly from MongoDB, the data can be loaded as a DataFrame with a schema derived from a case class:
```scala
scala> case class Character(name: String, age: Int)
scala> val sqlContext = SQLContext.getOrCreate(sc)
scala> val df = sqlContext.loadFromMongoDB[Character](ReadConfig(Map("uri" -> "mongodb://192.168.2.12/mydb.test4", "partitioner" -> "MongoShardedPartitioner")))
scala> df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
scala> val dataset = df.as[Character]
```
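The typed `Dataset` then allows filters over the case class fields; a small usage sketch:

```scala
// Typed filter on the Dataset[Character]; collect() brings the
// matching Characters back to the driver.
scala> dataset.filter(_.age > 100).collect().foreach(println)
```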
Or convert an RDD to a DataFrame or Dataset:
```scala
scala> val rdd = sc.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://192.168.2.12/mydb.test4", "partitioner" -> "MongoShardedPartitioner")))
scala> val dfInferredSchema = rdd.toDF()
scala> val dfExplicitSchema = rdd.toDF[Character]()
scala> val ds = rdd.toDS[Character]()
```
Using Spark SQL
```scala
scala> val sqlContext = SQLContext.getOrCreate(sc)
scala> val df = sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://192.168.2.12/mydb.test4", "partitioner" -> "MongoShardedPartitioner")))
scala> df.registerTempTable("characters")
scala> sqlContext.sql("SELECT * FROM characters WHERE age >= 100 AND age < 170").show()
+------+---+
|  name|age|
+------+---+
|Dwalin|169|
|   Óin|167|
| Glóin|158|
+------+---+
```
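Query results can be written back to MongoDB as well; a minimal sketch, where the target collection name `hundredClub` is illustrative and not from the original:

```scala
// Save the DataFrame to an (illustrative) collection "hundredClub";
// the remaining settings come from spark.mongodb.output.uri.
scala> val centenarians = sqlContext.sql("SELECT name, age FROM characters WHERE age >= 100")
scala> MongoSpark.save(centenarians.write.option("collection", "hundredClub").mode("overwrite"))
```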
Exceptions
ERROR partitioner.DefaultMongoPartitioner
When the MongoDB version is below 3.2 and no `partitioner` is specified in the `ReadConfig`, the default `DefaultMongoPartitioner` is used and the read fails:
```
16/10/20 15:12:39 ERROR partitioner.DefaultMongoPartitioner:
----------------------------------------
WARNING: MongoDB version < 3.2 detected.
----------------------------------------

With legacy MongoDB installations you will need to explicitly configure the Spark Connector with a partitioner.

This can be done by:
 * Setting a "spark.mongodb.input.partitioner" in SparkConf.
 * Setting in the "partitioner" parameter in ReadConfig.
 * Passing the "partitioner" option to the DataFrameReader.

The following Partitioners are available:
 * MongoShardedPartitioner - for sharded clusters, requires read access to the config database.
 * MongoSplitVectorPartitioner - for single nodes or replicaSets. Utilises the SplitVector command on the primary.
 * MongoPaginateByCountPartitioner - creates a specific number of partitions. Slow as requires a query for every partition.
 * MongoPaginateBySizePartitioner - creates partitions based on data size. Slow as requires a query for every partition.
```
The message is explicit: `DefaultMongoPartitioner` requires MongoDB 3.2 or later, so one of the other partitioners listed above must be specified.
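The partitioner can also be set once for the whole session instead of per `ReadConfig`, using the `spark.mongodb.input.partitioner` key named in the error message; a sketch in SparkConf terms:

```scala
import org.apache.spark.SparkConf

// Session-wide default partitioner; every read then inherits it
// unless a ReadConfig overrides it.
val conf = new SparkConf()
  .set("spark.mongodb.input.uri", "mongodb://192.168.2.12/mydb.test4")
  .set("spark.mongodb.input.partitioner", "MongoSplitVectorPartitioner")
```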
Installing MongoDB
See https://docs.mongodb.com/manual/tutorial/install-mongodb-on-red-hat/.
Add the MongoDB 2.6 repo:
```bash
cat > /etc/yum.repos.d/mongodb-org-2.6.repo <<'EOF'
[mongodb-org-2.6]
name=MongoDB 2.6 Repository
baseurl=http://downloads-distro.mongodb.org/repo/redhat/os/x86_64/
gpgcheck=0
enabled=1
EOF
```
Installation
Install all the packages:
```bash
sudo yum install -y mongodb-org
```
Install only the shell:
```bash
sudo yum install -y mongodb-org-shell
```
References
- Common mongo shell commands
- MongoDB Connector for Spark
- Differences between RDD, DataFrame, and Dataset