confluent-kafka Python data stream gives an error - TypeError: You must pass either str or Schema

epfja78i · posted on 2023-10-15 in Apache

I am trying to build a stream-table join operation with the confluent-kafka Python client. Below is the code that performs the join on the data sent by the producer.
stream_table_join.py

from confluent_kafka import DeserializingConsumer, SerializingProducer 
from confluent_kafka.serialization import StringDeserializer, StringSerializer 
from confluent_kafka import Consumer 
from confluent_kafka.schema_registry.json_schema import JSONDeserializer 
import json

class LookupData:
    def __init__(self, id, name):
        self.id = id
        self.name = name

# Kafka configuration
bootstrap_servers = 'localhost:9092'
events_topic = 'events_topic'

# Configure the Kafka consumer and producer
consumer = DeserializingConsumer({
    'bootstrap.servers': bootstrap_servers,
    'key.deserializer': StringDeserializer(),
    'value.deserializer': JSONDeserializer(LookupData, from_dict=lambda x: LookupData(x['id'], x['name'])),
    'group.id': 'stream-table-join-group'
})
consumer.subscribe([events_topic])

producer = SerializingProducer({
    'bootstrap.servers': bootstrap_servers,
    'key.serializer': StringSerializer(),
    'value.serializer': StringSerializer()
})

# Load the lookup data from the lookup table
lookup_table = {}

# Join operation function
def join_operation(event):
    """ Join Operation here """

# Start consuming and producing messages
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    event = json.loads(msg.value())
    enriched_event = join_operation(event)
    producer.produce(events_topic, value=json.dumps(enriched_event))
    producer.flush()

consumer.close()
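The join itself just means enriching each incoming event with fields looked up from lookup_table. A minimal sketch of what join_operation could look like, assuming lookup_table maps an id to a LookupData object (the field names here are assumptions based on the sample payload shown further below):

# Minimal sketch of a possible join_operation (hypothetical field names,
# assuming lookup_table maps event['id'] to a LookupData object)
def join_operation(event):
    lookup = lookup_table.get(event['id'])
    if lookup is None:
        # No matching row in the lookup table: pass the event through unchanged
        return event
    # Enrich the event with fields from the matching lookup record
    return {**event, 'name': lookup.name}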

The code shared above contains the following line:
'value.deserializer': JSONDeserializer(LookupData, from_dict=lambda x: LookupData(x['id'], x['name'])),
It takes the data object sent by the producer and, with the help of JSONDeserializer, converts it into a LookupData class object. This is done so that additional fields can be joined in. But every time I run the file, it just shows this error:

TypeError: You must pass either str or Schema

I tried running it two ways: 1. with the producer running, and 2. without the producer running. Both times I got the same error, and it gives no real explanation of what exactly is wrong or how to fix it.
Below is the sample data sent from producer.py:

{
  "id": 1,
  "name": "John Doe"
}

I also checked the documentation and tried to find a solution to this error, but could not find one.
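For context on the error itself: JSONDeserializer expects a JSON schema string (or a Schema object in newer client versions) as its first argument, not a Python class; the class conversion goes through from_dict, whose callable also receives a SerializationContext. A minimal sketch of how the deserializer is usually constructed, where the schema below is only an assumption derived from the sample payload above:

from confluent_kafka.schema_registry.json_schema import JSONDeserializer

# Assumed JSON schema matching the sample payload {"id": ..., "name": ...}
lookup_schema_str = """
{
  "title": "LookupData",
  "type": "object",
  "properties": {
    "id": {"type": "integer"},
    "name": {"type": "string"}
  },
  "required": ["id", "name"]
}
"""

value_deserializer = JSONDeserializer(
    lookup_schema_str,  # a schema string (or Schema), not a class
    from_dict=lambda d, ctx: LookupData(d['id'], d['name'])  # from_dict receives (dict, ctx)
)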

vom3gejh1#

from confluent_kafka import DeserializingConsumer
import confluent_kafka
from confluent_kafka.schema_registry import SchemaRegistryClient
from configparser import ConfigParser
import json
import fastavro
from io import BytesIO
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logging.info('Starting consumer...')

# Read the variables 
config = ConfigParser()
config.read('client.properties')

# Schema registry client
schema_registry_conf = {
  'url': config.get('DEFAULT', 'schema.registry.url'),
  'basic.auth.user.info': config.get('DEFAULT', 'basic.auth.user.info')
}

schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Get the latest version of the value schema
subject_name = "flights-value"
schema = schema_registry_client.get_latest_version(subject_name)
schema_str = schema.schema.schema_str

logging.info(f"Schema String: {schema_str}")  # Add this line to print the schema string

# Convert the schema string to a dictionary
schema_dict = json.loads(schema_str)
logging.info(f'Schema dict: {schema_dict}')
schema_str2 = json.dumps(schema_dict)
logging.info(f'Schema_str: {schema_str2}')

def value_deserializer(data, ctx):
    if data is None:
        return None
    try:
        return fastavro.schemaless_reader(BytesIO(data), schema_dict)
    except IndexError as e:
        logging.error(f"Failed to deserialize data: {data}")
        logging.error(f"Error: {e}")
        return None
    
def key_deserializer(key, ctx):
    return str(key)

# Configure the consumer
consumer_conf = {
    'bootstrap.servers': config.get('DEFAULT', 'bootstrap.servers'),
    'security.protocol': config.get('DEFAULT', 'security.protocol'),
    'sasl.mechanisms': config.get('DEFAULT', 'sasl.mechanisms'),
    'sasl.username': config.get('DEFAULT', 'sasl.username'),
    'sasl.password': config.get('DEFAULT', 'sasl.password'),
    'group.id': 'flight_data_group',
    'auto.offset.reset': 'earliest',
    'key.deserializer': key_deserializer,
    'value.deserializer': value_deserializer
}

consumer = DeserializingConsumer(consumer_conf)

# Subscribe to the topic
consumer.subscribe(['flights'])

try:
    while True:
        # Poll for messages
        msg = consumer.poll(1.0)

        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == confluent_kafka.KafkaError._VALUE_DESERIALIZATION:
                logging.error(f"Failed to deserialize message value: {msg.error()}")
                continue
            else:
                logging.error(f"Consumer error: {msg.error()}")
                continue

        logging.info(f"Received message: {msg.key()} : {msg.value()} at offset {msg.offset()}")

except KeyboardInterrupt:
    pass
finally:
    # Close down consumer to commit final offsets.
    consumer.close()

This works for me; hope it helps.
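A possible simplification, in case it fits your setup: recent versions of confluent-kafka also ship an AvroDeserializer that resolves the writer schema from the schema registry by itself, so the manual fastavro decoding above is optional. A rough sketch, assuming the same schema_registry_client and consumer_conf as in the code above:

from confluent_kafka.schema_registry.avro import AvroDeserializer

# Assumption: a recent confluent-kafka release, where the registry client alone
# is enough; the deserializer fetches the writer schema per message.
avro_value_deserializer = AvroDeserializer(schema_registry_client)
consumer_conf['value.deserializer'] = avro_value_deserializer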
