本地模式下运行flink和Yarn簇的不同结果

gcuhipw9  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(397)

我使用flinkjavaapi运行一个代码,从kafka获取一些字节,然后使用另一个库静态方法(解析和插入结果都由库完成)将其插入cassandra数据库,对其进行解析。在ide的local上运行代码,我得到了想要的答案,但是在yarn cluster上运行parse方法并没有像预期的那样工作!

public class Test {
    static HashMap<Integer, Object> ConfigHashMap = new HashMap<>();

    public static void main(String[] args) throws Exception {

        CassandraConnection.connect();
        Parser.setInsert(true);

        stream.flatMap(new FlatMapFunction<byte[], Void>() {
            @Override
            public void flatMap(byte[] value, Collector<Void> out) throws Exception {
                Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);
                // Parser.parse(ByteBuffer.wrap(value));
            }
        });
        env.execute();
    }
}

中有一个静态hashmap字段 Parser 类,使解析数据的配置基于它的信息,并且数据将在执行期间插入它。在Yarn上运行的问题是,此数据不适用于 taskmanagers 他们只是打印 config is not available! 所以我重新定义hashmap作为 parse 方法,但结果没有差异!
我怎样才能解决这个问题?

2fjabf4q

2fjabf4q1#

我将静态方法和字段更改为非静态的,并使用 RichFlatMapFunction 解决了问题。

stream.flatMap(new RichFlatMapFunction<byte[], Void>() {
            CassandraConnection con = new CassandraConnection();
            int i = 0 ;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                con.connect();
            }

            @Override
            public void flatMap(byte[] value, Collector<Void> out) throws Exception {

                ByteBuffer tb = ByteBuffer.wrap(value);
                np.parse(tb, ConfigHashMap, con);
            }
        });

相关问题