我试图通过典型的推特流媒体示例来学习Kafka。我试图使用我的生产者流推特数据基于2个过滤器到同一主题的不同分区。例如,twitter数据track='google'到一个分区,track='apple'到另一个分区。
class Producer(StreamListener):
def __init__(self, producer):
self.producer = producer
def on_data(self, data):
self.producer.send(topic_name, value=data)
return True
def on_error(self, error):
print(error)
twitter_stream = Stream(auth, Producer(producer))
twitter_stream.filter(track=["Google"])
如何添加另一个磁道并将数据流到另一个分区。
同样,如何让我的消费者从特定分区消费。
consumer = KafkaConsumer(
topic_name,
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
enable_auto_commit=True,
auto_commit_interval_ms = 5000,
max_poll_records = 100,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
2条答案
按热度按时间2w2cym1i1#
经过一番研究,我终于解决了这个问题:
在producer端,指定分区:
在消费者方面,
isr3a4wc2#
kafka在消息的键上划分数据。在给定的代码中,您只传递
value
对于生产者消息,因此密钥将为null,因此将在所有分区之间进行循环。请参阅Kafka库的文档,了解如何为每条消息提供密钥