我在googleclouddataproc上运行了一个长时间运行的spark结构化流媒体作业,它使用kafka作为源和汇。我还将检查点保存在google云存储中。
运行一周后,我注意到它正在稳定地消耗所有100gb的磁盘存储空间,将文件保存到 /hadoop/dfs/data/current/BP-315396706-10.128.0.26-1568586969675/current/finalized/...
.
我的理解是,我的spark工作不应该依赖于本地磁盘存储。
我完全误解了吗?
我是这样提交工作的:
(cd app/src/packages/ && zip -r mypkg.zip mypkg/ ) && mv app/src/packages/mypkg.zip build
gcloud dataproc jobs submit pyspark \
--cluster cluster-26aa \
--region us-central1 \
--properties ^#^spark.jars.packages=org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 \
--py-files build/mypkg.zip \
--max-failures-per-hour 10 \
--verbosity info \
app/src/explode_rmq.py
以下是我工作的相关部分:
资料来源:
spark = SparkSession \
.builder \
.appName("MyApp") \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.addPyFile('mypkg.zip')
df = spark \
.readStream \
.format("kafka") \
.options(**config.KAFKA_PARAMS) \
.option("subscribe", "lsport-rmq-12") \
.option("startingOffsets", "earliest") \
.load() \
.select(f.col('key').cast(t.StringType()), f.col('value').cast(t.StringType()))
Flume:
sink_kafka_q = sink_df \
.writeStream \
.format("kafka") \
.options(**config.KAFKA_PARAMS) \
.option("topic", "my_topic") \
.option("checkpointLocation", "gs://my-bucket-data/checkpoints/my_topic") \
.start()
2条答案
按热度按时间bxgwgixi1#
您可以ssh到主节点并运行以下命令来找出谁在消耗hdfs空间吗?
我做了一个简单的spark-pi测试,
运行作业之前:
作业完成后:
它似乎被spark eventlog消耗,请参阅spark.eventlog.dir。您可以考虑使用
spark.eventLog.compress=true
或者用spark.eventLog.enabled=false
nbnkbykc2#
如果内存不足,spark将在本地磁盘上保存信息。您可以如下方式禁用磁盘上的持久性:
或者您可以尝试序列化信息以像这样占用更少的内存
读取序列化数据将占用更多cpu。
每个Dataframe都有其独特的序列化级别。
更多信息:https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-坚持