我想利用spark(它在databricks上运行,我正在使用pyspark)向restapi发送并行请求。现在我可能面临两种情况:RESTAPI 1:返回~mb量级的数据rest api 2:返回~kb的数据。关于如何在节点之间分配请求有什么建议吗?谢谢!
tpgth1q71#
只需创建一个带有url(如果您使用不同的url)和api参数(如果它们不是url的一部分)的数据框——这可以通过从列表等显式创建它来完成,也可以通过从外部数据源读取数据来完成,比如json文件或类似的文件 spark.read 函数)。然后定义用户定义的函数,该函数将执行对restapi的请求并以列的形式返回数据。类似这样的(未测试):
spark.read
import urllib df = spark.createDataFrame([("url1", "params1"), ("url2", "params2")], ("url", "params")) @udf("body string, status int") def do_request(url: str, params: str): with urllib.request.urlopen(url) as f: status = f.status body = f.read().decode("utf-8") return {'status': status, 'body': body} res = df.withColumn("result", do_requests(col("url"), col("params")))
这将返回新列名为 result 将有两个字段- status 以及 body (json答案)。您需要添加错误处理等。
result
status
body
1条答案
按热度按时间tpgth1q71#
只需创建一个带有url(如果您使用不同的url)和api参数(如果它们不是url的一部分)的数据框——这可以通过从列表等显式创建它来完成,也可以通过从外部数据源读取数据来完成,比如json文件或类似的文件
spark.read
函数)。然后定义用户定义的函数,该函数将执行对restapi的请求并以列的形式返回数据。类似这样的(未测试):
这将返回新列名为
result
将有两个字段-status
以及body
(json答案)。您需要添加错误处理等。