scala 从笔记本运行数据库作业

a64a0gku  于 2022-11-09  发布在  Scala
关注(0)|答案(2)|浏览(205)

我想知道是否可以使用代码从笔记本中运行Databricks作业,以及如何操作
我有一个具有多个任务和许多贡献者的作业,我们创建了一个作业来执行所有这些任务,现在我们希望从笔记本运行作业以测试新功能,而不在作业中创建新任务,还可以在循环中多次运行作业,例如:

for i in [1,2,3]:
    run job with parameter i

问候

ymdaylpp

ymdaylpp1#

您需要做的是:
1.安装数据库KSAPI。%pip install databricksapi==1.8.1
1.创建作业并返回输出。您可以按如下方式退出笔记本电脑:
import json dbutils.notebook.exit(json.dumps({"result": f"{_result}"}))
如果你想传递一个 Dataframe ,你也必须把它们作为json转储来传递,Databricks提供了一些关于这方面的官方文档。看看这个。
1.获取您稍后需要的工作ID。你可以从Databricks中的工作详细信息中获得它。
1.在Executors笔记本中,可以使用以下代码。

def run_ks_job_and_return_output(params):
   context = json.loads(dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson())
     # context
   url = context['extraContext']['api_url']
   token = context['extraContext']['api_token']

   jobs_instance = Jobs.Jobs(url, token) # initialize a jobs_instance
   runs_job_id = jobs_instance.runJob(****************, 'notebook',
                      params) #****is the job id

   run_is_not_completed = True
   while run_is_not_completed:
     current_run = [run for run in jobs_instance.runsList('completed')['runs'] if run['run_id'] == runs_job_id['run_id'] and run['number_in_job'] == runs_job_id['number_in_job']]
     if len(current_run) == 0:
       time.sleep(30)
     else:
       run_is_not_completed = False
       current_run = current_run[0]
       print( f"Result state:   {current_run['state']['result_state']}, You can check the resulted output in the following link: {current_run['run_page_url']}")
       note_output = jobs_instance.runsGetOutput(runs_job_id['run_id'])['notebook_output']
       return note_output

 run_ks_job_and_return_output( { 'parm1' : 'george',
                                    'variable': "values1"})

如果您希望多次并行运行该作业,您可以执行以下操作。(首先,确保您已在作业设置中增加了最大并发运行次数)

from multiprocessing.pool import ThreadPool 
pool = ThreadPool(1000) 
results = pool.map(lambda j: run_ks_job_and_return_output( { 'table' : 'george',
                                   'variable': "values1",
                                         'j': j}), 
         [str(x) for x in range(2,len(snapshots_list))])

也可以保存整个html输出,但您可能对此不感兴趣。无论如何,我会在StackOverflow上的另一篇帖子中回答这个问题。
希望能有所帮助。

fiei3ece

fiei3ece2#

您可以使用以下步骤:
注-01:

dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
dbutils.widgets.text("foo2", "foo2Default", "foo2EmptyLabel")
result = dbutils.widgets.get("foo")+"-"+dbutils.widgets.get("foo2")
def display():
     print("Function Display: "+result)
dbutils.notebook.exit(result)

注-02:

thislist = ["apple", "banana", "cherry"]
for x in thislist:
  dbutils.notebook.run("Note-01 path", 60, {"foo": x,"foo2":'Azure'})

相关问题