Kotlin协程,ConcurrentHashMap

6ljaweal  于 2023-04-21  发布在  Kotlin
关注(0)|答案(1)|浏览(144)

我正在尝试将TDLib Java Example示例代码转换为Kotlin协程。我遇到了一些无法修复的问题。
在下面的代码片段中,当应用程序启动时,它有时会崩溃并出现并发异常

private val chats: ConcurrentHashMap<Long, TdApi.Chat> = ConcurrentHashMap()
private val mainChatList: NavigableSet<OrderedChat> = TreeSet()

suspend fun load(limit: Int) = withContext(Dispatchers.IO) {
    awaitAll(
        async {
            telegramRepository.loadChats(limit)
        },
        async {
            telegramRepository.newChatFlow
                .onEach { chat ->
                    chats[chat.id] = chat
                    val positions = chat.positions.clone()
                    chat.positions = arrayOfNulls(0)
                    
                    setChatPositions(chat, positions)
                }.collect()
        },
        async {
            telegramRepository.chatLastMessageFlow
                .onEach { updateChat ->
                    val chat = chats[updateChat.chatId]
                    if (chat != null) {
                        chat.lastMessage = updateChat.lastMessage
                        
                        setChatPositions(chat, updateChat.positions)
                    }
                }.collect()
        },
        async {
            telegramRepository.chatPositionFlow
                .onEach { updateChat ->
                    val chat = chats[updateChat.chatId]
                    if (chat != null) {
                        var i = 0
                        for (k in chat.positions.indices) {
                            if (chat.positions[k].list.constructor == TdApi.ChatListMain.CONSTRUCTOR) {
                                break
                            }
                            i++
                        }
                        val newPositions = arrayOfNulls<TdApi.ChatPosition>(
                            size = chat.positions.size + (if (updateChat.position.order == 0L) 0 else 1)
                                    - (if (i < chat.positions.size) 1 else 0)
                        )
                        var pos = 0
                        if (updateChat.position.order != 0L) {
                            newPositions[pos++] = updateChat.position
                        }
                        for (j in chat.positions.indices) {
                            if (i != j) {
                                newPositions[pos++] = chat.positions[j]
                            }
                        }
                        assert(pos == newPositions.size)
                        
                        setChatPositions(chat, newPositions)
                    }
                }.collect()
        }
    )
}

private fun setChatPositions(chat: TdApi.Chat, positions: Array<TdApi.ChatPosition?>) {
    //TODO: Concurrency exception
    for (position in chat.positions) {
        if (position.list.constructor == TdApi.ChatListMain.CONSTRUCTOR) {
            val isRemoved = mainChatList.remove(OrderedChat(chat.id, position))
            assert(isRemoved)
        }
    }
    chat.positions = positions
    for (position in chat.positions) {
        if (position.list.constructor == TdApi.ChatListMain.CONSTRUCTOR) {
            val isAdded: Boolean = mainChatList.add(OrderedChat(chat.id, position))
            assert(isAdded)
        }
    }
}

fun getData() = flow {
    //TODO: Concurrency exception
    while (true) {
        val chats = mainChatList.map { orderedChat ->
            val chat = chats[orderedChat.chatId]!!
            Chat(
                id = chat.id,
                title = chat.title.ifEmpty { "Deleted account" },
                photo = chat.photo?.let {
                    ProfilePhoto(
                        thumbnail = chat.photo!!.minithumbnail!!.data,
                        file = chat.photo!!.small
                    )
                },
                isPinned = orderedChat.position.isPinned,
                unreadCount = chat.unreadCount,
                lastMessage = chat.lastMessage
            )
        }.toList()
        emit(chats)
        delay(1000L)
    }
}

data class OrderedChat (
    val chatId: Long,
    val position: TdApi.ChatPosition
)

据我所知,这发生在我的ViewModel通过getData()方法接收数据的时候,同时,setChatPositions()方法被并行调用,修改了mainChatList元素。
然而,由于我在协程方面的经验不足,我不明白它是如何同步的。任何关于重构上述代码的帮助也是可以接受的,我相信我犯了足够多的错误。

gzszwxb4

gzszwxb41#

协程同步是使用Mutex完成的。创建一个Mutex属性,并在修改或迭代mainChatList的代码周围使用它withLock { }

private val mainChatListMutex = Mutex()

// ...

private fun setChatPositions(chat: TdApi.Chat, positions: Array<TdApi.ChatPosition?>) {
    //TODO: Concurrency exception
    for (position in chat.positions) {
        if (position.list.constructor == TdApi.ChatListMain.CONSTRUCTOR) {
            val isRemoved = mainChatListMutex.withLock { 
                mainChatList.remove(OrderedChat(chat.id, position)) 
            }
            assert(isRemoved)
        }
    }
    chat.positions = positions
    for (position in chat.positions) {
        if (position.list.constructor == TdApi.ChatListMain.CONSTRUCTOR) {
            val isAdded: Boolean = mainChatListMutex.withLock { 
                mainChatList.add(OrderedChat(chat.id, position))
            }
            assert(isAdded)
        }
    }
}

fun getData() = flow {
    //TODO: Concurrency exception
    while (true) {
        val chats = mainChatListMutex.withLock { 
                mainChatList.map { orderedChat ->
                val chat = chats[orderedChat.chatId]!!
                Chat(
                    id = chat.id,
                    title = chat.title.ifEmpty { "Deleted account" },
                    photo = chat.photo?.let {
                        ProfilePhoto(
                            thumbnail = chat.photo!!.minithumbnail!!.data,
                            file = chat.photo!!.small
                        )
                    },
                    isPinned = orderedChat.position.isPinned,
                    unreadCount = chat.unreadCount,
                    lastMessage = chat.lastMessage
                )
            }
        }
        emit(chats)
        delay(1000L)
    }
}

如何通过反复检查来避免浪费资源,并避免不必要地使用!!。我不能在minithumbnail之后修复!!,因为我不知道您的ProfilePhoto类是如何工作的。我不知道为什么您可以认为在这里使用!!是安全的。

private val chatsChangedFlow = MutableSharedFlow<Unit>(replay = 1).apply {
    tryEmit(Unit)
}

// Call chatsChangedFlow.tryEmit(Unit) whenever you change the underlying data.

val data = chatsChangedFlow.map {
    mainChatListMutex.withLock { 
        mainChatList.map { orderedChat ->
            val chat = chats.getValue(orderedChat.chatId)
            Chat(
                id = chat.id,
                title = chat.title.ifEmpty { "Deleted account" },
                photo = chat.photo?.let { 
                    ProfilePhoto(
                        thumbnail = it.minithumbnail!!.data,
                        file = it.small
                    )
                },
                isPinned = orderedChat.position.isPinned,
                unreadCount = chat.unreadCount,
                lastMessage = chat.lastMessage
            )
        }
    }
}

相关问题