spark作业

tvmytwxo  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(485)

我在googleclouddataproc上运行了一个长时间运行的spark结构化流媒体作业,它使用kafka作为源和汇。我还将检查点保存在google云存储中。
运行一周后,我注意到它正在稳定地消耗所有100gb的磁盘存储空间,将文件保存到 /hadoop/dfs/data/current/BP-315396706-10.128.0.26-1568586969675/current/finalized/... .
我的理解是,我的spark工作不应该依赖于本地磁盘存储。
我完全误解了吗?
我是这样提交工作的:

(cd  app/src/packages/ &&  zip -r mypkg.zip mypkg/ ) && mv app/src/packages/mypkg.zip build
gcloud dataproc jobs submit pyspark \
    --cluster cluster-26aa \
    --region us-central1 \
    --properties ^#^spark.jars.packages=org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 \
    --py-files build/mypkg.zip \
    --max-failures-per-hour 10 \
    --verbosity info \
    app/src/explode_rmq.py

以下是我工作的相关部分:
资料来源:

spark = SparkSession \
        .builder \
        .appName("MyApp") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    spark.sparkContext.addPyFile('mypkg.zip')

    df = spark \
        .readStream \
        .format("kafka") \
        .options(**config.KAFKA_PARAMS) \
        .option("subscribe", "lsport-rmq-12") \
        .option("startingOffsets", "earliest") \
        .load() \
        .select(f.col('key').cast(t.StringType()), f.col('value').cast(t.StringType()))

Flume:

sink_kafka_q = sink_df \
        .writeStream \
        .format("kafka") \
        .options(**config.KAFKA_PARAMS) \
        .option("topic", "my_topic") \
        .option("checkpointLocation", "gs://my-bucket-data/checkpoints/my_topic") \
        .start()
bxgwgixi

bxgwgixi1#

您可以ssh到主节点并运行以下命令来找出谁在消耗hdfs空间吗?

hdfs df -du -h /

我做了一个简单的spark-pi测试,
运行作业之前:

$ hdfs dfs -du /
34       /hadoop
0        /tmp
2107947  /user

作业完成后:

$ hdfs dfs -du /user/
0        /user/hbase
0        /user/hdfs
0        /user/hive
0        /user/mapred
0        /user/pig
0        /user/root
2107947  /user/spark
0        /user/yarn
0        /user/zookeeper

$ hdfs dfs -du /user/spark/
2107947  /user/spark/eventlog

它似乎被spark eventlog消耗,请参阅spark.eventlog.dir。您可以考虑使用 spark.eventLog.compress=true 或者用 spark.eventLog.enabled=false

nbnkbykc

nbnkbykc2#

如果内存不足,spark将在本地磁盘上保存信息。您可以如下方式禁用磁盘上的持久性:

df.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)

或者您可以尝试序列化信息以像这样占用更少的内存

df.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)

读取序列化数据将占用更多cpu。
每个Dataframe都有其独特的序列化级别。
更多信息:https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-坚持

相关问题