我对spark还很陌生,我对RDD和Dataframe有以下高层次的问题,如果我没弄错的话,这些RDD和Dataframe是建立在RDD之上的:
我知道有两种类型的操作可以在rdd上完成,转换和操作。我还理解,只有在对rdd执行操作时,转换才会被执行,rdd是转换的产物。考虑到rdd在内存中,我想知道是否有可能优化这些rdd消耗的内存量,以下面的示例为例:
KafkaDF = KafkaDFRaw.select(
KafkaDFRaw.key,
KafkaDFRaw.value,
KafkaDFRaw.topic,
unix_timestamp('timestamp',
'yyyy-MM-dd HH:mm:ss').alias('kafka_arrival_time')
).withColumn("spark_arrival_time", udf(time.time, DoubleType())())
我有一个kafkadfrawDataframe,我产生了一个新的rdd,叫做kafkadf。然后我希望向这个新的rdd添加列。我应该将它们添加到现有rdd中吗?像这样:
decoded_value_udf = udf(lambda value: value.decode("utf-8"))
KafkaDF = KafkaDF\
.withColumn(
"cleanKey", decoded_value_udf(KafkaDF.key))\
.withColumn(
"cleanValue", decoded_value_udf(KafkaDF.value))
或者我应该从上一个Dataframe创建一个新的Dataframe?像这样:
decoded_value_udf = udf(lambda value: value.decode("utf-8"))
KafkaDF_NEW = KafkaDF\
.withColumn(
"cleanKey", decoded_value_udf(KafkaDF.key))\
.withColumn(
"cleanValue", decoded_value_udf(KafkaDF.value))
这对内存优化有影响吗?
事先谢谢你的帮助。
1条答案
按热度按时间dxxyhpgq1#
无论何时调用操作,都会执行优化的dag,并按照计划使用内存。您可以比较执行计划以了解:
在两者之间创建额外的变量来保存转换不会影响内存利用率。内存需求将取决于数据大小、分区大小、洗牌等。