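I am trying to build a stream-table join operation with Kafka in Python. Below is the code that performs the join on the data sent by the producer: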
stream_table_join.py
from confluent_kafka import DeserializingConsumer, SerializingProducer
from confluent_kafka.serialization import StringDeserializer, StringSerializer
from confluent_kafka import Consumer
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
import json


class LookupData:
    def __init__(self, id, name):
        self.id = id
        self.name = name


# Kafka configuration
bootstrap_servers = 'localhost:9092'
events_topic = 'events_topic'

# Configure the Kafka consumer and producer
consumer = DeserializingConsumer({
    'bootstrap.servers': bootstrap_servers,
    'key.deserializer': StringDeserializer(),
    'value.deserializer': JSONDeserializer(LookupData, from_dict=lambda x: LookupData(x['id'], x['name'])),
    'group.id': 'stream-table-join-group'
})
consumer.subscribe([events_topic])

producer = SerializingProducer({
    'bootstrap.servers': bootstrap_servers,
    'key.serializer': StringSerializer(),
    'value.serializer': StringSerializer()
})

# Load the lookup data from the lookup table
lookup_table = {}


# Join operation function
def join_operation(event):
    """ Join Operation here """


# Start consuming and producing messages
while True:
    msg = consumer.poll(1.0)  # wait up to one second for the next message
    if msg is None:
        continue
    event = json.loads(msg.value())
    enriched_event = join_operation(event)
    producer.produce(events_topic, value=json.dumps(enriched_event))
    producer.flush()

consumer.close()
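One way the join_operation placeholder above could be filled in is a left join on id against the in-memory lookup_table. This is only a sketch under assumptions: each event is taken to be a dict carrying an id field, the table is keyed by id, the function takes the table as a second argument, and the action field and table contents are hypothetical examples, not part of the original code.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LookupData:
    id: int
    name: str


def join_operation(event: dict, lookup_table: dict) -> dict:
    """Left-join one stream event against the in-memory table on 'id'."""
    row: Optional[LookupData] = lookup_table.get(event.get("id"))
    enriched = dict(event)  # copy so the caller's event is not mutated
    # Unmatched events are kept, with name set to None (left-join semantics).
    enriched["name"] = row.name if row is not None else None
    return enriched


# Hypothetical table contents, keyed by id.
lookup_table = {1: LookupData(1, "John Doe")}

matched = join_operation({"id": 1, "action": "login"}, lookup_table)
unmatched = join_operation({"id": 2, "action": "login"}, lookup_table)
print(matched)
print(unmatched)
```

Keeping unmatched events rather than dropping them is a design choice; an inner join would return None for them and the loop would skip producing.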
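The part in question is 'value.deserializer': JSONDeserializer(LookupData, from_dict=lambda x: LookupData(x['id'], x['name'])). It takes the data object sent by the producer and, with the help of JSONDeserializer, converts it into a LookupData object, so that extra fields can be added to it. But every time I run the file, it just shows this error: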
TypeError: You must pass either str or Schema
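I tried running it twice: (1) with the producer running and (2) without the producer running. Both times I got the same error, and the message does not explain what exactly is wrong or how to fix it.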
Below is sample data from producer.py:
{
    "id": 1,
    "name": "John Doe",
}
I also checked the documentation and searched for a solution to this error, but could not find one.
1 Answer
This works for me, in case it helps.
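The TypeError most likely comes from the first argument: JSONDeserializer expects the JSON Schema itself (a str or a Schema object) in that position, not the target class. The from_dict hook is also called with two arguments, the dict and a SerializationContext, so a one-argument lambda would fail next. A minimal sketch of both fixes follows. The schema text, the helper names lookup_from_dict, make_registry_deserializer and plain_json_deserializer are my own assumptions, and I am guessing that producer.py sends plain JSON; if so, the second option is the one to use, because JSONDeserializer expects the Schema Registry framing that JSONSerializer writes.

```python
import json

# Hypothetical JSON Schema for the sample payload {"id": 1, "name": "John Doe"}.
LOOKUP_SCHEMA_STR = json.dumps({
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "LookupData",
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string"},
    },
    "required": ["id", "name"],
})


class LookupData:
    def __init__(self, id, name):
        self.id = id
        self.name = name


def lookup_from_dict(obj, ctx):
    """from_dict hook: called with (dict, SerializationContext)."""
    return LookupData(obj["id"], obj["name"])


def make_registry_deserializer():
    """Option 1: values were written with JSONSerializer (Schema Registry framing)."""
    # Imported lazily so the helpers here work without confluent-kafka installed.
    from confluent_kafka.schema_registry.json_schema import JSONDeserializer
    # The first argument is the schema (str or Schema), not the target class.
    return JSONDeserializer(LOOKUP_SCHEMA_STR, from_dict=lookup_from_dict)


def plain_json_deserializer(data, ctx=None):
    """Option 2: values are plain JSON bytes, e.g. produced with json.dumps."""
    if data is None:
        return None
    return lookup_from_dict(json.loads(data), ctx)


# Quick check of option 2 against the sample payload from the question.
record = plain_json_deserializer(b'{"id": 1, "name": "John Doe"}')
print(record.id, record.name)
```

Either callable can be passed as 'value.deserializer' in the DeserializingConsumer config, since the consumer only requires something callable with (data, ctx). With either one, msg.value() already returns a LookupData object, so the json.loads(msg.value()) call in the loop would no longer be needed.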