Has anyone been able to use this library to write to Kafka with PySpark?
I have been able to read successfully using the code from the README:
from pyspark.sql import Column

# spark_context, data_frame, schema_registry_url and topic are defined elsewhere.
jvm_gateway = spark_context._gateway.jvm
abris_avro = jvm_gateway.za.co.absa.abris.avro

# Scala singleton objects are reached through their "MODULE$" field.
naming_strategy = getattr(
    getattr(abris_avro.read.confluent.SchemaManager, "SchemaStorageNamingStrategies$"),
    "MODULE$",
).TOPIC_NAME()

schema_registry_config_dict = {
    "schema.registry.url": schema_registry_url,
    "schema.registry.topic": topic,
    "value.schema.id": "latest",
    "value.schema.naming.strategy": naming_strategy,
}

# Build an immutable Scala Map by adding one (key, value) tuple at a time.
conf_map = getattr(getattr(jvm_gateway.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$")
for k, v in schema_registry_config_dict.items():
    conf_map = getattr(conf_map, "$plus")(jvm_gateway.scala.Tuple2(k, v))

# Decode the Avro payload in the Kafka "value" column and flatten the resulting struct.
deserialized_df = data_frame.select(
    Column(abris_avro.functions.from_confluent_avro(data_frame._jdf.col("value"), conf_map)).alias("data")
).select("data.*")
However, I am struggling to extend this behaviour to writing to a topic with the to_confluent_avro function.
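For reference, this is roughly the shape of the write path I am aiming for. It is only a sketch that mirrors the read code above, and it rests on assumptions I have not confirmed: that to_confluent_avro accepts the same (column, config map) arguments as from_confluent_avro, and that the same config keys apply when writing. The helper names build_registry_config, to_scala_map and to_confluent_avro_column are made up for illustration, and pyspark is imported lazily so the pure-Python helper can be checked without a Spark session.

```python
def build_registry_config(schema_registry_url, topic, naming_strategy, schema_id="latest"):
    """Return the Schema Registry settings as a plain dict (hypothetical helper)."""
    return {
        "schema.registry.url": schema_registry_url,
        "schema.registry.topic": topic,
        "value.schema.id": schema_id,
        "value.schema.naming.strategy": naming_strategy,
    }


def to_scala_map(jvm_gateway, py_dict):
    """Convert a Python dict to an immutable Scala Map through the Py4J gateway."""
    conf_map = getattr(getattr(jvm_gateway.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$")
    for k, v in py_dict.items():
        conf_map = getattr(conf_map, "$plus")(jvm_gateway.scala.Tuple2(k, v))
    return conf_map


def to_confluent_avro_column(spark_context, column, topic, schema_registry_url):
    """Wrap to_confluent_avro as a PySpark Column.

    Assumption: to_confluent_avro takes (java column, Scala config map),
    like from_confluent_avro in the read code.
    """
    from pyspark.sql.column import Column, _to_java_column  # needs pyspark installed

    jvm_gateway = spark_context._gateway.jvm
    abris_avro = jvm_gateway.za.co.absa.abris.avro
    naming_strategy = getattr(
        getattr(abris_avro.read.confluent.SchemaManager, "SchemaStorageNamingStrategies$"),
        "MODULE$",
    ).TOPIC_NAME()
    conf = build_registry_config(schema_registry_url, topic, naming_strategy)
    return Column(
        abris_avro.functions.to_confluent_avro(_to_java_column(column), to_scala_map(jvm_gateway, conf))
    )


# Intended usage (not run here; bootstrap_servers is a placeholder):
#
#   from pyspark.sql.functions import struct
#   value_col = to_confluent_avro_column(
#       spark_context, struct(*deserialized_df.columns), topic, schema_registry_url)
#   (deserialized_df.select(value_col.alias("value"))
#       .write.format("kafka")
#       .option("kafka.bootstrap.servers", bootstrap_servers)
#       .option("topic", topic)
#       .save())
```

The columns are packed into a single struct first because the Kafka sink expects one binary "value" column per record.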