python生成不同的kafka分区

vnzz0bqm  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(413)

我试图通过典型的推特流媒体示例来学习Kafka。我试图使用我的生产者流推特数据基于2个过滤器到同一主题的不同分区。例如,twitter数据track='google'到一个分区,track='apple'到另一个分区。

  1. class Producer(StreamListener):
  2. def __init__(self, producer):
  3. self.producer = producer
  4. def on_data(self, data):
  5. self.producer.send(topic_name, value=data)
  6. return True
  7. def on_error(self, error):
  8. print(error)
  9. twitter_stream = Stream(auth, Producer(producer))
  10. twitter_stream.filter(track=["Google"])

如何添加另一个磁道并将数据流到另一个分区。
同样,如何让我的消费者从特定分区消费。

  1. consumer = KafkaConsumer(
  2. topic_name,
  3. bootstrap_servers=['localhost:9092'],
  4. auto_offset_reset='latest',
  5. enable_auto_commit=True,
  6. auto_commit_interval_ms = 5000,
  7. max_poll_records = 100,
  8. value_deserializer=lambda x: json.loads(x.decode('utf-8')))
2w2cym1i

2w2cym1i1#

经过一番研究,我终于解决了这个问题:
在producer端,指定分区:

  1. self.producer.send(topic_name, value=data,partition=0)

在消费者方面,

  1. consumer = KafkaConsumer(
  2. bootstrap_servers=['localhost:9092'],
  3. auto_offset_reset='latest',
  4. enable_auto_commit=True,
  5. auto_commit_interval_ms = 5000,
  6. max_poll_records = 100,
  7. value_deserializer=lambda x: json.loads(x.decode('utf-8')))
  8. consumer.assign([TopicPartition('trial', 0)])
isr3a4wc

isr3a4wc2#

kafka在消息的键上划分数据。在给定的代码中,您只传递 value 对于生产者消息,因此密钥将为null,因此将在所有分区之间进行循环。
请参阅Kafka库的文档,了解如何为每条消息提供密钥

相关问题