高效地批处理sparkDataframe以调用api

htrmnn0y  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(462)

我对spark还比较陌生,我正在尝试使用spotipy调用spotifyapi。我有一个艺术家ID列表,可以用来获取艺术家信息。spotifyapi允许一次最多批量调用50个id。我从mysql数据库加载艺术家id,并将它们存储在Dataframe中。
我现在的问题是,我不知道如何有效地将Dataframe批处理成50行或更少的数据块。
在下面的示例中,我将dataframe转换为一个常规的python列表,从中可以对50个批调用api。
你知道我怎么做而不必回到python列表吗?

  1. import spotipy
  2. from spotipy.oauth2 import SpotifyClientCredentials
  3. from pyspark.sql import SparkSession
  4. import os
  5. spark = SparkSession\
  6. .builder\
  7. .appName("GetArtists")\
  8. .getOrCreate()
  9. df = spark.read.format('jdbc') \
  10. .option("url", "jdbc:mysql://"+os.getenv("DB_SERVER")+":"+os.getenv("DB_PORT")+"/spotify_metadata")\
  11. .option("user", os.getenv("DB_USER"))\
  12. .option("password", os.getenv("DB_PW"))\
  13. .option("query", "SELECT artist_id FROM artists")\
  14. .load()
  15. sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
  16. ids = [row['artist_id'] for row in df.collect()]
  17. batch_size = 50
  18. for i in range(0,len(ids), batch_size):
  19. artists = sp.artists( ids[i:i+batch_size] )
  20. # process the JSON response

我想用 foreach 并为每个id调用api,但这会导致不必要的请求。结果也存储在数据库中,这意味着我正在向数据库写入许多单行。

ahy6op9u

ahy6op9u1#

如果要根据行号划分Dataframe,可以按以下方式进行:

  1. from pyspark.sql import functions as f
  2. from pyspark.sql import Window
  3. df = df.withColumn('row_num', f.row_number().over(Window.orderBy(f.lit(1))))
  4. len = df.count()
  5. for i in range(0,len, 50):
  6. df = df.filter(f.col('row_num')>=i & f.col('row_num')<=i+50)
  7. #api logic goes here

但是如果可以直接将df传递给api,那么传递df或收集df每次只有50个值。

相关问题