我有以下结构
--- Stream A [ A.map1.diff() ] ---
| |
| |
Source --- --- Stream B [ B.map5.diff() ] --- --- combineLatest(A,B,C)
| |
| |
--- Stream C [ C.map9.diff() ] ---
源函数总是输出所有Map的当前总值。单个流函数应该只输出它们各自Map的差值。diff函数是可流动的扩展,肯定能工作(在其他地方使用)。如果没有变化,diff函数不会输出。
现在我们假设map9有差异,combineLatest现在会有map9的更新值,但是map1和map5仍然有第一次创建流时的原始状态,这意味着创建时的所有值,因为我们首先需要所有值才能区分,当map5有差异时,map1和map9也是如此。
就好像每个Stream都有combineLatest块的三个独立示例,并且每个示例都只有各自map的更新值。
现在的问题是,combineLatest发出的每个更新都是巨大的,因为它基本上总是所有的值,而没有触发它的值。
我已经研究了Splitting and then combining streams with RX和Recombining elements from the same reactive stream(我知道的不同语言)来尝试解决这个问题,但没有成功。
编辑:这是套接字连接结构的一部分,我不能改变。有一个所有连接共享的根可流动性和一个单独的订阅可流动性。我将添加一个它看起来如何的摘要。
Edit2:我不赞成拆分可流动流的想法,如果有一种方法可以编辑FlowAgg
的字段而不拆分它,例如一个接一个地做,我也很乐意接受。
摘录:
data class FlowAgg(
val devices: Map<Int, Device),
val assignments: Map<Int, Assignment),
val systemtime: Map<Int, Timestamp)
)
data class Summary(
val id: Int,
val device: Device? = null,
val assigment: Assignment? = null,
val systemtime: Timestamp? = null
)
[...]
socketTopic(
path = "/summary",
root = { _ ->
Flowables.combineLatest(
DeviceFlowable,
AssignmentFlowable,
SystemtimeFlowable
) { devices, assignments, systemtime ->
FlowAgg(
devices = devices,
assignments= assignments,
systemtime = systemtime ,
)
},
subscription = { broadcast ->
broadcast
.publish { flow -> // flow: Flowable<FlowAgg>
Flowables.combineLatest(
flow.map { it.devices }.diff(),
flow.map { it.assignments }.diff(),
flow.map { it.systemTime }.diff()
) { devices, assignments, systemTime ->
val keys = devices.keys + assignments.keys + systemTime.keys
keys.map {
Summary(
id = it,
devices = devices[it],
assignments = devices[it],
systemtime = devices[it]
)
}
.map {
Json.encodeToString(ListSerializer(Summary.serializer()), it)
}
}
}
)
1条答案
按热度按时间mqxuamgl1#
我已经找到了combineLatest块并行执行问题的罪魁祸首。我推测,单独的发布导致了线程上下文的分裂。
它现在的行为符合预期。
我更改了订阅,如下所示:
编辑:我还修改了diff函数,使其接受一个标志,在没有修改的情况下也发出一个emptyMap,并将combineLatest替换为zip,删除了去抖动。