在minikube中,当驱动程序在sparkapplication.yaml中运行时,spark驱动程序不会挂载hostpath

kmpatx3s  于 2021-07-13  发布在  Spark
关注(0)|答案(0)|浏览(292)

我是spark和minikube的新手。在sparkapplication.yaml中运行spark作业时遇到了这个问题,spark driver和executor都创建成功,但都没有挂载hostpath。我参考了汤姆路易斯的迷你们Spark的例子。如果我直接通过dockfile//copy~~~//将数据放入sparkjob图像文件中,一切都会正常运行。
当前,数据(*.csv)位于localfolder-(mounted)-minikube-(not mounted)-spark driver pod中。
我不知道为什么hostpath没有挂载,可能有一些错误我做了^^;有人能看看我的问题吗?谢谢。。!
模板/sparkapplication.yaml

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name:  {{ .Release.Name | trunc 63 }}
  labels:
    chartname: {{ .Chart.Name | trunc 63 | quote }}
    release: {{ .Release.Name | trunc 63 | quote }}
    revision: {{ .Release.Revision | quote }}
    sparkVersion: {{ .Values.sparkVersion | quote }}
    version: {{ .Chart.Version | quote }}
spec:
  type: Scala
  mode: cluster
  image: {{ list .Values.imageRegistry .Values.image | join "/" | quote }}
  imagePullPolicy: {{ .Values.imagePullPolicy }}
  {{- if .Values.imagePullSecrets }}
  imagePullSecrets:
  {{- range .Values.imagePullSecrets }}
    - {{ . | quote }}
  {{- end }}
  {{- end }}
  mainClass: {{ .Values.mainClass | quote }}
  mainApplicationFile: {{ .Values.jar | quote }}
  {{- if .Values.arguments }}
  arguments:
  {{- range .Values.arguments }}
    - {{ . | quote }}
  {{- end }}
  {{- end }}
  sparkVersion: {{ .Values.sparkVersion | quote }}
  restartPolicy:
    type: Never
  {{- if or .Values.jarDependencies .Values.fileDependencies .Values.sparkConf .Values.hadoopConf }}
  deps:
    {{- if .Values.jarDependencies }}
    jars:
    {{- range .Values.jarDependencies }}
      - {{ . | quote }}
    {{- end }}
    {{- end }}
    {{- if .Values.fileDependencies }}
    files:
    {{- range .Values.fileDependencies }}
      - {{ . | quote }}
    {{- end }}
    {{- end }}
    {{- if .Values.sparkConf }}
    sparkConf:
    {{- range $conf, $value := .Values.sparkConf }}
      {{ $conf | quote }}: {{ $value | quote }}
    {{- end }}
    {{- end }}
    {{- if .Values.hadoopConf }}
    hadoopConf:
    {{- range $conf, $value := .Values.hadoopConf }}
      {{ $conf | quote }}: {{ $value | quote }}
    {{- end }}
    {{- end }}
  {{- end }}
  driver:
    {{- if .Values.envSecretKeyRefs }}
    envSecretKeyRefs:
    {{- range $name, $value := .Values.envSecretKeyRefs }}
      {{ $name }}:
        name: {{ $value.name}}
        key: {{ $value.key}}
    {{- end }}
    {{- end }}
    {{- if .Values.envVars }}
    envVars:
    {{- range $name, $value := .Values.envVars }}
      {{ $name }}: {{ $value | quote }}
    {{- end }}
    {{- end }}
    securityContext:
      runAsUser: {{ .Values.userId }}
    cores: {{ .Values.driver.cores }}
    coreLimit: {{ .Values.driver.coreLimit | default .Values.driver.cores | quote }}
    memory: {{ .Values.driver.memory }}
    hostNetwork: {{ .Values.hostNetwork }}
    labels:
      release: {{ .Release.Name | trunc 63 | quote }}
      revision: {{ .Release.Revision | quote }}
      sparkVersion: {{ .Values.sparkVersion | quote }}
      version: {{ .Chart.Version | quote }}
    serviceAccount: {{ .Values.serviceAccount }}
    {{- if .Values.javaOptions }}
    javaOptions: {{ .Values.javaOptions | quote}}
    {{- end }}
    {{- if .Values.mounts }}
    volumeMounts:
    {{- range $name, $path := .Values.mounts }}
      - name: {{ $name }}
        mountPath: {{ $path }}
    {{- end }}
    {{- end }}
    {{- if .Values.tolerations }}
    tolerations:
{{ toYaml .Values.tolerations | indent 6 }}
    {{- end }}
  executor:
    {{- if .Values.envVars }}
    envVars:
    {{- range $name, $value := .Values.envVars }}
      {{ $name | quote }}: {{ $value | quote }}
    {{- end }}
    {{- end }}
    securityContext:
      runAsUser: {{ .Values.userId }}
    cores: {{ .Values.executor.cores }}
    coreLimit: {{ .Values.executor.coreLimit | default .Values.executor.cores | quote }}
    instances: {{ .Values.executor.instances }}
    memory: {{ .Values.executor.memory }}
    labels:
      release: {{ .Release.Name | trunc 63 | quote }}
      revision: {{ .Release.Revision | quote }}
      sparkVersion: {{ .Values.sparkVersion | quote }}
      version: {{ .Chart.Version | quote }}
    serviceAccount: {{ .Values.serviceAccount }}
    {{- if .Values.javaOptions }}
    javaOptions: {{ .Values.javaOptions }}
    {{- end }}
    {{- if .Values.mounts }}
    volumeMounts:
    {{- range $name, $path := .Values.mounts }}
      - name: {{ $name }}
        mountPath: {{ $path }}
    {{- end }}
    {{- end }}
    {{- if .Values.tolerations }}
    tolerations:
{{ toYaml .Values.tolerations | indent 6 }}
    {{- end }}
  {{- if .Values.jmxExporterJar }}
  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      port: {{ .Values.jmxPort | default 8090 }}
      jmxExporterJar: {{ .Values.jmxExporterJar }}
  {{- end }}
  {{- if .Values.volumes }}
  volumes:
    - name: input-data
      hostPath:
        path: /input-data
    - name: output-data
      hostPath:
        path: /output-data
  {{- end }}
  {{- if .Values.nodeSelector }}
  nodeSelector:
{{ toYaml .Values.nodeSelector | indent 4 }}
  {{- end }}

值.yaml


# Generated by build.sbt. Please don't manually update

version: 0.1
sparkVersion: 3.0.2
image: kaspi/kaspi-sparkjob:0.1
jar: local:///opt/spark/jars/kaspi-kaspi-sparkjob.jar
mainClass: kaspi.sparkjob
fileDependencies: []
environment: minikube
serviceAccount: spark-spark
imageRegistry: localhost:5000
arguments:
  - "/mnt/data-in/"
  - "/mnt/data-out/"
volumes:
  - name: input-data
    hostPath:
      path: /input-data
  - name: output-data
    hostPath:
      path: /output-data
mounts:
  input-data: /mnt/data-in
  output-data: /mnt/data-out
driver:
  cores: 1
  memory: "2g"
executor:
  instances: 2
  cores: 1
  memory: "1g"
hadoopConf:
sparkConf:
hostNetwork: false
imagePullPolicy: Never
userId: 0

构建.sbt

val sparkVersion = "3.0.2"

val sparkLibs = Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)

lazy val commonSettings = Seq(
  organization := "kaspi",
  scalaVersion := "2.12.13",
  version := "0.1",
  libraryDependencies ++= sparkLibs
)

val domain = "kaspi"

// for building FAT jar
lazy val assemblySettings = Seq(
  assembly / assemblyOption := (assemblyOption in assembly).value.copy(includeScala = false),
  assembly / assemblyOutputPath := baseDirectory.value / "output" / s"${domain}-${name.value}.jar"
)

val targetDockerJarPath = "/opt/spark/jars"
val baseRegistry = sys.props.getOrElse("baseRegistry", default = "localhost:5000")

// for building docker image
lazy val dockerSettings = Seq(
  imageNames in docker := Seq(
    ImageName(s"$domain/${name.value}:latest"),
    ImageName(s"$domain/${name.value}:${version.value}"),
  ),
  buildOptions in docker := BuildOptions(
    cache = false,
    removeIntermediateContainers = BuildOptions.Remove.Always,
    pullBaseImage = BuildOptions.Pull.Always
  ),
  dockerfile in docker := {
    // The assembly task generates a fat JAR file
    val artifact: File = assembly.value
    val artifactTargetPath = s"$targetDockerJarPath/$domain-${name.value}.jar"
    new Dockerfile {
      from(s"$baseRegistry/spark-runner:0.1")
    }.add(artifact, artifactTargetPath)
  }
)

// Include "provided" dependencies back to default run task
lazy val runLocalSettings = Seq(
  // https://stackoverflow.com/questions/18838944/how-to-add-provided-dependencies-back-to-run-test-tasks-classpath/21803413#21803413
  Compile / run := Defaults
    .runTask(
      fullClasspath in Compile,
      mainClass in (Compile, run),
      runner in (Compile, run)
    )
    .evaluated
)

lazy val root = (project in file("."))
  .enablePlugins(sbtdocker.DockerPlugin)
  .enablePlugins(AshScriptPlugin)
  .settings(
    commonSettings,
    assemblySettings,
    dockerSettings,
    runLocalSettings,
    name := "kaspi-sparkjob",
    Compile / mainClass := Some("kaspi.sparkjob"),
    Compile / resourceGenerators += createImporterHelmChart.taskValue
  )

// Task to create helm chart
lazy val createImporterHelmChart: Def.Initialize[Task[Seq[File]]] = Def.task {
  val chartFile = baseDirectory.value / "helm" / "Chart.yaml"
  val valuesFile = baseDirectory.value / "helm" / "values.yaml"

  val chartContents =
    s"""# Generated by build.sbt. Please don't manually update
       |apiVersion: v1
       |name: $domain-${name.value}
       |version: ${version.value}
       |appVersion: ${version.value}
       |description: ETL Job
       |home: https://github.com/jyyoo0530/kaspi
       |sources:
       |  - https://github.com/jyyoo0530/kaspi
       |maintainers:
       |  - name: Jeremy Yoo
       |    email: jyyoo0530@gmail.com
       |    url: https://www.linkedin.com/in/jeeyoungyoo
       |""".stripMargin

  val valuesContents =
    s"""# Generated by build.sbt. Please don't manually update
       |version: ${version.value}
       |sparkVersion: ${sparkVersion}
       |image: $domain/${name.value}:${version.value}
       |jar: local://$targetDockerJarPath/$domain-${name.value}.jar
       |mainClass: ${(Compile / run / mainClass).value.getOrElse("__MAIN_CLASS__")}
       |fileDependencies: []
       |environment: minikube
       |serviceAccount: spark-spark
       |imageRegistry: localhost:5000
       |arguments:
       |  - "/mnt/data-in/"
       |  - "/mnt/data-out/"
       |volumes:
       |  - name: input-data
       |    hostPath:
       |      path: /input-data
       |  - name: output-data
       |    hostPath:
       |      path: /output-data
       |mounts:
       |  input-data: /mnt/data-in
       |  output-data: /mnt/data-out
       |driver:
       |  cores: 1
       |  memory: "2g"
       |executor:
       |  instances: 2
       |  cores: 1
       |  memory: "1g"
       |hadoopConf:
       |sparkConf:
       |hostNetwork: false
       |imagePullPolicy: Never
       |userId: 0
       |""".stripMargin

  IO.write(chartFile, chartContents)
  IO.write(valuesFile, valuesContents)
  Seq(chartFile, valuesFile)
}

lazy val showVersion = taskKey[Unit]("Show version")
showVersion := {
  println((version).value)
}

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

2021/2/25更新**

我试着在yaml下面进行测试,然后将hostpath中的卷成功地装入pod。没有区别,但对象特性不同,一个是“容器”,一个是“驱动程序”,“执行器”…等等(同样的问题也发生在使用gafferhdfs时,其中k8s对象名是“namenode”、“datanode”…等等)。使用自定义kubernetes对象名会有问题吗??但如果它仍然是继承的容器属性,,,则没有理由不被装载。。。。所以。。。。还在挣扎……!:)

apiVersion: v1
kind: Pod
metadata:
  name: hostpath
  namespace: spark-apps
spec:
  containers:
    - name: nginx
      image: nginx
      volumeMounts:
        - name: volumepath
          mountPath: /mnt/data
  volumes:
    - name: volumepath
      hostPath:
        path: /input-data
        type: Directory

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题