我正在执行以下步骤,在这个过程中,虽然我对数据使用了cache(),但会丢失转换后的数据。
步骤1:从cassandra读取数据:
data = spark_session.read \
.format('org.apache.spark.sql.cassandra') \
.options(table=table, keyspace=keyspace) \
.load()
data_cached = data.cache()
第二步:从aws s3 bucket读取数据,比如说s3\u data\u path
s3_full_df = spark.read.format("parquet").load(S3_data_path)
full_data = s3_full_df .cache()
full_data.show(n=1, truncate=False)
步骤3:查找步骤1中的cassandra数据和步骤2中的s3Parquet文件数据之间的差异
diff_data = data_cached.subtract(full_data)
diff_data_cached = diff_data .cache()
diff_data_cached.count()
第4步:将第3步数据diff\u data\u缓存到aws s3 bucket中,比如s3\u diff\u path
diff_data_cached.write.parquet(inc_path)
步骤5:imp步骤:将cassandra数据从步骤1覆盖到aws s3路径s3\U数据\U路径(在步骤2中)
data_cached.write.parquet(full_path, mode="overwrite")
第6步:写缓存在数据库中的差异数据。此步骤有问题。
在第三步中,diff\ u data\ u cached是可用的,但是在第五步中diff\ u data\ u cached是空的,我的假设是在第五步中,数据被第一步中的数据覆盖,因此两个Dataframe之间没有差异,但是由于我对diff\u data\u cached运行了cache()操作,然后运行count()将数据加载到内存中,所以我的期望是diff\u data\u cached应该在内存中对步骤6可用,而不是spark懒散地计算。
1条答案
按热度按时间0sgqnhkj1#
你是1)在hadoop/大数据域中写回一个源,2)应用了缓存,3)因此使缓存失效。
在过去,我很确定这没有发生——失效是因为(底层)不可变rdd被认为是数据源的快照。我是基于https://issues.apache.org/jira/browse/spark-24596.
我想知道您是否对不再写入的分区进行了筛选,然后对新分区进行了写入,如果这样做具有相同的效果。我很快就会查到的,会给你发邮件的。