kafka:如何使每个msg从一开始就只被一组人消费一次

c0vxltue  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(294)

我在这里使用kafka consumer(1.3.1版)。
我要做的是:
有10个分区。每个分区从偏移量0开始。
有一群消费者(例如1、2、3)。
有时,一个消费者在下降或上升。
因此,组成员可能会发生变化。但我希望每个分区中的每条消息只能被组使用一次(1、2或3)。
我的代码是:

consumer = KafkaConsumer('my_topic',
            bootstrap_servers=['ip:9092'],
            auto_offset_reset='earliest',
            max_partition_fetch_bytes=131072,
            group_id='writer.test')

以上配置是否足够?欢迎任何评论。谢谢
更新
我试过以下代码。每次在分区760中,每个消息可能被一个组中的两个使用者消费两次。为什么?出什么事了?

def test():
    #PULL FROM KAFKA
    consumer = KafkaConsumer(
            'topic',
            bootstrap_servers=[ip],
            auto_offset_reset='latest',
            max_partition_fetch_bytes=131072,
            auto_commit_interval_ms=500,
            group_id='writer2.test')

    print consumer.poll()
    for i in range(10000):
        msg = next(consumer)
        if str(msg[1])=='670':
            print 'partition= %s, offset= %s' % (msg[1], msg[2])
    consumer.unsubscribe()

if __name__ == "__main__":
    for i in range(10):
        import time
        time.sleep(5)
        test()

输出1:

{}
partition= 670, offset= 224
partition= 670, offset= 225
partition= 670, offset= 226
partition= 670, offset= 227
partition= 670, offset= 228
partition= 670, offset= 229
partition= 670, offset= 230
partition= 670, offset= 231
partition= 670, offset= 232
partition= 670, offset= 233
partition= 670, offset= 234
partition= 670, offset= 235
partition= 670, offset= 236
partition= 670, offset= 237
partition= 670, offset= 238
partition= 670, offset= 239
partition= 670, offset= 240
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259

在另一个窗口中运行同一个文件,输出:

{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
4c8rllxm

4c8rllxm1#

如果使用使用者组,kafka至少提供一次传递保证,因此,在使用者失败时,重新分配这些使用者的分区,一些消息可能会被第二次传递。
如果要确保没有消息被处理两次,可以将模式切换为最多一次传递。但是,如果失败,您可能会丢失一些消息(即从未处理)。
要最多启用一次,需要禁用自动提交,然后直接手动提交 poll ,即,在开始处理通过 poll .
看到了吗http://docs.confluent.io/3.0.0/clients/consumer.html#detailed-有关更多详细信息,请参见示例(即使示例不是python的,一般模式也是相同的)。

相关问题