java 拆分流,编辑其字段,然后将其重新组合为单个流

5us2dqdw  于 2023-02-02  发布在  Java
关注(0)|答案(1)|浏览(121)

我有以下结构

--- Stream A [ A.map1.diff() ] ---
          |                                     |
          |                                     |
Source --- --- Stream B [ B.map5.diff() ] --- --- combineLatest(A,B,C)
          |                                     |
          |                                     |
           --- Stream C [ C.map9.diff() ] ---

源函数总是输出所有Map的当前总值。单个流函数应该只输出它们各自Map的差值。diff函数是可流动的扩展,肯定能工作(在其他地方使用)。如果没有变化,diff函数不会输出。
现在我们假设map9有差异,combineLatest现在会有map9的更新值,但是map1和map5仍然有第一次创建流时的原始状态,这意味着创建时的所有值,因为我们首先需要所有值才能区分,当map5有差异时,map1和map9也是如此。
就好像每个Stream都有combineLatest块的三个独立示例,并且每个示例都只有各自map的更新值。
现在的问题是,combineLatest发出的每个更新都是巨大的,因为它基本上总是所有的值,而没有触发它的值。
我已经研究了Splitting and then combining streams with RXRecombining elements from the same reactive stream(我知道的不同语言)来尝试解决这个问题,但没有成功。
编辑:这是套接字连接结构的一部分,我不能改变。有一个所有连接共享的根可流动性和一个单独的订阅可流动性。我将添加一个它看起来如何的摘要。
Edit2:我不赞成拆分可流动流的想法,如果有一种方法可以编辑FlowAgg的字段而不拆分它,例如一个接一个地做,我也很乐意接受。
摘录:

data class FlowAgg(
    val devices: Map<Int, Device),
    val assignments: Map<Int, Assignment),
    val systemtime: Map<Int, Timestamp)
)
data class Summary(
    val id: Int,
    val device: Device? = null,
    val assigment: Assignment? = null,
    val systemtime: Timestamp? = null
)

[...]

socketTopic(
    path = "/summary",
    root = { _ ->
        Flowables.combineLatest(
            DeviceFlowable,
            AssignmentFlowable,
            SystemtimeFlowable
        ) { devices, assignments, systemtime ->
            FlowAgg(
                devices = devices,
                assignments= assignments,
                systemtime = systemtime ,
            )
        },
    subscription = { broadcast ->
        broadcast
            .publish { flow -> // flow: Flowable<FlowAgg>
                Flowables.combineLatest(
                    flow.map { it.devices }.diff(),
                    flow.map { it.assignments }.diff(),
                    flow.map { it.systemTime }.diff()
                ) { devices, assignments, systemTime ->
                    val keys = devices.keys + assignments.keys + systemTime.keys
                    keys.map {
                        Summary(
                            id = it,
                            devices = devices[it],
                            assignments = devices[it],
                            systemtime = devices[it]
                        )
                }
                .map {
                    Json.encodeToString(ListSerializer(Summary.serializer()), it)
                }
            }
    }
)
mqxuamgl

mqxuamgl1#

我已经找到了combineLatest块并行执行问题的罪魁祸首。我推测,单独的发布导致了线程上下文的分裂。
它现在的行为符合预期。
我更改了订阅,如下所示:

subscription = { broadcast ->
        Flowables.combineLatest(
            broadcast.map { it.devices }.diff(),
            broadcast.map { it.assignments }.diff(),
            broadcast.map { it.systemTime }.diff()
        ) { devices, assignments, systemTime ->
            FlowAgg(devices, assignments, systemTime)
        }
        .debounce(5, TimeUnit.MILLISECONDS)
        .map { (devices, assignments, systemTime) ->
            val keys = devices.keys + assignments.keys + systemTime.keys
            keys.map {
                Summary(
                    id = it,
                    devices = devices[it],
                    assignments = devices[it],
                    systemtime = devices[it]
               )
        }
        .map {
            Json.encodeToString(ListSerializer(Summary.serializer()), it)
        }
    }
}

编辑:我还修改了diff函数,使其接受一个标志,在没有修改的情况下也发出一个emptyMap,并将combineLatest替换为zip,删除了去抖动。

subscription = { broadcast ->
        Flowables.zip(
            broadcast.map { it.devices }.diff(),
            broadcast.map { it.assignments }.diff(),
            broadcast.map { it.systemTime }.diff()
        ) { devices, assignments, systemTime ->
            FlowAgg(devices, assignments, systemTime)
        }
        .map { (devices, assignments, systemTime) ->
            val keys = devices.keys + assignments.keys + systemTime.keys
            keys.map {
                Summary(
                    id = it,
                    devices = devices[it],
                    assignments = devices[it],
                    systemtime = devices[it]
               )
        }
        .map {
            Json.encodeToString(ListSerializer(Summary.serializer()), it)
        }
    }
}

相关问题