尝试用kafka和pyspark在postgresql中编写spark的流Dataframe

v09wglhw  于 2021-05-18  发布在  Spark
关注(0)|答案(1)|浏览(448)

我一直在寻找这个问题在这个网站的每一方,我没有找到任何解决办法。我编写了一个java类,它用kafka创建了一个producer,并发送了一些文件,效果很好。然后,我想编写一个python脚本来读取这些文件并将它们放入postgresql中的数据库。
每个文件(每个文件是一个有很多列的数据集)成为kafka consumer中的一个主题,文件的每一行成为相对主题中的一条消息。
这是我用python从流数据创建的sparkDataframe:

list = df.select("fileName", "Satellite_PRN_number", "date", "time", "Crs", "Delta_n", "m0", "Cuc",
                 "e_Eccentricity",
                 "Cus",
                 "sqrt_A", "Toe_Time_of_Ephemeris", "Cic", "OMEGA_maiusc", "cis", "i0", "Crc", "omega",
                 "omega_dot",
                 "idot")

下面是我的python函数,它应该插入postgresql表中的每一行。我使用psycopg2在python和postgre之间创建连接,并使用“self.cursor.execute”编写查询。

def process_row(self, row):
  self.cursor.execute(
  'INSERT INTO satellite(fileName,Satellite_PRN_number, date, time,Crs,Delta_n, m0, 
  Cuc,e_Eccentricity,Cus,'
  'sqrt_A, Toe_Time_of_Ephemeris, Cic, OMEGA_maiusc, cis, i0, Crc, omega, omega_dot, idot) VALUES 
  (%s,%s,%s,'
  '%s,%s,%s, %s, %s, %s, %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',
  (row.fileName, row.Satellite_PRN_number, row.date, row.time, row.Crs, row.Delta_n, row.m0, row.Cuc,
  row.e_Eccentricity,
  row.Cus, row.sqrt_A, row.Toe_Time_of_Ephemeris, row.Cic, row.OMEGA_maiusc, row.cis, row.i0, 
  row.Crc,
  row.omega,
  row.omega_dot, row.idot))
  self.connection.commit()

最后,我使用上述方法在postgresql中使用以下命令填充表:

query = list.writeStream.outputMode("append").foreachBatch(process_row)\ 
        .option("checkpointLocation", "C:\\Users\\Admin\\AppData\\Local\\Temp").start()

我得到以下错误: AttributeError: 'DataFrame' object has no attribute 'cursor' .
我认为问题出在row.filename等中。。。或者在“处理行”方法中。我不太明白如何管理“process\u row”方法,以便传递流Dataframe的每一行来填充postesql表。
有人能帮我吗?谢谢。

wqsoz72f

wqsoz72f1#

你的签名好像不对。应该是这样的:

def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    pass

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()

正如您所看到的,foreachbatch函数的第一个参数是一个Dataframe,而不是您所期望的psycopg2类的示例。foreachbatch将有一个Dataframe,它本身将包含当前微批的所有行,而不仅仅是一行。
因此,您可以尝试在该函数中声明postgresql连接的示例以进一步使用它,也可以尝试以下方法:
我将为您的postgresql数据库创建一个基于hive jdbc源代码的表,如下所示:

CREATE TABLE jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename",
  user 'username',
  password 'password'
)

这将使您能够像这样使用foreachbatch函数:

def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    df.write.insertInto("jdbcTable")

希望对你有帮助

相关问题