Kafka 属性错误:'DataFrameWriter'对象没有'start'属性

jgwigjjp  于 2022-12-03  发布在  Apache
关注(0)|答案(1)|浏览(67)

我正在尝试写一个代码使用Kafka,Python和SparK的问题陈述是:从XML读取数据,使用的数据将是二进制格式。此数据必须存储在 Dataframe 中。
我收到以下错误:
错误:文件“C:/Users/HP/PycharmProjects/xml_streaming/ConS.py“,第55行,格式为(“控制台”)
属性错误:'DataFrameWriter'对象没有'start'属性
下面是我的代码供参考:

#import  *
    
    # Set spark environments
    #os.environ['PYSPARK_PYTHON'] = <PATH>
    #os.environ['PYSPARK_DRIVER_PYTHON'] = <PATH>
    
    spark = SparkSession\
             .builder\
             .master("local[1]")\
             .appName("Consumer")\
             .getOrCreate()
    
    topic_Name = 'XML_File_Processing3'
    consumer = kafka.KafkaConsumer(topic_Name, bootstrap_servers=['localhost:9092'], auto_offset_reset='latest')
    
    kafka_df = spark\
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("kafka.security.protocol", "SSL") \
        .option("failOnDataLoss", "false") \
        .option("subscribe", topic_Name) \
        .load()
    #.option("startingOffsets", "earliest") \
    print("Loaded to DataFrame kafka_df")
    kafka_df.printSchema()
    new_df = kafka_df.selectExpr("CAST(value AS STRING)")
    schema = ArrayType(StructType()\
            .add("book_id", IntegerType())\
            .add("author", StringType())\
            .add("title", StringType())\
            .add("genre",StringType())\
            .add("price",IntegerType())\
            .add("publish_date", IntegerType())\
            .add("description", StringType()))
    book_DF = new_df.select(from_json(col("value"), schema).alias("dataf"))     #.('data')).select("data.*")
    book_DF.printSchema()
    #book_DF.select("dataf.author").show()
    
    book_DF.write\
           .format("console")\
           .start()
km0tfn4u

km0tfn4u1#

我对Kafka没有太多的经验,但是最后你在book_DF.write.format("console")的结果上使用了start()方法,它是一个DataFrameWriter对象,它没有start()方法。
你想把它写成流吗?那么你可能需要使用类似writeStream的方法:

book_DF.writeStream \
           .format("kafka") \
           .start()

更多信息+示例可以在这里找到。
如果你只是想把 Dataframe 打印到控制台,你应该可以使用show methodbook_DF.show()

相关问题