在storm拓扑中获取notserializableexception错误

zpqajqem  于 2021-06-24  发布在  Storm
关注(0)|答案(2)|浏览(411)

storm版本:1.2.1,java版本:8
我正在用scala编写一个storm拓扑,在集群模式下运行时出现以下错误。我也能得到同样的结果 LocalCluster 模式以及配置: conf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, Boolean.box( true)) . 以下是跟踪:

2018-05-05 00:49:59,342 ERROR util [Thread-37-disruptor-executor[6 6]-send-queue] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: com.fasterxml.jackson.databind.node.ObjectNode
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$consume_loop_STAR_$fn__4492.invoke(disruptor.clj:84) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: com.fasterxml.jackson.databind.node.ObjectNode
    at org.apache.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:41) ~[storm-core-1.2.1.jar:1.2.1]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.worker$assert_can_serialize.invoke(worker.clj:133) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.worker$mk_transfer_fn$fn__5204.invoke(worker.clj:213) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__4882.invoke(executor.clj:314) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
    ... 6 more
Caused by: java.io.NotSerializableException: com.fasterxml.jackson.databind.node.ObjectNode
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_131]
    at org.apache.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:38) ~[storm-core-1.2.1.jar:1.2.1]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.worker$assert_can_serialize.invoke(worker.clj:133) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.worker$mk_transfer_fn$fn__5204.invoke(worker.clj:213) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__4882.invoke(executor.clj:314) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
    ... 6 more

就像暴风雪试图连载的那样 ObjectNode ,这是做不到的 NotSerializableException .
不应该 ObjectNodeserializable ? 我在这里看到一个关于这个的老讨论,但我觉得这应该是 serializable .
我尝试在storm配置中添加以下内容,但没有帮助。

conf.registerSerialization(classOf[com.fasterxml.jackson.databind.node.ObjectNode])

我还尝试添加 conf.setSkipMissingKryoRegistrations(false) ,但再次没有救援。
有什么合适的解决办法?

lo8azlld

lo8azlld1#

从@stig的答案中得到灵感,从这个答案中,每当我在螺栓之间传递这个,而不是在我的对象之间传递这个,我就序列化对象的。现在我在我的螺栓中发送这样的字节数组:

val messages = input.asInstanceOf[TupleImpl].get("Request").asInstanceOf[Array[Byte]].getObj[List[myObject]]
val objMapper = new ObjectMapper()
messages.foreach(message => collector.emit(new Values(objMapper.writeValueAsBytes(message))))

编辑1:

解决这个问题的另一种可能方法是(我没有尝试过,我通过发送字节来解决)为从一个螺栓传递到另一个螺栓的对象编写serialiser类,如下所述。以下是来自此链接的serialiser示例:

public class StockAvroSerializer extends Serializer<Stock> {

    private static final Logger LOG = LoggerFactory.getLogger(StockAvroSerializer.class);
    private Schema SCHEMA = Stock.getClassSchema();

    public void write(Kryo kryo, Output output, Stock object) {
        DatumWriter<Stock> writer = new SpecificDatumWriter<>(SCHEMA);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        try {
            writer.write(object, encoder);
            encoder.flush();
        } catch (IOException e) {
            LOG.error(e.toString(), e);
        }
        IOUtils.closeQuietly(out);
        byte[] outBytes = out.toByteArray();
        output.writeInt(outBytes.length, true);
        output.write(outBytes);
    }

    public Stock read(Kryo kryo, Input input, Class<Stock> type) {
        byte[] value = input.getBuffer();
        SpecificDatumReader<Stock> reader = new SpecificDatumReader<>(SCHEMA);
        Stock record = null;
        try {
            record = reader.read(null, DecoderFactory.get().binaryDecoder(value, null));
        } catch (IOException e) {
            LOG.error(e.toString(), e);
        }
        return record;
    }
}

编辑2:

我发现了objectnode无法序列化的原因: JsonNode 不知道如何仅使用序列化时可用的信息来序列化自身:没有要使用的objectmapper或jsongenerator;后者是它必须序列化自身(以及内容(如果有的话))的组件。它不能也应该尝试示例化它们(应该如何配置它们?);静态单例往往会在更大的系统中引起问题(一部分试图以一种方式配置它们,另一部分则不同)
但这是相当古老的沟通,在新版本中,我相信应该有一些机制,使其序列化。

6mzjoqzu

6mzjoqzu2#

ObjectNode 不可序列化(它不实现可序列化接口)。 conf.setSkipMissingKryoRegistrations(false) 是默认设置。看到了吗https://storm.apache.org/releases/2.0.0-snapshot/serialization.html,它描述了此属性的作用。我认为你不想改变你的情况。
添加 conf.registerSerialization(ObjectNode.class); 到拓扑配置应该工作,不知道为什么它不适合你。如果您不能让它工作,您可以通过在发出值之前序列化为map或string来解决它。

相关问题