flink作业在加载大文件时挂起提交

5anewei6  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(318)

我用java编写了一个flink流作业,它加载一个包含订户数据(4列)的csv文件,然后在与订户数据匹配时从套接字流中读取数据。
最初我使用的是一个小型csv文件(8 mb),一切正常:


# flink run analytics-flink.jar 19001 /root/minisubs.csv /root/output.csv

loaded 200000 subscribers from csv file
11/02/2015 16:36:59 Job execution switched to status RUNNING.
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to SCHEDULED 
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to DEPLOYING 
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to RUNNING

我将csv文件切换到一个更大的文件(~45 mb),现在我看到的是:


# flink run analytics-flink.jar 19001 /root/subs.csv /root/output.csv

loaded 1173547 subscribers from csv file

请注意,上面的订户数是文件中的行数。我试图在flink配置中查找任何超时,但找不到任何超时。
非常感谢您的帮助!
编辑:通过使用commons csv 1.2库,使用此方法加载csv:

private static HashMap<String, String> loadSubscriberGroups(
            String referenceDataFile) throws IOException {
        HashMap<String,String> subscriberGroups = new HashMap<String, String>();

        File csvData = new File(referenceDataFile);
        CSVParser parser = CSVParser.parse(csvData, Charset.defaultCharset(), CSVFormat.EXCEL);
        for (CSVRecord csvRecord : parser) {
            String imsi = csvRecord.get(0);
            String groupStr = csvRecord.get(3);

            if(groupStr == null || groupStr.isEmpty()) {
                continue;
            }
            subscriberGroups.put(imsi, groupStr);
        }

        return subscriberGroups;
    }

下面是文件的一个示例(我知道结尾有个逗号,最后一列暂时为空):

450000000000001,450000000001,7752,Tier-2,
450000000000002,450000000002,1112,Tier-1,
450000000000003,450000000003,6058,Tier-2,
oxcyiej7

oxcyiej71#

来自robert meztger(apache flink开发人员):
我可以解释为什么你的第一种方法不起作用:
您试图使用我们的rpc系统(akka)将csv文件从flink客户端发送到集群。当您向flink提交作业时,我们将序列化用户创建的所有对象(Map器、源等)并将其发送到集群。有一个方法streamexecutionenvironment.fromelements(..)允许用户在提交作业时序列化一些对象。但是像这样传输的数据量受到akka帧大小的限制。在我们的例子中,我认为默认值是10兆字节。之后,akka可能会删除或拒绝部署消息。
解决方案是使用rich操作符而不是常规操作符(例如richmapfunction而不是mapfunction),重写open()方法并在该方法中加载csv文件。
谢谢罗伯特!

相关问题