我正在尝试将管道从Azure机器学习SDK V1迁移到V2,但有时我不理解V2背后的逻辑,我会卡住。
在V1中,我只需要创建PythonScriptStep并将其 Package 到StepSequence中并部署管道。我的脚本很简单,没有输入,没有输出。我们将数据存储在ADLS Gen 2中,并使用数据块表作为输入。这就是为什么我没有任何输入/输出。
script_step_1 = PythonScriptStep(
name="step1",
script_name="main.py",
arguments=arguments, # list of PipelineParameter
compute_target=ComputeTarget(workspace=ws, name="cpu-16-128"),
source_directory="./my_project_folder",
runconfig=runconfig, # Conda + extra index url + custom dockerfile
allow_reuse=False,
)
script_step_2 = PythonScriptStep(
name="step2",
...
)
step_sequence = StepSequence(
steps=[
script_step_1,
script_step_2,
]
)
# Create Pipeline
pipeline = Pipeline(
workspace=ws,
steps=step_sequence,
)
pipeline_run = experiment.submit(pipeline)
对于V2,我们需要在组件中创建一个将由管道使用的“节点”。
我用带有BuildContext的dockerfile创建了我的Environment,并将requirements.txt的表示形式提供给我添加了额外索引URL的条件字典。
azureml_env = Environment(
build=BuildContext(
path="./docker_folder", # With Dockerfile and requirements.txt
),
name="my-project-env",
)
现在我创建一个命令组件,它将调用python和一个带有一些参数的脚本:
step_1 = command(
environment=azureml_env ,
command="python main.py",
code="./my_project_folder",
)
现在我在SDK V2中有了step1和step2,我不知道如何在没有输入/输出的情况下创建序列
@pipeline(compute="serverless")
def default_pipeline():
return {
"my_pipeline": [step_1, step_2]
}
我不能设法使pipeline
的工作,使一个基本的运行2连续的步骤。
我想在我成功地做到这一点之后,我可以像这样创建/更新管道:
my_pipeline = default_pipeline()
# submit the pipeline job
pipeline_job = ml_client.jobs.create_or_update(
my_pipeline,
experiment_name=experiment_name,
)
更新1:
尝试创建我自己的StepSequence
(非常天真)与假人输入/输出
class CommandSequence:
def __init__(self, commands, ml_client):
self.commands = commands
self.ml_client = ml_client
def build(self):
for i in range(len(self.commands)):
cmd = self.commands[i]
if i == 0:
cmd = command(
display_name=cmd.display_name,
description=cmd.description,
environment=cmd.environment,
command=cmd.command,
code=cmd.code,
is_deterministic=cmd.is_deterministic,
outputs=dict(
my_output=Output(type="uri_folder", mode="rw_mount"),
),
)
else:
cmd = command(
display_name=cmd.display_name,
description=cmd.description,
environment=cmd.environment,
command=cmd.command,
code=cmd.code,
is_deterministic=cmd.is_deterministic,
inputs=self.commands[i - 1].outputs.my_output,
outputs=dict(
my_output=Output(type="uri_folder", mode="rw_mount"),
),
)
cmd = self.ml_client.create_or_update(cmd.component)
self.commands[i] = cmd
print(self.commands[i])
return self.commands
我不得不重新创建command
,因为它们保护了对象中的很多东西。
@pipeline(compute="serverless")
def default_pipeline():
command_sequence = CommandSequence([step_1, step_2], ml_client).build()
return {
"my_pipeline": command_sequence[-1].outputs.my_output
}
但它未能将步骤1的输出链接到步骤2的输入。
inputs=self.commands[i - 1]. outputs.my_output,AttributeError:“dict”对象没有属性“my_output”
1条答案
按热度按时间e4yzc0pl1#
基于Pipeline SDK v2 documentation,V2中的管道组件似乎需要显式定义输入/输出。与SDK v1不同,它目前没有StepSequence Class功能。
但是,由于脚本没有任何显式的输入/输出,一种可能的解决方法是使用虚拟输出来满足管道组件的需求并维护步骤顺序。
下面是伪代码: