使用spark的并行restapi请求(databricks)

jk9hmnmh  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(407)

我想利用spark(它在databricks上运行,我正在使用pyspark)向restapi发送并行请求。现在我可能面临两种情况:
RESTAPI 1:返回~mb量级的数据
rest api 2:返回~kb的数据。
关于如何在节点之间分配请求有什么建议吗?
谢谢!

tpgth1q7

tpgth1q71#

只需创建一个带有url(如果您使用不同的url)和api参数(如果它们不是url的一部分)的数据框——这可以通过从列表等显式创建它来完成,也可以通过从外部数据源读取数据来完成,比如json文件或类似的文件 spark.read 函数)。
然后定义用户定义的函数,该函数将执行对restapi的请求并以列的形式返回数据。类似这样的(未测试):

import urllib
df = spark.createDataFrame([("url1", "params1"), ("url2", "params2")], ("url", "params"))

@udf("body string, status int")
def do_request(url: str, params: str):
  with urllib.request.urlopen(url) as f:
    status = f.status
    body = f.read().decode("utf-8")

  return {'status': status, 'body': body}

res = df.withColumn("result", do_requests(col("url"), col("params")))

这将返回新列名为 result 将有两个字段- status 以及 body (json答案)。您需要添加错误处理等。

相关问题