如何枚举spark的Dataframe中的列?如果列是嵌套的呢?

vqlkdk9b  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(498)

表面上它没有 items() 方法。那怎么办?
我正在尝试用以下代码将行发送到数据库:

  1. def write_row(table_name, cur, row):
  2. data = []
  3. for key, value in row.items():
  4. data.append((key, value))
  5. data = zip(*data)
  6. columns = ", ".join(data[0])
  7. values = data[1]
  8. questionmarks = ", ".join(["?"] * len(columns))
  9. query = f"INSERT INTO {table_name} ({columns}) VALUES ({questionmarks})"
  10. cur.execute(query, values)
  11. def write_data_frame(df, epoch1):
  12. conn = mariadb.connect(**config["mariadb"])
  13. cur = conn.cursor()
  14. table_name = "pysparktest"
  15. rows = df.collect()
  16. for row in rows:
  17. write_row(table_name, cur, row)
  18. conn.commit()

它发誓

  1. AttributeError: items

如果行是嵌套的呢?

  1. root
  2. |-- track: struct (nullable = true)
  3. | |-- name: string (nullable = true)
  4. | |-- version: string (nullable = true)
  5. |-- car: struct (nullable = true)
  6. | |-- name: string (nullable = true)
  7. | |-- version: string (nullable = true)
  8. |-- cnt: long (nullable = false)
  9. |-- minBestLapTime: double (nullable = true)
guz6ccqo

guz6ccqo1#

就像编译器发誓的那样,row类中没有名为“items()”的方法。
你需要做的是使用“asdict”方法。它以python dict的形式输出行中的键、值。
对于嵌套列,asdict函数中有一个名为recursive的参数,将其设置为true。默认情况下,设置为false。
例如:

  1. row = Row(name="Alice", age=11)
  2. row_as_dict = row.asDict()
  3. row_as_dict

输出:

  1. {'name': 'Alice', 'age': 11}

对于迭代:

  1. for key in row_as_dict:
  2. print("{} : {}".format(key, row_as_dict[key]))

输出:

  1. name : Alice
  2. age : 11

如果是嵌套列

  1. row = Row(key=1, value=Row(name='a', age=2))
  2. row_as_dict = row.asDict(recursive=True)
  3. row_as_dict

输出:

  1. {'key': 1, 'value': {'name': 'a', 'age': 2}}
展开查看全部

相关问题