我写了下面的Spark逻辑。
高级:代码循环遍历一些数据,将一些记录分批拉回来,对这些记录应用一些逻辑,并将输出附加到运行时创建的另一个表中。作业成功完成,但表为空。
详细说明:代码应该创建一个带有3个名称的sparkDataframe。对于每个名称,代码构造一个查询,使用名称作为过滤条件,对返回的数据应用一些逻辑,并将其存储在新的sparkDataframe(output\u spark\u df)中。然后,将此Dataframe转换为临时表,然后使用spark.sql将结果插入myu数据库myu results。我的\数据库。我的\结果应该有三次加载到它的数据。尽管作业已成功完成,但我的\数据库。我的\结果仍为空。
任何指导都将不胜感激。
if __name__ == "__main__":
spark = SparkSession.builder.appName('batch_job').config("spark.kryoserializer.buffer.max", "2047mb").config("spark.sql.broadcastTimeout", "-1").config("spark.sql.autoBroadcastJoinThreshold","-1").getOrCreate()
# Set up hive table to capture results
#-------------------------------------
spark.sql("DROP TABLE IF EXISTS my_database.my_results")
spark.sql("CREATE TABLE IF NOT EXISTS my_database.my_results (field1 STRING, field2 INT) STORED AS PARQUET")
names = spark.sql("select distinct name from my_database.my_input where name IN ('mike','jane','ryan')")
for n in names:
input_spark_df = spark.sql("select * from my_database.my_input where name = '{}'".format(n))
.
.
.
<APPLY LOGIC>
.
.
.
output_spark_df = <logic applied>
# Capture output and append to pre-created hive table
#----------------------------------------------------
output_spark_df.registerTempTable("results")
spark.sql("INSERT INTO TABLE my_database.my_results SELECT * FROM results")
spark.stop()
1条答案
按热度按时间iklwldmw1#
names
仍然是代码中的一个Dataframe,因为在Dataframe上循环会导致for循环中没有匹配的记录。使
names
变量作为我们需要做的列表flatMap and collect
创建一个列表,然后在列表上循环。Fix:
```create names list
names=spark.sql("select distinct id as id from default.i").
rdd.
flatMap(lambda z:z).
collect()
to print values in the list
for n in names:
print(n)
`Example with sample data:`
sample data
spark.sql("select distinct id as id from default.i").show()
+---+
| id|
+---+
| 1|
| 2|
| 3|
+---+
creating a list
names=spark.sql("select distinct id as id from default.i").flatMap(lambda z:z).collect()
looping over the list
for n in names:
spark.sql("select * from default.i where id = '{}'".format(n)).show()
result
+---+
| id|
+---+
| 1|
+---+
+---+
| id|
+---+
| 2|
+---+
+---+
| id|
+---+
| 3|
+---+