如何在kubernetes环境中使用spark配置beam python sdk

soat7uwm  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(415)

tldr公司;

如何配置“environment\u type”=external或process的apachebeam管道选项?

说明

目前,我们在kubernetes中有一个独立的spark集群,在这个解决方案(和设置)之后,我们启动一个beam管道,在spark worker上创建一个嵌入式spark作业服务器,spark worker需要联合运行python sdk。apache beam允许以4种不同的方式运行python sdk:
“docker”-默认值,在kubernetes集群中不可能(将使用“container inside container”)
“环回”-仅用于测试,不可能与一个以上的工人吊舱
“外部”-理想的设置,“只是”创建一个侧车容器运行在同一个豆荚作为Spark工人
“进程”-在spark worker中执行一个进程,虽然不理想,但也可能太理想。

发展

使用“external”-在同一个pod上用python sdk实现spark worker:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: spark-worker
  labels:
    app: spark-worker
spec:
  selector:
    matchLabels:
      app: spark-worker
  template:
    metadata:
      labels:
        app: spark-worker
    spec:
      containers:
      - name: spark-worker
        image: spark-py-custom:latest
        imagePullPolicy: Never
        ports:
        - containerPort: 8081
          protocol: TCP
        command: ['/bin/bash',"-c","--"]
        args: ["/start-worker.sh" ]
        resources :
          requests :
            cpu : 4
            memory : "5Gi"
          limits :
             cpu : 4
             memory : "5Gi"
        volumeMounts:
          - name: spark-jars
            mountPath: "/tmp"
      - name: python-beam-sdk
        image: apachebeam/python3.7_sdk:latest
        command: ["/opt/apache/beam/boot", "--worker_pool"]
        ports:
        - containerPort: 50000
        resources:
          limits:
            cpu: "1"
            memory: "1Gi"
      volumes:
        - name: spark-jars
          persistentVolumeClaim:
            claimName: spark-jars

如果我们执行命令

python3 wordcount.py \
--output ./data_test/counts \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar \
--spark_master_url=spark://spark-master:7077 \
--spark_rest_url=http://spark-master:6066 \
--environment_type=EXTERNAL \
--environment_config=localhost:50000

我们得到一个卡在“运行”状态的终端:

INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.client:Timeout attempting to reach GCE metadata service.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.28.0
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x7fc360c0b8c8> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x7fc360c0f048> ====================
INFO:apache_beam.runners.portability.abstract_job_service:Artifact server started on port 36369
INFO:apache_beam.runners.portability.abstract_job_service:Running job 'job-2448721e-e686-41d4-b924-5f8c5ae73ac2'
INFO:apache_beam.runners.portability.spark_uber_jar_job_server:Submitted Spark job with ID driver-20210305172421-0000
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING

在spark worker日志中:

21/03/05 17:24:25 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=45203" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-worker-64fd4ddd6-tqdrs:45203" "--executor-id" "0" "--hostname" "172.18.0.20" "--cores" "3" "--app-id" "app-20210305172425-0000" "--worker-url" "spark://Worker@172.18.0.20:44365"

在python sdk上:

2021/03/05 17:19:52 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=', '--artifact_endpoint=', '--provision_endpoint=', '--control_endpoint=']
2021/03/05 17:24:32 No logging endpoint provided.

正在检查spark worker stderr(在本地主机8081上):

Spark Executor Command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=45203" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-worker-64fd4ddd6-tqdrs:45203" "--executor-id" "0" "--hostname" "172.18.0.20" "--cores" "3" "--app-id" "app-20210305172425-0000" "--worker-url" "spark://Worker@172.18.0.20:44365"
========================================

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/03/05 17:24:26 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 230@spark-worker-64fd4ddd6-tqdrs
21/03/05 17:24:26 INFO SignalUtils: Registered signal handler for TERM
21/03/05 17:24:26 INFO SignalUtils: Registered signal handler for HUP
21/03/05 17:24:26 INFO SignalUtils: Registered signal handler for INT
21/03/05 17:24:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/03/05 17:24:27 INFO SecurityManager: Changing view acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing view acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/03/05 17:24:27 INFO TransportClientFactory: Successfully created connection to spark-worker-64fd4ddd6-tqdrs/172.18.0.20:45203 after 50 ms (0 ms spent in bootstraps)
21/03/05 17:24:27 INFO SecurityManager: Changing view acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing view acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/03/05 17:24:28 INFO TransportClientFactory: Successfully created connection to spark-worker-64fd4ddd6-tqdrs/172.18.0.20:45203 after 1 ms (0 ms spent in bootstraps)
21/03/05 17:24:28 INFO DiskBlockManager: Created local directory at /tmp/spark-bdffc2b3-f57a-42fa-a720-e22274b86b67/executor-f1eff7ca-d2cd-4ff4-b18b-c8d6a520f590/blockmgr-c61fb65f-ea97-4bd5-bf15-e0025845a251
21/03/05 17:24:28 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
21/03/05 17:24:28 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@spark-worker-64fd4ddd6-tqdrs:45203
21/03/05 17:24:28 INFO WorkerWatcher: Connecting to worker spark://Worker@172.18.0.20:44365
21/03/05 17:24:28 INFO TransportClientFactory: Successfully created connection to /172.18.0.20:44365 after 1 ms (0 ms spent in bootstraps)
21/03/05 17:24:28 INFO WorkerWatcher: Successfully connected to spark://Worker@172.18.0.20:44365
21/03/05 17:24:28 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
21/03/05 17:24:28 INFO Executor: Starting executor ID 0 on host 172.18.0.20
21/03/05 17:24:28 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42561.
21/03/05 17:24:28 INFO NettyBlockTransferService: Server created on 172.18.0.20:42561
21/03/05 17:24:28 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/03/05 17:24:28 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(0, 172.18.0.20, 42561, None)
21/03/05 17:24:28 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(0, 172.18.0.20, 42561, None)
21/03/05 17:24:28 INFO BlockManager: Initialized BlockManager: BlockManagerId(0, 172.18.0.20, 42561, None)

它永远卡在那里。通过检查pythonsdk的源代码,我们可以看到“没有提供日志记录端点”是致命的,这是由于缺少发送给他的配置(没有日志记录/工件/提供/控制端点)。如果我尝试将“--artifact\u endpoint”添加到python命令中,就会得到通信失败的grcp错误,因为jobserver创建了自己的工件端点。在这个设置中,需要使用固定端口配置所有这些端点(可能与sdk和worker位于同一pod中的localhost一样),但我找不到如何配置它。检查,以便我可以找到一个相关的问题,但在他的情况下,他得到pythonsdk配置自动(可能是一个sparkrunner问题?)
使用“process”——尝试在一个进程中运行pythonsdk,我用 ./gradlew :sdks:python:container:py37:docker ,将sdks/python/container/build/target/launcher/linux\u amd64/boot可执行文件复制到spark worker容器中的/python\u sdk/boot,并使用以下命令:

python3 wordcount.py \
--output ./data_test/counts \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_master_url=spark://spark-master:7077 \
--spark_rest_url=http://spark-master:6066 \
--environment_type=PROCESS \
--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar \
--environment_config='{"os":"linux","arch":"x84_64","command":"/python_sdk/boot"}'

导致终端出现“运行时异常”:

INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
Traceback (most recent call last):
  File "wordcount.py", line 91, in <module>
    run()
  File "wordcount.py", line 86, in run
    output | "Write" >> WriteToText(known_args.output)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 581, in __exit__
    self.result.wait_until_finish()
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 608, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline job-95c13aa5-96ab-4d1d-bc68-7f9d203c8251 failed in state FAILED: unknown error

再次检查spark stderr worker日志,我发现问题是 java.lang.IllegalArgumentException: No filesystem found for scheme classpath 我不知道原因。

21/03/05 18:33:12 INFO Executor: Adding file:/opt/spark/work/app-20210305183309-0000/0/./javax.servlet-api-3.1.0.jar to class loader
21/03/05 18:33:12 INFO TorrentBroadcast: Started reading broadcast variable 0
21/03/05 18:33:12 INFO TransportClientFactory: Successfully created connection to spark-worker-89c5c4c87-5q45s/172.18.0.20:34783 after 1 ms (0 ms spent in bootstraps)
21/03/05 18:33:12 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.3 KB, free 366.3 MB)
21/03/05 18:33:12 INFO TorrentBroadcast: Reading broadcast variable 0 took 63 ms
21/03/05 18:33:12 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.5 KB, free 366.3 MB)
21/03/05 18:33:13 INFO MemoryStore: Block rdd_13_0 stored as values in memory (estimated size 16.0 B, free 366.3 MB)
21/03/05 18:33:13 INFO MemoryStore: Block rdd_17_0 stored as values in memory (estimated size 16.0 B, free 366.3 MB)
21/03/05 18:33:13 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 5427 bytes result sent to driver
21/03/05 18:33:14 ERROR SerializingExecutor: Exception while executing runnable org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@5f917914
java.lang.IllegalArgumentException: No filesystem found for scheme classpath
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:467)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:537)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:125)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
    at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
21/03/05 18:33:16 ERROR SerializingExecutor: Exception while executing runnable org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@67fb2b2c
java.lang.IllegalArgumentException: No filesystem found for scheme classpath
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:467)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:537)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:125)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
    at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
21/03/05 18:33:19 INFO ProcessEnvironmentFactory: Still waiting for startup of environment '/python_sdk/boot' for worker id 1-1

可能缺少一些配置参数。

obs公司

如果我执行命令

python3 wordcount.py \
--output ./data_test/counts \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar \
--spark_master_url=spark://spark-master:7077 \
--spark_rest_url=http://spark-master:6066 \
--environment_type=LOOPBACK

在我们的spark worker(在spark集群中只有一个worker)内部,我们有一个完整的工作光束管道和这些日志。

iezvtpos

iezvtpos1#

使用“external”-这绝对像是beam中的一个bug。工人端点应该设置为使用localhost;我认为不可能配置它们。我不知道为什么他们会失踪;一种有根据的猜测是,服务器无法启动,导致端点空着。我为此问题提交了错误报告(beam-11957)。
使用“过程”-方案 classpath 对应于classloaderfilesystem。此文件系统通常使用autoservice加载,这取决于类路径上是否存在classloaderfilesystemregistrar(与文件系统本身的名称无关)。作业jar的类路径基于 spark_job_server_jar . 你从哪里得到你的钱 beam-runners-spark-job-server-2.28.0.jar 从哪里来的?

相关问题