我遇到了一个奇怪的问题,我需要问你们,如果我没有错过什么。我在解析普通文件中的gzip json时遇到了一个问题,但我将其简化为更简单的情况:
我有文件系统原始源代码,和简单的SQL计数行。对于1 k行的非压缩测试文件,我得到1 k作为计数的结果。对于同一个文件,用终端压缩,结果是12。
最奇怪的是,如果应用于json日志文件(这是我的初始任务),Flink实际上从gzip文件中解析部分json对象。
这是我的SQL:
def main():
table_env.execute_sql(f"""
CREATE TABLE logs_source (
raw_row STRING
) WITH (
'connector' = 'filesystem',
'path' = '{logs_path}',
'source.monitor-interval' = '10',
'format' = 'raw'
)
""")
table_env.execute_sql("""
CREATE TABLE print_sink (
ip_number BIGINT NOT NULL
) WITH (
'connector' = 'print'
)
""")
table_env.execute_sql(f"""
INSERT INTO print_sink
SELECT
COUNT(raw_row)
FROM logs_source
""").wait()
在文档中的某个地方写着,gzip是基于扩展名(我的文件名是 *.log.gz)动态解码的。
我搜索了任何选项或参数,以启用解析gzip文件具体-但我失败了...
Flink版本1.16.0,im使用pyflink,Python 3.9
有什么问题吗谢谢你的任何想法!
1条答案
按热度按时间nhjlsmyf1#
我也有同样的问题。看起来flink只阅读了它压缩的json文件的一部分。它在处理未压缩的json文件时工作正常。Flink版本1.16.0,Flink with java。