使用配置单元表迭代sparkDataframe

dz6r00yl  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(280)

我有一个非常大的csv文件,所以我使用spark并将其加载到sparkDataframe中。
我需要从csv上的每一行中提取纬度和经度,以便创建一个foliumMap。
有了Pandas,我可以用一个循环来解决我的问题:

for index, row in locations.iterrows():    

    folium.CircleMarker(location=(row["Pickup_latitude"],
                              row["Pickup_longitude"]),
                    radius=20,
                    color="#0A8A9F",fill=True).add_to(marker_cluster)

我发现与pandasDataframe不同,sparkDataframe不能由循环处理=>如何循环遍历pyspark中的每一行Dataframe。
所以我想,为了解决这个问题,我可以把大数据切割成Hive表,然后迭代它们。
是否可以在配置单元表中剪切巨大的sparkDataframe,然后用循环迭代行?

vngu2lb8

vngu2lb81#

一般来说,您不需要迭代Dataframe或rdd。你只会创造 transformations (如map)将应用于每个记录,然后调用一些 action 称之为处理。
你需要这样的东西:

dataframe.withColumn("latitude", <how to extract latitude>)
         .withColumn("longitude", <how to extract longitude>)
         .select("latitude", "longitude")
         .rdd
         .map(row => <extract values from Row type>)
         .collect()         // this will move data to local collection

如果不能使用sql,则需要使用rdd:

dataframe
     .rdd
     .map(row => <create new row with latitude and longitude>)
     .collect()

相关问题