我对spark还比较陌生,我正在尝试使用spotipy调用spotifyapi。我有一个艺术家ID列表,可以用来获取艺术家信息。spotifyapi允许一次最多批量调用50个id。我从mysql数据库加载艺术家id,并将它们存储在Dataframe中。
我现在的问题是,我不知道如何有效地将Dataframe批处理成50行或更少的数据块。
在下面的示例中,我将dataframe转换为一个常规的python列表,从中可以对50个批调用api。
你知道我怎么做而不必回到python列表吗?
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from pyspark.sql import SparkSession
import os
spark = SparkSession\
.builder\
.appName("GetArtists")\
.getOrCreate()
df = spark.read.format('jdbc') \
.option("url", "jdbc:mysql://"+os.getenv("DB_SERVER")+":"+os.getenv("DB_PORT")+"/spotify_metadata")\
.option("user", os.getenv("DB_USER"))\
.option("password", os.getenv("DB_PW"))\
.option("query", "SELECT artist_id FROM artists")\
.load()
sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
ids = [row['artist_id'] for row in df.collect()]
batch_size = 50
for i in range(0,len(ids), batch_size):
artists = sp.artists( ids[i:i+batch_size] )
# process the JSON response
我想用 foreach
并为每个id调用api,但这会导致不必要的请求。结果也存储在数据库中,这意味着我正在向数据库写入许多单行。
1条答案
按热度按时间ahy6op9u1#
如果要根据行号划分Dataframe,可以按以下方式进行:
但是如果可以直接将df传递给api,那么传递df或收集df每次只有50个值。