Spark jobserver的安装和使用

Spark-jobserver 提供了一个 RESTful 接口来提交和管理 spark 的 jobs、jars 和 job contexts。在原项目基础上做了一些本地化和优化工作。

  1. 将spark-jobserver中akka版本降级到CDH5.7中akka版本。
  2. spark-jobserver中joda-time版本(2.9.3)与CDH5.7中joda-time版本(1.6)版本冲突,运行时会出现java.lang.NoSuchMethodError: org.joda.time.DateTime.now()异常,将spark-jobserver中joda-time版本降级到1.6。
  3. 添加使用mysql作为数据库。
  4. 修复提交job后无法查看job信息和结果的问题,参见github issue#516

介绍

git地址

特性

  1. “Spark as Service”:针对 job 和 contexts 的各个方面提供了 REST 风格的 api 接口进行管理
  2. 支持 SparkSQL、Hive、Streaming Contexts/jobs 以及定制 job contexts!具体参考Contexts
  3. 通过集成 Apache Shiro 来支持 LDAP 权限验证
  4. 通过长期运行的job contexts支持亚秒级别低延迟的任务
  5. 可以通过结束 context 来停止运行的作业(job)
  6. 分割 jar 上传步骤以提高 job 的启动
  7. 异步和同步的 job API,其中同步 API 对低延时作业非常有效
  8. 支持 Standalone Spark 和 Mesos、yarn
  9. Job 和 jar 信息通过一个可插拔的 DAO 接口来持久化
  10. 对RDD或DataFrame对象命名并缓存,通过该名称获取RDD或DataFrame。这样可以提高对象在作业间的共享和重用
  11. 支持 Scala 2.10 版本和 2.11 版本

安装

参考https://github.com/spark-jobserver/spark-jobserver#deployment

配置和安装

设定配置的环境是firstshare

  1. 复制conf目录下local.sh.template为firstshare.sh,根据实际情况修改里面内容。
    其中DEPLOY_HOSTS是要将服务部署到哪台机器上,APP_USERAPP_GROUP是服务文件的所有者和所在组。
  2. 复制conf目录下local.conf.template为firstshare.conf,根据需要修改。

    附上部分配置,主要修改了以下内容:

    1. master使用yarn-client,不能是yarn-clusteryarn-client可以保持SparkContext不关闭。
    2. 数据库使用mysql。
    3. 修改web端口为8099
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    spark {
    master = "yarn-client"
    jobserver {
    port = 8099
    context-per-jvm = false
    jobdao = spark.jobserver.io.JobSqlDAO #使用file或数据库
    sqldao {
    slick-driver=slick.driver.MySQLDriver
    jdbc-driver=com.mysql.jdbc.Driver
    jdbc {
    url="jdbc:mysql://db_host/spark_jobserver?characterEncoding=UTF-8"
    user="secret"
    password="secret"
    }
    }
    }
    }
    flyway.locations="db/mysql/migration"
  3. 如有需要,可以进行shiro的配置,需要设置authentication = on

  4. sh bin/server_deploy.sh firstshare进行安装,安装过程中需要几次输入密码。
  5. 使用server_start.shserver_stop.sh进行启停。

更新

如果对代码进行了更新,直接使用sbt ++2.10.5 job-server-extras/assembly打包,替换安装目录下的spark-job-server.jar即可。

需要使用自带的测试用例时,使用sbt job-server-tests/package进行打包。

API

jar包操作

1
2
GET /jars - lists all the jars and the last upload timestamp
POST /jars/<appName> - uploads a new jar under <appName>

提交jar包

打包时不建议打assembly包,推荐使用dependent-jar-uris来添加依赖。

1
2
curl --data-binary @job-server-tests/target/scala-2.10/job-server-tests_2.10-0.7.0-SNAPSHOT_cdh-5.7.jar localhost:8099/jars/test
curl --data-binary @target/scala-2.10/spark-jobserver-mongo_2.10-1.0.0-SNAPSHOT.jar localhost:8099/jars/test-mongo

查看所有jar包

1
curl localhost:8099/jars

context操作

1
2
3
4
GET /contexts - lists all current contexts
POST /contexts/<name> - creates a new context
DELETE /contexts/<name> - stops a context and all jobs running in it
PUT /contexts?reset=reboot - kills all contexts and re-loads only the contexts from config

创建context

  1. 设定yarn队列,设定spark.yarn.queue参数

    1
    curl -d "" 'localhost:8099/contexts/test-context?spark.yarn.queue=test'
  2. 增加依赖jar包,设定dependent-jar-uris参数

    1
    curl -d "" 'localhost:8099/contexts/mongo-context?spark.yarn.queue=test&dependent-jar-uris=file:///opt/fs/job-server/aux-lib/mongo-spark-connector_2.10-1.1.0.jar,file:///opt/fs/job-server/aux-lib/mongo-java-driver-3.2.2.jar'

删除context

1
curl -X DELETE localhost:8099/contexts/test-context

job操作

这里的job不是普通的spark任务,而是实现spark-jobserver api的任务,继承其api中SparkJob trait,并实现runJob方法。

1
2
3
4
5
GET /jobs - Lists the last N jobs
POST /jobs - Starts a new job, use ?sync=true to wait for results
GET /jobs/<jobId> - Gets the result or status of a specific job
DELETE /jobs/<jobId> - Kills the specified job
GET /jobs/<jobId>/config - Gets the job configuration

提交job

  1. 提交带参数任务

    1
    curl -d "input.string = a b c a b see" 'localhost:8099/jobs?appName=spark-mongo&classPath=com.fxiaoke.dataplatform.WordCountExample&context=mongo-context'
  1. 提交任务到指定的context,设置context参数值,context需要提前创建

    1
    2
    3
    4
    5
    6
    7
    8
    curl -X POST 'localhost:8099/jobs?appName=spark-mongo&classPath=com.fxiaoke.dataplatform.MongoSparkSQL&context=mongo-context&sync=true&timeout=10000' -d '
    {
    mongo.ip = "192.168.2.19"
    mongo.database = mydb
    mongo.collection = myCollection
    mongo.sql = "select name,age from mydb.myCollection where age >= 20"
    }
    '
  2. 等待任务执行完成,设定sync=true,也可以设置timeout参数,默认40s。

    1
    curl -d "input.string = a b c a b see" 'localhost:8099/jobs?appName=test&classPath=spark.jobserver.WordCountExample&sync=true&timeout=10000000'

查看job信息

1
2
3
4
5
6
7
8
9
10
$ curl localhost:8099/jobs/1b8f60f8-067e-4d83-b198-8fc6017227e8
{
"duration": "3.0 secs",
"classPath": "com.fxiaoke.dataplatform.MongoSparkSQL",
"startTime": "2016-10-21T18:37:01.000+08:00",
"context": "mongo-context",
"result": ["{\"name\":\"Thorin\",\"age\":195}", "{\"name\":\"Balin\",\"age\":178}", "{\"name\":\"Dwalin\",\"age\":169}", "{\"name\":\"\u00d3in\",\"age\":167}", "{\"name\":\"Gl\u00f3in\",\"age\":158}"],
"status": "FINISHED",
"jobId": "1b8f60f8-067e-4d83-b198-8fc6017227e8"
}

创建Spark JobServer工程

添加依赖

由于自己修改了spark-jobserver代码,所以需要自己生成api jar包,可以使用sbt publish-local命令,打包并publish到本地ivy仓库。

1
2
3
4
sbt ++2.10.5 job-server-api/publish-local
sbt ++2.10.5 job-server/publish-local
sbt ++2.10.5 job-server-extras/publish-local
sbt ++2.10.5 akka-app/publish-local

build.sbt中添加spark-jobserver依赖,其他依赖按需要添加。

1
2
libraryDependencies += "spark.jobserver" %% "job-server-api" % "0.7.0-SNAPSHOT_cdh-5.7" % "provided"
libraryDependencies += "spark.jobserver" %% "job-server-extras" % "0.7.0-SNAPSHOT_cdh-5.7" % "provided"

SparkJob API

0.7版本的spark jobserver使用了新的api,直接继承SparkJob,并实现runJobvalidate方法即可。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.scalactic._
import spark.jobserver.api.{JobEnvironment, SingleProblem, SparkJob, ValidationProblem}
import scala.util.Try
object WordCountExample extends SparkJob {
type JobData = Seq[String] //可以自定义JobData和JobOutput类型
type JobOutput = collection.Map[String, Long]
def runJob(sc: SparkContext, runtime: JobEnvironment, data: JobData): JobOutput = {
sc.parallelize(data).countByValue
}
def validate(sc: SparkContext, runtime: JobEnvironment, config: Config): JobData Or Every[ValidationProblem] = {
Try(config.getString("input.string").split(" ").toSeq)
.map(words => Good(words))
.getOrElse(Bad(One(SingleProblem("No input.string param"))))
}
}

Named Objects

参考https://github.com/spark-jobserver/spark-jobserver#named-objects

可以创建自定义的RDD和对象,在多个job共享rdd。

Exceptions

  1. Exception in thread “main” java.lang.NoSuchMethodError: akka.util.Helpers$.ConfigOps(Lcom/typesafe/config/Config;)Lcom/typesafe/config/Config;

    这是由于CDH5.7中akka版本比较低,而spark-jobserver中akka版本比较高引起的。可以使用添加cdh支持的版本https://github.com/bjoernlohrmann/spark-jobserver,找到适用于CDH的分支,我们使用的是0.7.0-SNAPSHOT_cdh-5.7

    参考:

  2. org.apache.spark.SparkException: Found both spark.executor.extraClassPath and SPARK_CLASSPATH. Use only the former.

    删除spark-env.sh中的SPARK_CLASSPATH. Spark1.4之后不再使用SPARK_CLASSPATH,使用spark-defaults.conf中spark.executor.extraClassPath代替。

  3. java.lang.NoSuchMethodError: org.joda.time.DateTime.now()
    spark-jobserver中joda-time版本(2.9.3)与CDH5.7中joda-time版本(1.6)冲突,将spark-jobserver中joda-time版本降到1.6,并将DateTime.now()全部替换为new DateTime()

  4. 提交job后无法查看job信息和结果
    参见github issue#516
    已经解决,commit#109210c

    1
    git cherry-pick 109210c
  5. 更新jar包后,在同一个context中执行,发现结果没有随jar包更新而更新
    其实这不是一个bug或exception,参见github issue#218

    What is happening is this. Each SparkContext has one custom classloader. In the JVM you cannot update JAR code live – it’s not a job server limitation, but a fundamental JVM limitation. Once you load in the classes for jar1, you cannot try to load in newer versions of those classes, because the classloader will keep using the class names that are already loaded.

  6. 使用mysql作为数据库后,提交的job状态默认是FINISHED
    WebApi.scala文件中查看getJobReport方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    def getJobReport(jobInfo: JobInfo, jobStarted: Boolean = false): Map[String, Any] = {
    val statusMap = if (jobStarted) Map(StatusKey -> "STARTED") else (jobInfo match {
    case JobInfo(_, _, _, _, _, None, _) => Map(StatusKey -> "RUNNING") //注释1: 当END_TIME为NONE,就判断为RUNNING
    case JobInfo(_, _, _, _, _, _, Some(ex)) => Map(StatusKey -> "ERROR",
    ResultKey -> formatException(ex))
    case JobInfo(_, _, _, _, _, Some(e), None) => Map(StatusKey -> "FINISHED") //注释2
    })
    Map("jobId" -> jobInfo.jobId,
    "startTime" -> jobInfo.startTime.toString(),
    "classPath" -> jobInfo.classPath,
    "context" -> (if (jobInfo.contextName.isEmpty) "<<ad-hoc>>" else jobInfo.contextName),
    "duration" -> getJobDurationString(jobInfo)) ++ statusMap
    }

从注释1可以看到,当第六个参数即END_TIME为None时,任务job处于RUNNING状态。
但在mysql的JOBS表中,END_TIME字段为timestamp类型,有默认值”0000-00-00 00:00”,所以一直有值,走到注释2处,表现为FINISHED。

解决方法:
设置mysql的END_TIME默认值为null。在mysql中timestamp类型比较特殊,不能使用default NULL设置默认值。

1
alter table JOBS modify column END_TIME timestamp NULL;

参考

  1. sbt publish local jar
  2. Scala教程:简单构建工具SBT
  3. 使用本地maven私服加速sbt下载