如何在scala中从sparkDataframe中分离出损坏的记录?

xa9qqrwz  于 2021-05-18  发布在  Spark
关注(0)|答案(0)|浏览(329)

在使用scala的spark中,我需要分离出具有 _corrupt_record .
我有以下代码:
在这里,我把数据读入一个df-这很好。

val data_frame_datasource0 = glueContext.getCatalogSource(database = "my-stream-database", tableName = "my-stream-table", tmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{"startingPosition": "TRIM_HORIZON", "inferSchema": "true"}""")).getDataFrame()

这里我想创建一个数据具有 _corrupt_record 与好数据分离,以便以后我可以将split\u valid\u corrupt\u df(0)转储到坏数据位置。

val split_valid_corrupt_df = data_frame_datasource0.splitRows(Seq("_corrupt_record"), transformationContext = "split_valid_corrupt_df", CallSite("Not provided", ""), stageThreshold = 10, totalThreshold = 100)

但是 _corrupt_record 没有分开。
这里面我缺了什么?我是新来的。我还需要别的吗 Seq 附加或非空过滤器 _corrupt_record 不知怎么的?
谢谢

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题