java 拆分流,编辑其字段,然后将其重新组合为单个流

5us2dqdw  于 2023-02-02  发布在  Java
关注(0)|答案(1)|浏览(143)

我有以下结构

  1. --- Stream A [ A.map1.diff() ] ---
  2. | |
  3. | |
  4. Source --- --- Stream B [ B.map5.diff() ] --- --- combineLatest(A,B,C)
  5. | |
  6. | |
  7. --- Stream C [ C.map9.diff() ] ---

源函数总是输出所有Map的当前总值。单个流函数应该只输出它们各自Map的差值。diff函数是可流动的扩展,肯定能工作(在其他地方使用)。如果没有变化,diff函数不会输出。
现在我们假设map9有差异,combineLatest现在会有map9的更新值,但是map1和map5仍然有第一次创建流时的原始状态,这意味着创建时的所有值,因为我们首先需要所有值才能区分,当map5有差异时,map1和map9也是如此。
就好像每个Stream都有combineLatest块的三个独立示例,并且每个示例都只有各自map的更新值。
现在的问题是,combineLatest发出的每个更新都是巨大的,因为它基本上总是所有的值,而没有触发它的值。
我已经研究了Splitting and then combining streams with RXRecombining elements from the same reactive stream(我知道的不同语言)来尝试解决这个问题,但没有成功。
编辑:这是套接字连接结构的一部分,我不能改变。有一个所有连接共享的根可流动性和一个单独的订阅可流动性。我将添加一个它看起来如何的摘要。
Edit2:我不赞成拆分可流动流的想法,如果有一种方法可以编辑FlowAgg的字段而不拆分它,例如一个接一个地做,我也很乐意接受。
摘录:

  1. data class FlowAgg(
  2. val devices: Map<Int, Device),
  3. val assignments: Map<Int, Assignment),
  4. val systemtime: Map<Int, Timestamp)
  5. )
  6. data class Summary(
  7. val id: Int,
  8. val device: Device? = null,
  9. val assigment: Assignment? = null,
  10. val systemtime: Timestamp? = null
  11. )
  12. [...]
  13. socketTopic(
  14. path = "/summary",
  15. root = { _ ->
  16. Flowables.combineLatest(
  17. DeviceFlowable,
  18. AssignmentFlowable,
  19. SystemtimeFlowable
  20. ) { devices, assignments, systemtime ->
  21. FlowAgg(
  22. devices = devices,
  23. assignments= assignments,
  24. systemtime = systemtime ,
  25. )
  26. },
  27. subscription = { broadcast ->
  28. broadcast
  29. .publish { flow -> // flow: Flowable<FlowAgg>
  30. Flowables.combineLatest(
  31. flow.map { it.devices }.diff(),
  32. flow.map { it.assignments }.diff(),
  33. flow.map { it.systemTime }.diff()
  34. ) { devices, assignments, systemTime ->
  35. val keys = devices.keys + assignments.keys + systemTime.keys
  36. keys.map {
  37. Summary(
  38. id = it,
  39. devices = devices[it],
  40. assignments = devices[it],
  41. systemtime = devices[it]
  42. )
  43. }
  44. .map {
  45. Json.encodeToString(ListSerializer(Summary.serializer()), it)
  46. }
  47. }
  48. }
  49. )
mqxuamgl

mqxuamgl1#

我已经找到了combineLatest块并行执行问题的罪魁祸首。我推测,单独的发布导致了线程上下文的分裂。
它现在的行为符合预期。
我更改了订阅,如下所示:

  1. subscription = { broadcast ->
  2. Flowables.combineLatest(
  3. broadcast.map { it.devices }.diff(),
  4. broadcast.map { it.assignments }.diff(),
  5. broadcast.map { it.systemTime }.diff()
  6. ) { devices, assignments, systemTime ->
  7. FlowAgg(devices, assignments, systemTime)
  8. }
  9. .debounce(5, TimeUnit.MILLISECONDS)
  10. .map { (devices, assignments, systemTime) ->
  11. val keys = devices.keys + assignments.keys + systemTime.keys
  12. keys.map {
  13. Summary(
  14. id = it,
  15. devices = devices[it],
  16. assignments = devices[it],
  17. systemtime = devices[it]
  18. )
  19. }
  20. .map {
  21. Json.encodeToString(ListSerializer(Summary.serializer()), it)
  22. }
  23. }
  24. }

编辑:我还修改了diff函数,使其接受一个标志,在没有修改的情况下也发出一个emptyMap,并将combineLatest替换为zip,删除了去抖动。

  1. subscription = { broadcast ->
  2. Flowables.zip(
  3. broadcast.map { it.devices }.diff(),
  4. broadcast.map { it.assignments }.diff(),
  5. broadcast.map { it.systemTime }.diff()
  6. ) { devices, assignments, systemTime ->
  7. FlowAgg(devices, assignments, systemTime)
  8. }
  9. .map { (devices, assignments, systemTime) ->
  10. val keys = devices.keys + assignments.keys + systemTime.keys
  11. keys.map {
  12. Summary(
  13. id = it,
  14. devices = devices[it],
  15. assignments = devices[it],
  16. systemtime = devices[it]
  17. )
  18. }
  19. .map {
  20. Json.encodeToString(ListSerializer(Summary.serializer()), it)
  21. }
  22. }
  23. }
展开查看全部

相关问题