Corda: CollectSignaturesFlow throws UnsupportedOperationException

n3ipq98p posted on 2021-07-06 in Java

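I found that CollectSignaturesFlow throws an UnsupportedOperationException because at some point Kryo fails to serialize a collection used in the flow (the stack trace below shows the failure while the checkpoint is read back). The point-to-point CollectSignatureFlow that it uses internally works fine, with no interruption. I also implemented signature collection in a simpler way (yes, with fewer checks, but it is only for testing purposes), and it does the job.
Code: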

    @InitiatingFlow
    @StartableByRPC
    class CheckoutClientFlow(
            private val person: Person = ClientAccountFlows.defaultPerson,
            private val partiesAndAmounts: Map<CordaX500Name, Long>
    ) : FlowLogic<SignedTransaction>() {
        override val progressTracker: ProgressTracker = tracker()
        private var selfIncluded = false
        @Suspendable
        override fun call(): SignedTransaction {
            progressTracker.currentStep = NotaryInfo
            val notary: Party = getPreferredNotary(serviceHub)
            progressTracker.currentStep = LocateOtherParties
            val parties = locateParties(partiesAndAmounts.keys).toMutableSet()
            if (parties.contains(ourIdentity)) {
                selfIncluded = true
                parties.remove(ourIdentity)
            }
            val inputs = subFlow(GatherStateAndRefFlow(person, parties))
            val otherPartiesSessions = parties.map { initiateFlow(it) }
            progressTracker.currentStep = CreateTransaction
            val inputsWithParties = inputs.mapKeys { entry -> locateParty(entry.key) }
            val txBuilder = createTxBuilder(notary, inputsWithParties)
            var onceSignedTransaction = serviceHub.signInitialTransaction(txBuilder)
            progressTracker.currentStep = SigningByOtherParty
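            // This is the call that fails. Note: signedTx is not used afterwards;
            // FinalityFlow below still receives onceSignedTransaction.
            val signedTx = subFlow(CollectSignaturesFlow(onceSignedTransaction, otherPartiesSessions))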
//            otherPartiesSessions.forEach {
//                onceSignedTransaction += subFlow(CollectSignatureFlow(onceSignedTransaction, it))
//            }
            progressTracker.currentStep = Finalize
            return subFlow(FinalityFlow(transaction = onceSignedTransaction, sessions = otherPartiesSessions))
        }
    }
} // closes the enclosing ExchangeValueMultipleParties declaration (not shown)

Test:

    @Test
    fun `subtract if both parties are present other`() {
        val person = defaultPerson
        val partiesAndAmounts = mapOf(Pair(aNodeName, defaultMovableAmount), Pair(bNodeName, defaultMovableAmount))
        val resultFuture = a.startFlow(ExchangeValueMultipleParties.CheckoutClientFlow(
                person = defaultPerson,
                partiesAndAmounts = partiesAndAmounts
        ))
        network.runNetwork()
        val res1 = resultFuture.get()
        assertNotNull(res1)
        assertNotEquals(0, res1.inputs.size)
        assertEquals(partiesAndAmounts.size, res1.tx.outputs.size)
        assertEquals(partiesAndAmounts.size, res1.sigs.size)
        val stateA = a.services.vaultService.queryBy<ClientAccountState>(accountQueryCriteria).states.first { it.state.data.owner == person }
        assertNotNull(stateA)
//        assertEquals(defaultAmount - defaultMovableAmount, getTrueAmount(stateA.state.data.balance))
        val stateB = b.services.vaultService.queryBy<ClientAccountState>(accountQueryCriteria).states.first { it.state.data.owner == person }
        assertNotNull(stateB)
//        assertEquals(defaultAmount - defaultMovableAmount, getTrueAmount(stateB.state.data.balance))
    }

Stack trace:

[ERROR] 19:03:34+0000 [FiberDeserializationChecker] interceptors.FiberDeserializationChecker. - Encountered unrestorable checkpoint! [errorCode=aixjez, moreInformationAt=https://errors.corda.net/OS/4.4/aixjez]
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
dataObject (co.paralleluniverse.fibers.Stack)
stack (net.corda.node.services.statemachine.FlowStateMachineImpl)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
    at com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer.read(CompatibleFieldSerializer.java:145)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:782)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readObjectOrNull(ReplaceableObjectKryo.java:107)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at co.paralleluniverse.fibers.Fiber$FiberSerializer.read(Fiber.java:2156)
    at co.paralleluniverse.fibers.Fiber$FiberSerializer.read(Fiber.java:2086)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readClassAndObject(ReplaceableObjectKryo.java:112)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1$1.invoke(KryoCheckpointSerializer.kt:83)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1$1.invoke(KryoCheckpointSerializer.kt:33)
    at net.corda.node.serialization.kryo.KryoStreams.kryoInput(KryoStreams.kt:20)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1.invoke(KryoCheckpointSerializer.kt:72)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1.invoke(KryoCheckpointSerializer.kt:33)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$kryo$1.execute(KryoCheckpointSerializer.kt:61)
    at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.run(KryoPoolQueueImpl.java:58)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer.kryo(KryoCheckpointSerializer.kt:57)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer.deserialize(KryoCheckpointSerializer.kt:71)
    at net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker$start$2.invoke(FiberDeserializationCheckingInterceptor.kt:101)
    at net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker$start$2.invoke(FiberDeserializationCheckingInterceptor.kt:51)
    at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)
Caused by: java.lang.UnsupportedOperationException
    at java.util.AbstractCollection.add(AbstractCollection.java:262)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readObject(ReplaceableObjectKryo.java:92)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readObject(ReplaceableObjectKryo.java:92)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    ... 21 more
[The same error and stack trace are logged a second time, followed by a third occurrence of the error line; the repeats are omitted here.]

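It seems that, to rebuild some collection used in the flow, Kryo calls add(), which AbstractCollection does not support: the method is implemented, but the implementation only throws.
Here is the implementation in AbstractCollection: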

public boolean add(E var1) {
    throw new UnsupportedOperationException();
}
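
A possible way to see the same exception without Corda or Kryo, offered as a sketch rather than a confirmed diagnosis. The trace ends in AbstractCollection.add, which is the method that runs when add() is called on a collection class that inherits it unchanged, for example the key-set view of a java.util map. The class name KeySetAddDemo and the helper canRebuildInto are hypothetical; the helper only imitates rebuilding a collection by calling add() once per element and is not Kryo's code. Whether a map view such as partiesAndAmounts.keys is what actually ends up in the checkpoint here is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeySetAddDemo {

    /**
     * Hypothetical helper: imitates an element-by-element rebuild by calling
     * add() on the target collection. Returns false if add() is unsupported.
     */
    static boolean canRebuildInto(Collection<String> target, List<String> elements) {
        try {
            for (String element : elements) {
                target.add(element);
            }
            return true;
        } catch (UnsupportedOperationException e) {
            // Thrown by AbstractCollection.add when the class does not override it.
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> amounts = new LinkedHashMap<>();
        amounts.put("PartyA", 10L);

        // keySet() returns a view that inherits AbstractCollection.add unchanged.
        Collection<String> view = amounts.keySet();
        // Copying the view into an ArrayList gives a collection that supports add().
        Collection<String> copy = new ArrayList<>(amounts.keySet());

        System.out.println(canRebuildInto(view, Arrays.asList("PartyB")));
        System.out.println(canRebuildInto(copy, Arrays.asList("PartyB")));
    }
}
```

Run as is, this prints false and then true. If a map view is indeed what the flow holds across a suspension point, copying it into a concrete collection first (for example keys.toList() in Kotlin) would be one thing to try.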

In addition, when running in debug mode, thread starvation can be observed:

[WARN] 19:55:47,111 [HikariPool-3 housekeeper] pool.HikariPool. - HikariPool-3 - Thread starvation or clock leap detected (housekeeper delta=1m35s902ms765µs600ns).
[WARN] 19:55:47,111 [HikariPool-1 housekeeper] pool.HikariPool. - HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m49s546ms362µs501ns).
[WARN] 19:55:47,113 [HikariPool-2 housekeeper] pool.HikariPool. - HikariPool-2 - Thread starvation or clock leap detected (housekeeper delta=1m37s789ms188µs200ns).
[WARN] 19:55:47,114 [HikariPool-4 housekeeper] pool.HikariPool. - HikariPool-4 - Thread starvation or clock leap detected (housekeeper delta=1m33s803ms330µs901ns).
