Adding a new row to a PySpark DataFrame

7ajki6be · asked on 2023-10-15 · in Spark

I'm very new to PySpark, but familiar with pandas. I have a PySpark DataFrame:

from pyspark.sql import SparkSession

# instantiate Spark
spark = SparkSession.builder.getOrCreate()

# make some test data
columns = ['id', 'dogs', 'cats']
vals = [
     (1, 2, 0),
     (2, 0, 1)
]

# create DataFrame
df = spark.createDataFrame(vals, columns)

I want to add a new row (4, 5, 7), so it would output:

df.show()
+---+----+----+
| id|dogs|cats|
+---+----+----+
|  1|   2|   0|
|  2|   0|   1|
|  4|   5|   7|
+---+----+----+

6l7fqoea #1

As thebluephantom has already said, union is the way to go. I am just answering your question to give you a PySpark example:

from pyspark.sql import SparkSession

# if not already created automatically, instantiate a SparkSession
spark = SparkSession.builder.getOrCreate()

columns = ['id', 'dogs', 'cats']
vals = [(1, 2, 0), (2, 0, 1)]

df = spark.createDataFrame(vals, columns)

newRow = spark.createDataFrame([(4, 5, 7)], columns)
appended = df.union(newRow)
appended.show()

Please also have a look at the Databricks knowledge base article: https://kb.databricks.com/data/append-a-row-to-rdd-or-dataframe.html
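
One caveat worth adding: union matches columns by position, not by name. If the DataFrame holding the new row might list its columns in a different order, unionByName (available since Spark 2.3) is a safer variant. A minimal sketch reusing the df above:

# unionByName matches on column names, so the new row's
# columns can be given in any order
newRow = spark.createDataFrame([(5, 7, 4)], ['dogs', 'cats', 'id'])
appended = df.unionByName(newRow)
appended.show()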


zi8p0yeb #2

You can also append a row to a DataFrame using the collect method. The collect() function converts the DataFrame into a list; you can append the new data to that list and then turn the list back into a DataFrame.
My Spark DataFrame is called df:

+---+----+------+
| id|name|gender|
+---+----+------+
|  1|   A|     M|
|  2|   B|     F|
|  3|   C|     M|
+---+----+------+

Convert this DataFrame to a list using collect:

collect_df = df.collect()
print(collect_df)

[Row(id=1, name='A', gender='M'),
 Row(id=2, name='B', gender='F'),
 Row(id=3, name='C', gender='M')]

Append the new row to this list:

collect_df.append({"id" : 5, "name" : "E", "gender" : "F"})
print(collect_df)

[Row(id=1, name='A', gender='M'),
 Row(id=2, name='B', gender='F'),
 Row(id=3, name='C', gender='M'),
 {'id': 5, 'name': 'E', 'gender': 'F'}]

Convert this list back into a DataFrame:

added_row_df = spark.createDataFrame(collect_df)
added_row_df.show()

+---+----+------+
| id|name|gender|
+---+----+------+
|  1|   A|     M|
|  2|   B|     F|
|  3|   C|     M|
|  5|   E|     F|
+---+----+------+
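
Two caveats on this approach: collect() pulls the entire DataFrame into driver memory, so it is only practical for small data, and recent PySpark versions warn when inferring schema from dicts. A sketch of the same idea that appends a Row instead, assuming the df above:

from pyspark.sql import Row

collect_df = df.collect()
# append a Row so the list stays homogeneous (all Row objects)
collect_df.append(Row(id=5, name='E', gender='F'))
added_row_df = spark.createDataFrame(collect_df)
added_row_df.show()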

gdrx4gfi #3

From something I did, using union, showing a block of partial coding (in Scala); you will of course need to adapt it to your own situation:

val dummySchema = StructType(
  StructField("phrase", StringType, true) :: Nil)
var dfPostsNGrams2 = spark.createDataFrame(sc.emptyRDD[Row], dummySchema)
for (i <- i_grams_Cols) {
  val nameCol = col(i)
  dfPostsNGrams2 = dfPostsNGrams2.union(dfPostsNGrams.select(explode(nameCol).as("phrase")).toDF)
}

Union of a DataFrame with itself is the way to go.
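
For readers working in PySpark rather than Scala, the same pattern (unioning the exploded contents of several columns into one single-column DataFrame) can be written with functools.reduce. A sketch assuming dfPostsNGrams and the column-name list i_grams_Cols from the snippet above:

from functools import reduce
from pyspark.sql.functions import col, explode

# build one single-column DataFrame per array column, then union them all
exploded = [dfPostsNGrams.select(explode(col(c)).alias("phrase"))
            for c in i_grams_Cols]
dfPostsNGrams2 = reduce(lambda a, b: a.union(b), exploded)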


j5fpnvbx #4

An alternative is to use the partitioned Parquet format and add one extra Parquet file for each DataFrame you want to append. This way you can create (hundreds, thousands, millions of) Parquet files, and when you read the directory later, Spark will read them all as a union.
This example uses PyArrow.
Note that I also show how to write a single unpartitioned Parquet file (example.parquet) if you already know where you want to put it.

import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd

headers=['A', 'B', 'C']

row1 = ['a1', 'b1', 'c1']
row2 = ['a2', 'b2', 'c2']

df1 = pd.DataFrame([row1], columns=headers)
df2 = pd.DataFrame([row2], columns=headers)

df3 = pd.concat([df1, df2], ignore_index=True)  # DataFrame.append was removed in pandas 2.0

table = pa.Table.from_pandas(df3)

pq.write_table(table, 'example.parquet', flavor='spark')
pq.write_to_dataset(table, root_path="test_part_file", partition_cols=['B', 'C'], flavor='spark')

# Adding a new partition (B=b3/C=c3)

row3 = ['a3', 'b3', 'c3']
df4 = pd.DataFrame([row3], columns=headers)

table2 = pa.Table.from_pandas(df4)
pq.write_to_dataset(table2, root_path="test_part_file", partition_cols=['B', 'C'], flavor='spark')

# Add another parquet file to the B=b2/C=c2 partition
# Note this does not overwrite existing partitions, it just appends a new .parquet file.
# If files already exist, then you will get a union result of the two (or multiple) files when you read the partition
row5 = ['a5', 'b2', 'c2']
df5 = pd.DataFrame([row5], columns=headers)
table3 = pa.Table.from_pandas(df5)
pq.write_to_dataset(table3, root_path="test_part_file", partition_cols=['B', 'C'], flavor='spark')

Reading the output afterwards:

from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("testing parquet read")
         .getOrCreate())

df_spark = spark.read.parquet('test_part_file')
df_spark.show(25, False)

You should see something like this:

+---+---+---+
|A  |B  |C  |
+---+---+---+
|a5 |b2 |c2 |
|a2 |b2 |c2 |
|a1 |b1 |c1 |
|a3 |b3 |c3 |
+---+---+---+

If you run the same thing again end to end, you should see duplicates like this (because all of the previous Parquet files are still there, Spark unions them):

+---+---+---+
|A  |B  |C  |
+---+---+---+
|a2 |b2 |c2 |
|a5 |b2 |c2 |
|a5 |b2 |c2 |
|a2 |b2 |c2 |
|a1 |b1 |c1 |
|a1 |b1 |c1 |
|a3 |b3 |c3 |
|a3 |b3 |c3 |
+---+---+---+
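
If you are writing with Spark itself rather than PyArrow, the same add-files-to-append behavior is available through the DataFrameWriter's append save mode. A minimal sketch reusing the column and path names from the example above:

# each call adds new Parquet files under the path instead of overwriting
new_rows = spark.createDataFrame([('a6', 'b2', 'c2')], ['A', 'B', 'C'])
new_rows.write.mode("append").partitionBy("B", "C").parquet("test_part_file")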
