pyspark 代码存储库-数据集的每一行上的外部API调用

f3temu5u  于 2023-05-16  发布在  Spark
关注(0)|答案(1)|浏览(152)

我有一个两步API调用我需要做。我想让它灵活地适应未来的用例,但我遇到了麻烦。
我通过REST API端点发出初始get请求以获取源数据的模式。它的输出将为后续端点提供输入,我需要访问这些端点以获取每个表中的数据。
架构终结点输出:

{
  "tables": [
    {
      "id": "table1",
      "name": "A"
    },
    {
      "id": "table2",
      "name": "B"
    },
    {
      "id": "table3",
      "name": "C"
    },
    {
      "id": "table4",
      "name": "D"
    },
    {
      "id": "table5",
      "name": "E"
    }
  ]
}

我想访问另一个端点,它将此作为输入,并将输出动态存储在另一列中。理想结束状态:
| 表ID| tableName| tableData|
| --------------|--------------|--------------|
| 表1|名称A| JSON响应1|
| 表2|名称B| JSON响应2|
| ......这是什么?|......这是什么?|......这是什么?|
| 表N|名称N| json响应N|
我已经成功地得到了第一个响应,但是当我把它放在一个pyspark数据框中时,当我试图点击每一行来检索数据时,我得到了错误。有什么想法吗?我知道有一堆细节缺失,但也许有一些一般的指导,因为缺乏这方面的文档。
变换步骤:
1.首先获取schema并转换为pyspark Dataframe 。然后尝试在代码存储库中使用pandas dataframe执行以下工作流。
1.无法弄清楚如何使用UDF或仅使用常规函数来命中数据集每行的另一个端点。

9jyewag0

9jyewag01#

我以前用这样的两步API调用解决过类似的问题。我使用 * 数据连接任务 * 解决了这个问题,这在文档中不容易找到。
1.如果第一个API调用中的源数据来自data connection sync并且在数据集中,则只需为第二个API调用创建数据连接任务。转到compass文件夹中的数据连接器,* 右键单击 *,然后单击“创建数据连接任务”。然后,您可以在YAML中配置API请求,将输入数据集设置为第一个API调用的数据集,并输出到包含第二个API调用的数据的新数据集。生成此输出数据集以运行数据连接任务。对于YAML配置,您可以在 * 高级 * 模式下打开原始数据连接器同步的配置,并将YAML复制到那里进行格式化。
1.如果您的源数据不是来自数据连接同步,我建议将其迁移到一个,而不是直接在代码中进行API调用。通过这种方式,您可以更好地围绕同步中的API调用设置调度和健康检查,而不是将其嵌入到代码中。你也可以按照我上面的建议来解决你的问题。
从API调用的两个步骤中收集数据后,您可以根据需要将这些数据集连接在一起,以实现理想的最终状态格式。

相关问题