Spark Streaming: reading files from S3/COS fails

mnowg1ta · posted 2021-07-13 in Spark

I am submitting a wordcount Spark job to run on a Kubernetes cluster with the command below.

    ./bin/spark-submit \
      --master k8s://https://c111.us-south.containers.cloud.ibm.com:32206 \
      --deploy-mode cluster \
      --name spark-pi \
      --class org.apache.spark.examples.SparkPi \
      --packages com.ibm.stocator:stocator:1.1.3 \
      --conf spark.executor.instances=5 \
      --conf spark.hadoop.fs.cos.myobjectstorage.access.key= \
      --conf spark.hadoop.fs.cos.myobjectstorage.secret.key= \
      --conf spark.hadoop.fs.stocator.scheme.list=cos \
      --conf spark.hadoop.fs.cos.impl=com.ibm.stocator.fs.ObjectStoreFileSystem \
      --conf spark.hadoop.fs.stocator.cos.impl=com.ibm.stocator.fs.cos.COSAPIClient \
      --conf spark.hadoop.fs.stocator.cos.scheme=cos \
      --conf spark.jars.ivy=/tmp/.ivy \
      --conf spark.kubernetes.container.image=us.icr.io/mods15/spark-py:v1 \
      --conf spark.hadoop.fs.cos.myobjectstorage.endpoint=http://s3.us.cloud-object-storage.appdomain.cloud \
      --conf spark.hadoop.fs.cos.myobjectstorage.v2.signer.type=false \
      --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
      local:///opt/spark/examples/src/main/python/wordcount.py \
      cos://vmac-code-engine-bucket.myobjectstorage/book.txt

Everything worked until I added Spark Streaming to the Python example, shown below. I use StreamingContext.textFileStream because fileStream is not available in Python. I don't see any errors in the logs, but the output written to the COS folder is empty (no word counts).

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    import time

    def time_in_seconds():
        seconds = time.time()
        return seconds

    timeInSeconds = time_in_seconds()
    sc = SparkContext("local[2]", "WordCount")
    ssc = StreamingContext(sc, 60)
    lines = ssc.textFileStream("cos://COS_BUCKET_NAME.COS_SERVICE_NAME/ES_TOPIC_NAME/")
    # Split each line into words
    words = lines.flatMap(lambda line: line.split(" "))
    # Count each word in each batch
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)
    # Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.pprint()
    wordCounts.saveAsTextFiles(f"cos://COS_BUCKET_NAME.COS_SERVICE_NAME/results/wordcount-result-{timeInSeconds}")
    ssc.start()
    ssc.awaitTermination()
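As a side note, the transformation chain itself (flatMap → map → reduceByKey) can be checked without Spark, streaming, or COS. Below is a minimal plain-Python sketch of the same word-count logic (the helper name `word_count` is mine, not part of the original job), useful for ruling out the transformations and isolating the problem to the file-stream source:

```python
from collections import Counter

def word_count(lines):
    """Mirror the DStream pipeline: flatMap(split) -> map(word, 1) -> reduceByKey(add)."""
    # flatMap: split every line into words, dropping empty tokens
    words = [w for line in lines for w in line.split(" ") if w]
    # map + reduceByKey: sum a count of 1 per word occurrence
    return dict(Counter(words))

counts = word_count(["to be or", "not to be"])
print(counts)  # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```

If this produces the expected counts while the streaming job writes empty output, the issue is almost certainly in how files are picked up from the COS path rather than in the word-count logic.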

I could not find any documentation on running Spark Streaming on Kubernetes. My assumption is that reading from the COS bucket is failing. Is anything missing from the command or the Python wordcount example?
