阅读Kafka围棋中带有特定id的信息

d7v8vwbk  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(299)

我正在kafka中使用kafka go库构建一个请求-响应设置,使用消息键作为相关id。我的设置在没有并发的情况下工作正常,但是当消息开始在单独的goroutine中发送时,reader部分跳过正确的键(因为其他例程可能已经读取了它)。
考虑到连接被不同的goroutine共享,如何从一个主题中只读取一个特定的键?
下面的客户端示例(为了简洁起见,删除了错误评估):

package main

import (
    "bytes"
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/google/uuid"
    kafka "github.com/segmentio/kafka-go"
)

var wg sync.WaitGroup

func requestMessage(connR *kafka.Conn, connW *kafka.Conn, body []byte, index int) {
    currentUUID := uuid.New()
    byteUUID := []byte(fmt.Sprintf("%s", currentUUID))
    connW.WriteMessages(kafka.Message{
        Key:   byteUUID,
        Value: body,
    })
    fmt.Println("Posted id " + string(byteUUID))
    for {
        m, _ := connR.ReadMessage(10e6)
        if bytes.Equal(m.Key, byteUUID) {
            break
        }
    }

    wg.Done()
    fmt.Println("Done " + string(byteUUID))

}

func main() {
    iterations := 100
    interval := 500 * time.Millisecond
    kafkaURL := "kafka:9092"
    topic := "benchmarktopic"
    partition := 0
    connW, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic, partition)
    defer connW.Close()
    connR, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic+"response", partition)
    defer connR.Close()
    for i := 0; i < iterations; i++ {
        <-time.After(interval)
        go requestMessage(connR, connW, []byte("body"), i)
        wg.Add(1)
    }
    wg.Wait()
}
2vuwiymt

2vuwiymt1#

实际上,您不能从kafka主题分区中仅读取特定密钥。问题是,您的记录将根据密钥的散列(默认行为)被分派到特定的分区。所以在同一个分区中可能有不同的密钥。因此,只要您的键多于分区数,您就会发现一个分区包含不同的键。
我想到的唯一一种方法是为主题设置n个分区,其中n是可以拥有的不同键的数量(如果使用uuid作为键,则会有一个很大的数字),并使用静态Map(键->分区)将分区分配给生产者/消费者。
顺便说一句,你已经把第0部分分配给你的制作人了,想知道为什么吗?
扬尼克

相关问题