因此,我有一个spark结构化流作业,它将扁平化的json消息 Dataframe 以 parquet 格式存储在日期分区文件夹(即/source/wireless/active_portal/activation/dt=当前日期)中的hdfs位置,它还创建了_spark_metadata,其中包含它已写入输出位置的文件的名称。此HDFS位置的顶部有一个hive表。
我必须在所有 parquet 文件中将特定列值PIN更改为默认值XXXX,而不更改其名称,这样_spark_metadata就不会受到干扰
是否有一种方法可以通过使用spark命令或hive命令来更改hdfs位置中的列中的数据。
schema of parquet files
root
|-- accNumber: string (nullable = true)
|-- accountPin: string (nullable = false)
|-- firstName: string (nullable = true)
|-- lastName: string (nullable = true)
|-- dt: date (nullable = false)
|-- load_time: timestamp (nullable = false)
spark metadata file:
{"path":"hdfs://HDPDEVNN/source/wireless/activation_portal/activation/dt=2022-03-30/part-00000-02b9b4f6-ea66-483e-9348-a9b87f33a232.c000.snappy.parquet","size":6834,"isDir":false,"modificationTime":1648655859206,"blockReplication":2,"blockSize":134217728,"action":"add"}
我尝试获取spark Dataframe 中的所有数据并更改列和重写,但这会更改文件的名称并干扰spark_metadata文件。现在,我无法使用spark读取整个文件夹,因为它给了我xyz。parquet文件不存在,因为文件名已更改
我还尝试在配置单元中使用Insert Overwrite语句Directory语句,但这也会创建具有不同名称的文件
1条答案
按热度按时间gmxoilav1#
不能在HDFS上修改文件,因此必须使用Hive。假设您的表不是事务表,则不能在适当的位置修改数据,因此必须通过另一个表清洗数据。
这里有一个我如何处理这个问题的虚拟例子。从内存中写这个,所以它可能不是完全正确的语法。
现在您已经得到了更改后的数据,只需将其放回原始表中。