Spark-jobserver 提供了一个 RESTful 接口来提交和管理 spark 的 jobs、jars 和 job contexts。在原项目基础上做了一些本地化和优化工作。
- 将spark-jobserver中akka版本降级到CDH5.7中akka版本。
- spark-jobserver中joda-time版本(2.9.3)与CDH5.7中joda-time版本(1.6)版本冲突,运行时会出现java.lang.NoSuchMethodError: org.joda.time.DateTime.now()异常,将spark-jobserver中joda-time版本降级到1.6。
- 添加使用mysql作为数据库。
- 修复提交job后无法查看job信息和结果的问题,参见github issue#516。
介绍
git地址
- github地址: https://github.com/spark-jobserver/spark-jobserver
- 添加cdh支持github地址: https://github.com/bjoernlohrmann/spark-jobserver
- 优化后的github地址: https://github.com/wzktravel/spark-jobserver
特性
- “Spark as Service”:针对 job 和 contexts 的各个方面提供了 REST 风格的 api 接口进行管理
- 支持 SparkSQL、Hive、Streaming Contexts/jobs 以及定制 job contexts!具体参考Contexts
- 通过集成 Apache Shiro 来支持 LDAP 权限验证
- 通过长期运行的job contexts支持亚秒级别低延迟的任务
- 可以通过结束 context 来停止运行的作业(job)
- 分割 jar 上传步骤以提高 job 的启动
- 异步和同步的 job API,其中同步 API 对低延时作业非常有效
- 支持 Standalone Spark 和 Mesos、yarn
- Job 和 jar 信息通过一个可插拔的 DAO 接口来持久化
- 对RDD或DataFrame对象命名并缓存,通过该名称获取RDD或DataFrame。这样可以提高对象在作业间的共享和重用
- 支持 Scala 2.10 版本和 2.11 版本
安装
参考https://github.com/spark-jobserver/spark-jobserver#deployment
配置和安装
设定配置的环境是firstshare
。
- 复制conf目录下local.sh.template为
firstshare.sh
,根据实际情况修改里面内容。
其中DEPLOY_HOSTS
是要将服务部署到哪台机器上,APP_USER
和APP_GROUP
是服务文件的所有者和所在组。 复制conf目录下local.conf.template为
firstshare.conf
,根据需要修改。附上部分配置,主要修改了以下内容:
- master使用
yarn-client
,不能是yarn-cluster
,yarn-client
可以保持SparkContext不关闭。 - 数据库使用mysql。
- 修改web端口为8099
123456789101112131415161718spark {master = "yarn-client"jobserver {port = 8099context-per-jvm = falsejobdao = spark.jobserver.io.JobSqlDAO #使用file或数据库sqldao {slick-driver=slick.driver.MySQLDriverjdbc-driver=com.mysql.jdbc.Driverjdbc {url="jdbc:mysql://db_host/spark_jobserver?characterEncoding=UTF-8"user="secret"password="secret"}}}}flyway.locations="db/mysql/migration"- master使用
如有需要,可以进行shiro的配置,需要设置
authentication = on
。sh bin/server_deploy.sh firstshare
进行安装,安装过程中需要几次输入密码。- 使用
server_start.sh
和server_stop.sh
进行启停。
更新
如果对代码进行了更新,直接使用sbt ++2.10.5 job-server-extras/assembly
打包,替换安装目录下的spark-job-server.jar
即可。
需要使用自带的测试用例时,使用sbt job-server-tests/package
进行打包。
API
jar包操作
|
|
提交jar包
打包时不建议打assembly包,推荐使用dependent-jar-uris
来添加依赖。
|
|
查看所有jar包
|
|
context操作
|
|
创建context
设定yarn队列,设定
spark.yarn.queue
参数1curl -d "" 'localhost:8099/contexts/test-context?spark.yarn.queue=test'增加依赖jar包,设定
dependent-jar-uris
参数1curl -d "" 'localhost:8099/contexts/mongo-context?spark.yarn.queue=test&dependent-jar-uris=file:///opt/fs/job-server/aux-lib/mongo-spark-connector_2.10-1.1.0.jar,file:///opt/fs/job-server/aux-lib/mongo-java-driver-3.2.2.jar'
删除context
|
|
job操作
这里的job不是普通的spark任务,而是实现spark-jobserver api的任务,继承其api中SparkJob
trait,并实现runJob
方法。
|
|
提交job
提交带参数任务
1curl -d "input.string = a b c a b see" 'localhost:8099/jobs?appName=spark-mongo&classPath=com.fxiaoke.dataplatform.WordCountExample&context=mongo-context'
提交任务到指定的context,设置
context
参数值,context需要提前创建12345678curl -X POST 'localhost:8099/jobs?appName=spark-mongo&classPath=com.fxiaoke.dataplatform.MongoSparkSQL&context=mongo-context&sync=true&timeout=10000' -d '{mongo.ip = "192.168.2.19"mongo.database = mydbmongo.collection = myCollectionmongo.sql = "select name,age from mydb.myCollection where age >= 20"}'等待任务执行完成,设定
sync=true
,也可以设置timeout
参数,默认40s。1curl -d "input.string = a b c a b see" 'localhost:8099/jobs?appName=test&classPath=spark.jobserver.WordCountExample&sync=true&timeout=10000000'
查看job信息
|
|
创建Spark JobServer工程
添加依赖
由于自己修改了spark-jobserver代码,所以需要自己生成api jar包,可以使用sbt publish-local
命令,打包并publish到本地ivy仓库。
|
|
在build.sbt
中添加spark-jobserver依赖,其他依赖按需要添加。
SparkJob API
0.7版本的spark jobserver使用了新的api,直接继承SparkJob
,并实现runJob
和validate
方法即可。
示例:
|
|
Named Objects
参考https://github.com/spark-jobserver/spark-jobserver#named-objects
可以创建自定义的RDD和对象,在多个job共享rdd。
Exceptions
Exception in thread “main” java.lang.NoSuchMethodError: akka.util.Helpers$.ConfigOps(Lcom/typesafe/config/Config;)Lcom/typesafe/config/Config;
这是由于CDH5.7中akka版本比较低,而spark-jobserver中akka版本比较高引起的。可以使用添加cdh支持的版本https://github.com/bjoernlohrmann/spark-jobserver,找到适用于CDH的分支,我们使用的是
0.7.0-SNAPSHOT_cdh-5.7
。参考:
org.apache.spark.SparkException: Found both spark.executor.extraClassPath and SPARK_CLASSPATH. Use only the former.
删除
spark-env.sh
中的SPARK_CLASSPATH
. Spark1.4之后不再使用SPARK_CLASSPATH
,使用spark-defaults.conf中spark.executor.extraClassPath
代替。java.lang.NoSuchMethodError: org.joda.time.DateTime.now()
spark-jobserver中joda-time版本(2.9.3)与CDH5.7中joda-time版本(1.6)冲突,将spark-jobserver中joda-time版本降到1.6,并将DateTime.now()
全部替换为new DateTime()
。提交job后无法查看job信息和结果
参见github issue#516
已经解决,commit#109210c1git cherry-pick 109210c更新jar包后,在同一个context中执行,发现结果没有随jar包更新而更新
其实这不是一个bug或exception,参见github issue#218What is happening is this. Each SparkContext has one custom classloader. In the JVM you cannot update JAR code live – it’s not a job server limitation, but a fundamental JVM limitation. Once you load in the classes for jar1, you cannot try to load in newer versions of those classes, because the classloader will keep using the class names that are already loaded.
使用mysql作为数据库后,提交的job状态默认是FINISHED
在WebApi.scala
文件中查看getJobReport
方法12345678910111213def getJobReport(jobInfo: JobInfo, jobStarted: Boolean = false): Map[String, Any] = {val statusMap = if (jobStarted) Map(StatusKey -> "STARTED") else (jobInfo match {case JobInfo(_, _, _, _, _, None, _) => Map(StatusKey -> "RUNNING") //注释1: 当END_TIME为NONE,就判断为RUNNINGcase JobInfo(_, _, _, _, _, _, Some(ex)) => Map(StatusKey -> "ERROR",ResultKey -> formatException(ex))case JobInfo(_, _, _, _, _, Some(e), None) => Map(StatusKey -> "FINISHED") //注释2})Map("jobId" -> jobInfo.jobId,"startTime" -> jobInfo.startTime.toString(),"classPath" -> jobInfo.classPath,"context" -> (if (jobInfo.contextName.isEmpty) "<<ad-hoc>>" else jobInfo.contextName),"duration" -> getJobDurationString(jobInfo)) ++ statusMap}
从注释1可以看到,当第六个参数即END_TIME为None时,任务job处于RUNNING状态。
但在mysql的JOBS表中,END_TIME
字段为timestamp
类型,有默认值”0000-00-00 00:00”,所以一直有值,走到注释2处,表现为FINISHED。
解决方法:
设置mysql的END_TIME默认值为null。在mysql中timestamp
类型比较特殊,不能使用default NULL
设置默认值。
|
|