在stormcrawler中向元数据添加某些键时发生java.util.concurrentmodificationexception

vs3odd8k  于 2021-07-15  发布在  Storm
关注(0)|答案(1)|浏览(524)

我在元数据中添加了一个字段,用于传输和持久化状态索引。该字段是一个字符串列表,其名称为input\u keywords。在strom集群中运行topology后,该拓扑将暂停,并显示以下日志:

java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$consume_loop_STAR_$fn__4132.invoke(disruptor.clj:84) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.worker$mk_transfer_fn$transfer_fn__10378.invoke(worker.clj:203) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.executor$start_batch_transfer_GT_worker_handler_BANG$fn__10056.invoke(executor.clj:314) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$clojure_handler$reify__4115.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    ... 6 more
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437) ~[?:1.8.0_112]
    at java.util.HashMap$EntryIterator.next(HashMap.java:1471) ~[?:1.8.0_112]
    at java.util.HashMap$EntryIterator.next(HashMap.java:1469) ~[?:1.8.0_112]
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:99) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.worker$mk_transfer_fn$transfer_fn__10378.invoke(worker.clj:203) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.executor$start_batch_transfer_GT_worker_handler_BANG$fn__10056.invoke(executor.clj:314) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$clojure_handler$reify__4115.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    ... 6 more
2021-02-27 08:03:34.276 o.a.s.d.executor Thread-20-disruptor-executor[45 45]-send-queue [ERROR] 
java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$consume_loop_STAR_$fn__4132.invoke(disruptor.clj:84) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.worker$mk_transfer_fn$transfer_fn__10378.invoke(worker.clj:203) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.executor$start_batch_transfer_GT_worker_handler_BANG$fn__10056.invoke(executor.clj:314) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$clojure_handler$reify__4115.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    ... 6 more
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437) ~[?:1.8.0_112]
    at java.util.HashMap$EntryIterator.next(HashMap.java:1471) ~[?:1.8.0_112]
    at java.util.HashMap$EntryIterator.next(HashMap.java:1469) ~[?:1.8.0_112]
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:99) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.worker$mk_transfer_fn$transfer_fn__10378.invoke(worker.clj:203) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.executor$start_batch_transfer_GT_worker_handler_BANG$fn__10056.invoke(executor.clj:314) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$clojure_handler$reify__4115.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    ... 6 more
2021-02-27 08:03:34.327 o.a.s.util Thread-20-disruptor-executor[45 45]-send-queue [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.worker$fn_10827$fn_10828.invoke(worker.clj:781) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.executor$mk_executor_data$fn_10034$fn_10035.invoke(executor.clj:281) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]

对于拓扑的每个组件,我们有不同的并行性提示。在元数据中添加输入关键字之后,我们得到了错误。错误的主要原因是什么?

iyzzxitl

iyzzxitl1#

您正在修改正在序列化的元数据示例。你不能这样做,请参阅风暴疑难解答页。
如1.16的发行说明所述,您可以锁定元数据。这不会解决问题,但会告诉您在代码中的何处写入元数据。

In our topology, we emitted the same metadata to multiple bolts at the same time.

神秘的解释。别那么做。

相关问题