Flink SQL不会动态解压缩gzip源代码-但仍会解析其中的一部分

mgdq6dx1  于 2023-06-04  发布在  Apache
关注(0)|答案(1)|浏览(162)

我遇到了一个奇怪的问题,我需要问你们,如果我没有错过什么。我在解析普通文件中的gzip json时遇到了一个问题,但我将其简化为更简单的情况:
我有文件系统原始源代码,和简单的SQL计数行。对于1 k行的非压缩测试文件,我得到1 k作为计数的结果。对于同一个文件,用终端压缩,结果是12。
最奇怪的是,如果应用于json日志文件(这是我的初始任务),Flink实际上从gzip文件中解析部分json对象。
这是我的SQL:

def main():
    table_env.execute_sql(f"""
        CREATE TABLE logs_source (
            raw_row STRING
        ) WITH (
            'connector' = 'filesystem',
            'path' = '{logs_path}',
            'source.monitor-interval' = '10',
            'format' = 'raw'
        )
    """)

    table_env.execute_sql("""
        CREATE TABLE print_sink (
            ip_number BIGINT NOT NULL
        ) WITH (
            'connector' = 'print'
        )
    """)
    table_env.execute_sql(f"""
            INSERT INTO print_sink
                SELECT
                    COUNT(raw_row)         
                FROM logs_source
    """).wait()

在文档中的某个地方写着,gzip是基于扩展名(我的文件名是 *.log.gz)动态解码的。
我搜索了任何选项或参数,以启用解析gzip文件具体-但我失败了...
Flink版本1.16.0,im使用pyflink,Python 3.9
有什么问题吗谢谢你的任何想法!

nhjlsmyf

nhjlsmyf1#

我也有同样的问题。看起来flink只阅读了它压缩的json文件的一部分。它在处理未压缩的json文件时工作正常。Flink版本1.16.0,Flink with java。

相关问题