我使用flinkjavaapi运行一个代码,从kafka获取一些字节,然后使用另一个库静态方法(解析和插入结果都由库完成)将其插入cassandra数据库,对其进行解析。在ide的local上运行代码,我得到了想要的答案,但是在yarn cluster上运行parse方法并没有像预期的那样工作!
public class Test {
static HashMap<Integer, Object> ConfigHashMap = new HashMap<>();
public static void main(String[] args) throws Exception {
CassandraConnection.connect();
Parser.setInsert(true);
stream.flatMap(new FlatMapFunction<byte[], Void>() {
@Override
public void flatMap(byte[] value, Collector<Void> out) throws Exception {
Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);
// Parser.parse(ByteBuffer.wrap(value));
}
});
env.execute();
}
}
中有一个静态hashmap字段 Parser
类,使解析数据的配置基于它的信息,并且数据将在执行期间插入它。在Yarn上运行的问题是,此数据不适用于 taskmanagers
他们只是打印 config is not available!
所以我重新定义hashmap作为 parse
方法,但结果没有差异!
我怎样才能解决这个问题?
1条答案
按热度按时间2fjabf4q1#
我将静态方法和字段更改为非静态的,并使用
RichFlatMapFunction
解决了问题。