Flink: java.io.NotSerializableException: redis.clients.jedis.JedisCluster

Posted by 0x6upsns on 2021-06-21 in Flink

When I submit a new Flink job, it throws:

Caused by: java.io.NotSerializableException: redis.clients.jedis.JedisCluster
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:512)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
    ... 24 more

Here is my code:

JedisCluster jedisCluster = JedisClusterBuilder.getInstance(JedisClusterEnum.THIRD);

DataStream<MobileClickEvent> clickEventDataStream = environment.addSource(clickConsumer);

clickEventDataStream
        .filter(Objects::nonNull)
        .keyBy(new KeySelector<MobileClickEvent, String>() {
            @Override
            public String getKey(MobileClickEvent value) throws Exception {
                return value.getItemId() + "_" + value.getItemType();
            }
        })
        .process(new KeyedProcessFunction<String, MobileClickEvent, Object>() {
            @Override
            public void processElement(MobileClickEvent value, Context ctx, Collector<Object> out) throws Exception {
                String key = ctx.getCurrentKey();
                jedisCluster.hincrBy("{item_feature}" + key, "click", 1);
                jedisCluster.expire("{item_feature}" + key, 60 * 10);
            }
        });
Answer #1 (cunj1qz1)

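In the OP's answer, jedisCluster is initialized for every element.
Consider overriding open as well and initializing it there. From the Javadoc of open:
"Initialization method for the function. It is called before the actual working methods (like map or join) and thus suitable for one time setup work."
A field assigned in open is created on the task itself, after Flink has serialized and shipped the function, so the client is never part of the serialized closure.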

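.process(new KeyedProcessFunction<String, MobileClickEvent, Object>() {
    // Same key prefix as in the question.
    private static final String REDIS_PREFIX = "{item_feature}";

    // transient: excluded from serialization; assigned in open() on each task.
    private transient JedisCluster jedisCluster;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per parallel instance, before any element is processed.
        jedisCluster = JedisClusterBuilder.getInstance(JedisClusterEnum.THIRD);
    }

    @Override
    public void processElement(MobileClickEvent value, Context ctx, Collector<Object> out) throws Exception {
        String key = ctx.getCurrentKey();
        jedisCluster.hincrBy(REDIS_PREFIX + key, "click", 1);
        jedisCluster.expire(REDIS_PREFIX + key, 60 * 10);
    }
});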
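
To see why the two versions behave differently, here is a minimal sketch that uses only plain Java serialization, with no Flink or Jedis on the classpath. FakeClient, EagerFunction and LazyFunction are hypothetical stand-ins invented for this illustration (FakeClient plays the role of JedisCluster), and the serialize/deserialize round trip only approximates what Flink does when it ships a function to the cluster.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

    // Hypothetical stand-in for JedisCluster: a client that is NOT Serializable.
    static class FakeClient {
        long hincrBy(String key, String field, long delta) {
            return delta; // pretend the field started at 0
        }
    }

    // Like the question: the client lives in a regular field, so Java
    // serialization tries to write it together with the function.
    static class EagerFunction implements Serializable {
        private final FakeClient client = new FakeClient();
    }

    // Like the answer: the field is transient and only assigned in open(),
    // which runs after the function has been deserialized.
    static class LazyFunction implements Serializable {
        private transient FakeClient client;

        void open() {
            client = new FakeClient();
        }

        long process(String key) {
            return client.hincrBy(key, "click", 1);
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // Returns true if writing the object fails with NotSerializableException.
    static boolean failsToSerialize(Object o) throws IOException {
        try {
            serialize(o);
            return false;
        } catch (NotSerializableException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        // The eager variant cannot be serialized at all.
        System.out.println(failsToSerialize(new EagerFunction())); // prints true

        // The lazy variant survives the round trip; open() then creates the client.
        LazyFunction shipped = (LazyFunction) deserialize(serialize(new LazyFunction()));
        shipped.open();
        System.out.println(shipped.process("{item_feature}42_video")); // prints 1
    }
}
```

The same reasoning applies to a local variable captured by an anonymous class, as in the question: the captured value is stored in a hidden field of the anonymous class and is serialized along with it.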
