在使用持久化参与者和日志使用jdbc时。所有发送给持久化参与者的消息都是死信。但是,我看不出原因,因为我直接将它们发送给了持久化参与者。
永久执行元代码:
case class ExampleState(events: List[String] = Nil) {
def updated(evt: Evt): ExampleState = copy(evt.data :: events)
def size: Int = events.length
override def toString: String = events.reverse.toString
}
class ExampleActor extends PersistentActor {
override def persistenceId = "sample-id-1"
var state = ExampleState()
def updateState(event: Evt): Unit = {
state = state.updated(event)
}
def numEvents =
state.size
override def receiveRecover: Receive = {
case evt: Evt => updateState(evt)
case SnapshotOffer(_, snapshot: ExampleState) => state = snapshot
}
val snapShotInterval = 1000
override def receiveCommand: Receive= {
case Cmd(data) => {
println("in the command code block")
persist(Evt(s"${data}-${numEvents}")) { event => {
updateState(event)
context.system.eventStream.publish(event)
if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0)
saveSnapshot(state)
}
}
}
case Shutdown => context.stop(self)
case "print"=>println(state)
}
}
测试代码(发送到持久执行元的所有消息都是死信):
"The example persistent actor" should {
"Test Command" in {
val persistentActor = system.actorOf(Props[ExampleActor](),"examplePersistentactor")
Thread.sleep(2000)
println("before the send")
persistentActor ! Cmd("foo")
persistentActor ! Cmd("bar")
persistentActor ! Cmd("fizz")
persistentActor ! Cmd("buzz")
persistentActor ! "print"
Thread.sleep(10000)
persistentActor ! Shutdown
println("after messages should be sent and received")
}
}
2条答案
按热度按时间4nkexdtk1#
当没有执行元示例正常运行时,就会发生死信。无论消息发送到的执行元是否为持久执行元,传递消息过程都是相同的。
因此,我认为当您向持久化参与者发送消息时,它实际上并没有运行,这可能是因为持久化设置没有正确配置。
我使用In-Memory持久性运行了您的代码(将Cmd和Evt更改为String类型),它工作正常。
q5lcpyga2#
感谢您的回复!您是否熟悉jdbc插件?我配置了journal/snapshot和光滑的数据库连接,它似乎符合插件的文档。您是否发现有任何错误/遗漏?application.conf: