我有一个未分区的gzip csv文件,我读到Spark。读取gzip文件不是一个问题,但是只要使用一个操作来计算sparkDataframe,并且该操作触及一个特定的有问题的行,就会抛出一个错误。如果我使用 df.limit()
我可以将read上的dataframe子集到有问题的观察之前的行的行号,然后可以继续我的工作流程而不会出错。
我的问题是,有没有一种方法可以跳过观察中的阅读。我想沿着df.limit\u range(100:200)做一些事情,读取csv时跳过100-200行。我尝试了各种方法来生成索引列,然后进行过滤,但在计算时遇到了问题。下面,我尝试将子集合到有问题的行之前的所有行,然后与原始未筛选的Dataframe反连接,但是一旦计算了有问题的行,就会再次导致错误,表明gzip文件无法读入。
df_full = df.withColumn("rowId", monotonically_increasing_id())
df_head = df_full.limit(100).where(col("rowID") < 99)
anti_df = df_full.join(df_head, "id", "left_anti")
错误:
FileReadException: Error while reading file s3a://some-s3-bucket/dir/subdir/file_name.gz.
Caused by: EOFException: Unexpected end of input stream
1条答案
按热度按时间luaexgnf1#
可以使用列上的筛选器来读取除第100-200行以外的所有行。
输出
anti_df
将:只要确保你的过滤器在你的星火计划中被按下。我的意思是,过滤器应该在读取之后立即执行,而不是在对其执行多个计算之后执行(此时您的代码可能会因错误而失败)。