Azure Databricks API用于创建作业,成功调用API后不会创建作业

wmtdaxz3  于 2023-10-22  发布在  其他
关注(0)|答案(1)|浏览(133)

我正在使用Python 3.6对Azure Databricks进行API调用,以创建一个作业来运行特定的笔记本。我已经按照这个链接上的说明使用了API。唯一的区别是我使用python而不是curl。我写的代码如下:

import requests
import os
import json

dbrks_create_job_url =  "https://"+os.environ['DBRKS_INSTANCE']+".azuredatabricks.net//2.0/jobs/create"

DBRKS_REQ_HEADERS = {
    'Authorization': 'Bearer ' + os.environ['DBRKS_BEARER_TOKEN'],
    'X-Databricks-Azure-Workspace-Resource-Id': '/subscriptions/'+ os.environ['DBRKS_SUBSCRIPTION_ID'] +'/resourceGroups/'+ os.environ['DBRKS_RESOURCE_GROUP'] +'/providers/Microsoft.Databricks/workspaces/' + os.environ['DBRKS_WORKSPACE_NAME'],
    'X-Databricks-Azure-SP-Management-Token': os.environ['DBRKS_MANAGEMENT_TOKEN']}

body_json = """
    {
    "name": "A sample job to trigger from DevOps",
    "tasks": [
        {
        "task_key": "ExecuteNotebook",
        "description": "Execute uploaded notebook including tests",
        "depends_on": [],
        "existing_cluster_id": """ + os.environ["DBRKS_CLUSTER_ID"] + """,
        "notebook_task": {
          "notebook_path": "/Users/myuser/sample-notebook",
          "base_parameters": {}
        },
        "timeout_seconds": 300,
        "max_retries": 1,
        "min_retry_interval_millis": 5000,
        "retry_on_timeout": false
      }
],
    "email_notifications": {},
    "name": "my_test_job",
    "max_concurrent_runs": 1}
"""

print("Request body in json format:")
print(body_json)

response = requests.post(dbrks_create_job_url, headers=DBRKS_REQ_HEADERS, data=body_json) 

if response.status_code == 200:
    print("Job created successfully!")
    print(response.status_code)
    print(response.content)
else:
    print("job failed!")
    raise Exception(response.content)

所有操作系统环境变量都是从我的Azure DevOps管道发送的。但是,您不需要从管道执行脚本。您可以从本地计算机执行它,只要您有一个可以访问数据块工作区的服务主体。要运行python脚本,您可以将这些环境变量替换为您自己的凭据。

解释脚本中的变量:

  • os.environ [“DBRKS_EQUANCE”]:数据块示例的名称
  • os.environ [DBRKS_BEARER_TOKEN]:不记名代币您需要它来验证您的服务主体或您的用户到数据砖。后来我解释了你如何得到它。
  • os.environ [DBRKS_MANAGEMENT_TOKEN]:如果您正在使用的服务原则未添加为数据块工作区用户或管理员,则需要此令牌。后来我解释了你如何得到它。
  • os.environ“DBRKS_SUBARTITION_ID”]:databricks工作区所在的Azure订阅ID。
  • os.environ [DBRKS_RESOURCE_GROUP']:数据块工作区的Azure资源组的名称。
  • os.environ [“DBRKS_WORKSPACE_NAME”]:Azure数据块工作区的名称。
  • os.environ[“DBRKS_CLUSTER_ID”]:将在数据块中执行作业的群集ID。

当我运行我的脚本时,我得到状态码200,这意味着它应该正常工作,如下所示:

然而,当我看工作列表时,尽管收到了200个状态代码,但没有创建新的工作!你可以在下面看到我创建的工作不存在。

我还将API端点从www.example.com更改azuredatabricks.net//2.0/jobs/create为azuredatabricks.net//2.1/jobs/create,仍然可以成功运行,但没有创建作业!我不明白我做错了什么。如果我做错了什么,为什么它不会引发异常并给我200个状态码。
最后一点是能够重新产生我所面临的问题:要获取DBRKS_BEARER_TOKEN和DBRKS_MANAGEMENT_TOKEN的上述两个变量,您可以运行以下脚本,并在脚本执行后手动将os.environ“DBRKS_BEARER_TOKEN ']和os.environ”DBRKS_MANAGEMENT_TOKEN']替换为打印的值:

import requests
import json
import os

TOKEN_BASE_URL = 'https://login.microsoftonline.com/' + os.environ['SVCDirectoryID'] + '/oauth2/token'
TOKEN_REQ_HEADERS = {'Content-Type': 'application/x-www-form-urlencoded'}
TOKEN_REQ_BODY = {
       'grant_type': 'client_credentials',
       'client_id': os.environ['SVCApplicationID'],
       'client_secret': os.environ['SVCSecretKey']}


def dbrks_management_token():
        TOKEN_REQ_BODY['resource'] = 'https://management.core.windows.net/'
        response = requests.get(TOKEN_BASE_URL, headers=TOKEN_REQ_HEADERS, data=TOKEN_REQ_BODY)
        if response.status_code == 200:
            print(response.status_code)
        else:
            raise Exception(response.text)
        return response.json()['access_token']

def dbrks_bearer_token():
        TOKEN_REQ_BODY['resource'] = '2ff814a6-3304-4ab8-85cb-cd0e6f879c1d'
        response = requests.get(TOKEN_BASE_URL, headers=TOKEN_REQ_HEADERS, data=TOKEN_REQ_BODY)
        if response.status_code == 200:
            print(response.status_code)
        else:
            raise Exception(response.text)
        return response.json()['access_token']

DBRKS_BEARER_TOKEN = dbrks_bearer_token()
DBRKS_MANAGEMENT_TOKEN = dbrks_management_token()

os.environ['DBRKS_BEARER_TOKEN'] = DBRKS_BEARER_TOKEN 
os.environ['DBRKS_MANAGEMENT_TOKEN'] = DBRKS_MANAGEMENT_TOKEN 

print("DBRKS_BEARER_TOKEN",os.environ['DBRKS_BEARER_TOKEN'])
print("DBRKS_MANAGEMENT_TOKEN",os.environ['DBRKS_MANAGEMENT_TOKEN'])
  • SVCDirectoryID是Azure Active Directory(AAD)服务主体租户ID
  • SVCApplicationID是AAD服务主体客户端ID的值。
  • SVCSecretKey是AAD服务主体密钥。

感谢您发送编修。

f1tvaqid

f1tvaqid1#

您混淆了API版本-tasks阵列只能与Jobs API 2.1一起使用,但您使用的是Jobs API 2.0。另一个错误是在主机名和路径之间有//
只需将dbrks_create_job_url更改为"https://"+os.environ['DBRKS_INSTANCE']+".azuredatabricks.net/api/2.1/jobs/create"

相关问题