Upgrade pipelines to SDK v2

In SDK v2, pipelines are consolidated into jobs.

A job has a type. Most jobs are command jobs that run a command, like python main.py. What runs in a job is agnostic to any programming language, so you can run bash scripts, invoke python interpreters, run a bunch of curl commands, or anything else.

A pipeline is another type of job, which defines child jobs that might have input/output relationships, forming a directed acyclic graph (DAG).
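To make the DAG idea concrete, here is a minimal sketch in plain Python. It is not Azure Machine Learning SDK code. It models the three child jobs used later in this article (train, score, and evaluate) and derives a valid run order from their input/output relationships. The `depends_on` dictionary and the job names are illustrative assumptions.

```python
from graphlib import TopologicalSorter

# Hypothetical child jobs, each mapped to the jobs whose outputs it consumes.
depends_on = {
    "train": set(),      # consumes only external data
    "score": {"train"},  # consumes the model produced by train
    "eval": {"score"},   # consumes the scoring result produced by score
}

# static_order() yields a job only after all of its predecessors.
run_order = list(TopologicalSorter(depends_on).static_order())
print(run_order)  # ['train', 'score', 'eval']
```

If the dependencies contained a cycle, `static_order()` would raise `graphlib.CycleError`, which is why the graph has to be acyclic.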

To upgrade, change the code that defines and submits your pipelines to SDK v2. You don't need to upgrade what runs inside each child job to SDK v2. However, remove any code specific to Azure Machine Learning from your model training scripts. This separation makes it easier to move between local and cloud execution and is considered best practice for mature MLOps. In practice, it means removing azureml.* lines of code and replacing model logging and tracking code with MLflow. For more information, see how to use MLflow in v2.
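As a rough illustration of a training script with no azureml.* imports, the sketch below takes its inputs and outputs as command-line arguments and logs metrics through MLflow. The argument names mirror the ones used in this article's examples. The placeholder loss computation, the `model.txt` file name, and the `log_metric` helper are assumptions made for illustration. The helper falls back to `print` when MLflow isn't installed so that the sketch also runs locally.

```python
import argparse
from pathlib import Path


def log_metric(name, value):
    """Log through MLflow when it is installed; otherwise print the value."""
    try:
        import mlflow
    except ImportError:
        print(f"{name}={value}")
        return
    mlflow.log_metric(name, value)


def parse_args(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--training_data", type=str, default="data.csv")
    parser.add_argument("--max_epochs", type=int, default=5)
    parser.add_argument("--learning_rate", type=float, default=0.1)
    parser.add_argument("--model_output", type=str, default="outputs/model")
    return parser.parse_args(argv)


def main(argv=None):
    args = parse_args(argv)
    out_dir = Path(args.model_output)
    out_dir.mkdir(parents=True, exist_ok=True)

    for epoch in range(args.max_epochs):
        # Placeholder value standing in for a real training loss (assumption).
        loss = 1.0 / (1 + epoch * args.learning_rate)
        log_metric("loss", loss)

    # Write a dummy model file into the output folder (hypothetical file name).
    model_file = out_dir / "model.txt"
    model_file.write_text(f"dummy model trained on {args.training_data}\n")
    return model_file

# When saved as train.py, call main() under an `if __name__ == "__main__":` guard.
```

Because the script only reads arguments and writes files, the same code can run on a local machine or as the command of a child job.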

This article gives a comparison of scenarios in SDK v1 and SDK v2. In the following examples, you build three steps (train, score, and evaluate) into a dummy pipeline job. This comparison demonstrates how to build pipeline jobs using SDK v1 and SDK v2, and how to consume data and transfer data between steps.

Run a pipeline

  • SDK v1

    Important

    Azure Machine Learning SDK v1 (azureml-core) is deprecated as of March 31, 2025. Support ends June 30, 2026. The following code is shown for comparison only. Use the SDK v2 example for new work. For more information, see Upgrade to v2.

    # import required libraries
    import os
    import azureml.core
    from azureml.core import (
        Workspace,
        Dataset,
        Datastore,
        ComputeTarget,
        Experiment,
        ScriptRunConfig,
    )
    from azureml.pipeline.steps import PythonScriptStep
    from azureml.pipeline.core import Pipeline
    
    # check core SDK version number
    print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
    
    # load workspace
    workspace = Workspace.from_config()
    print(
        "Workspace name: " + workspace.name,
        "Azure region: " + workspace.location,
        "Subscription id: " + workspace.subscription_id,
        "Resource group: " + workspace.resource_group,
        sep="\n",
    )
    
    # create an ML experiment
    experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline")
    
    # create a directory
    script_folder = "./src"
    
    # create compute
    from azureml.core.compute import ComputeTarget, AmlCompute
    from azureml.core.compute_target import ComputeTargetException
    
    # Choose a name for your CPU cluster
    amlcompute_cluster_name = "cpu-cluster"
    
    # Verify that cluster does not exist already
    try:
        aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name)
        print('Found existing cluster, use it.')
    except ComputeTargetException:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2',
                                                               max_nodes=4)
        aml_compute = ComputeTarget.create(workspace, amlcompute_cluster_name, compute_config)
    
    aml_compute.wait_for_completion(show_output=True)
    
    # define data set
    data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"]
    input_ds = Dataset.File.from_files(data_urls)
    
    # define steps in pipeline
    from azureml.data import OutputFileDatasetConfig
    model_output = OutputFileDatasetConfig('model_output')
    train_step = PythonScriptStep(
        name="train step",
        script_name="train.py",
        arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    score_output = OutputFileDatasetConfig('score_output')
    score_step = PythonScriptStep(
        name="score step",
        script_name="score.py",
        arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    eval_output = OutputFileDatasetConfig('eval_output')
    eval_step = PythonScriptStep(
        name="eval step",
        script_name="eval.py",
        arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    # build pipeline
    from azureml.pipeline.core import Pipeline
    
    pipeline_steps = [train_step, score_step, eval_step]
    
    pipeline = Pipeline(workspace = workspace, steps=pipeline_steps)
    print("Pipeline is built.")
    
    pipeline_run = experiment.submit(pipeline, regenerate_outputs=False)
    
    print("Pipeline submitted for execution.")
    
    
  • SDK v2. Full sample link

    # import required libraries
    from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
    
    from azure.ai.ml import MLClient, Input
    from azure.ai.ml.dsl import pipeline
    
    try:
        credential = DefaultAzureCredential()
        # Check if given credential can get token successfully.
        credential.get_token("https://management.azure.com/.default")
    except Exception as ex:
        # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential does not work
        credential = InteractiveBrowserCredential()
    
    # Get a handle to workspace
    ml_client = MLClient.from_config(credential=credential)
    
    # Retrieve an already attached Azure Machine Learning Compute.
    cluster_name = "cpu-cluster"
    print(ml_client.compute.get(cluster_name))
    # Tip: You can skip provisioning a cluster by using serverless compute.
    # Replace `default_compute=cluster_name` with `default_compute="serverless"`
    # in the @pipeline decorator below. See: https://learn.microsoft.com/azure/machine-learning/how-to-use-serverless-compute
    
    # Import components that are defined with Python function
    with open("src/components.py") as fin:
        print(fin.read())
    
    # You need to install the mldesigner package to use the command_component decorator.
    # Option 1: install directly
    # !pip install mldesigner
    
    # Option 2: install as an extra dependency of azure-ai-ml
    # !pip install azure-ai-ml[designer]
    
    # import the components as functions
    from src.components import train_model, score_data, eval_model
    
    cluster_name = "cpu-cluster"
    # define a pipeline with component
    @pipeline(default_compute=cluster_name)
    def pipeline_with_python_function_components(input_data, test_data, learning_rate):
        """E2E dummy train-score-eval pipeline with components defined via Python function components"""
    
        # Call component obj as function: apply given inputs & parameters to create a node in pipeline
        train_with_sample_data = train_model(
            training_data=input_data, max_epochs=5, learning_rate=learning_rate
        )
    
        score_with_sample_data = score_data(
            model_input=train_with_sample_data.outputs.model_output, test_data=test_data
        )
    
        eval_with_sample_data = eval_model(
            scoring_result=score_with_sample_data.outputs.score_output
        )
    
        # Return: pipeline outputs
        return {
            "eval_output": eval_with_sample_data.outputs.eval_output,
            "model_output": train_with_sample_data.outputs.model_output,
        }
    
    
    pipeline_job = pipeline_with_python_function_components(
        input_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        test_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        learning_rate=0.1,
    )
    
    # submit job to workspace
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="train_score_eval_pipeline"
    )
    

Mapping of key functionality in SDK v1 and SDK v2

| Functionality in SDK v1 | Rough mapping in SDK v2 |
|---|---|
| azureml.pipeline.core.Pipeline | azure.ai.ml.dsl.pipeline |
| OutputDatasetConfig | Output |
| dataset as_mount | Input |
| StepSequence | Data dependency |

Step and job/component type mapping

| Step in SDK v1 | Job type in SDK v2 | Component type in SDK v2 |
|---|---|---|
| adla_step | None | None |
| automl_step | automl job | automl component |
| azurebatch_step | None | None |
| command_step | command job | command component |
| data_transfer_step | None | None |
| databricks_step | None | None |
| estimator_step | command job | command component |
| hyper_drive_step | sweep job | None |
| kusto_step | None | None |
| module_step | None | command component |
| mpi_step | command job | command component |
| parallel_run_step | parallel job | parallel component |
| python_script_step | command job | command component |
| r_script_step | command job | command component |
| synapse_spark_step | spark job | spark component |
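When you take stock of an existing v1 pipeline, it can help to check which steps have no v2 counterpart. The following sketch encodes the table above as a dictionary and flags the steps that map to neither a job type nor a component type. The `STEP_MAPPING` and `find_unmapped` names are hypothetical helpers, not part of either SDK.

```python
# v1 step -> (v2 job type, v2 component type), copied from the mapping table.
# None means the table lists no equivalent.
STEP_MAPPING = {
    "adla_step": (None, None),
    "automl_step": ("automl job", "automl component"),
    "azurebatch_step": (None, None),
    "command_step": ("command job", "command component"),
    "data_transfer_step": (None, None),
    "databricks_step": (None, None),
    "estimator_step": ("command job", "command component"),
    "hyper_drive_step": ("sweep job", None),
    "kusto_step": (None, None),
    "module_step": (None, "command component"),
    "mpi_step": ("command job", "command component"),
    "parallel_run_step": ("parallel job", "parallel component"),
    "python_script_step": ("command job", "command component"),
    "r_script_step": ("command job", "command component"),
    "synapse_spark_step": ("spark job", "spark component"),
}


def find_unmapped(v1_steps):
    """Return the steps with neither a v2 job type nor a v2 component type.

    Step names that are missing from the table are also reported.
    """
    return [
        step
        for step in v1_steps
        if STEP_MAPPING.get(step, (None, None)) == (None, None)
    ]


print(find_unmapped(["python_script_step", "databricks_step", "module_step"]))
# ['databricks_step']
```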

Published pipelines

After you create and run a pipeline, you can publish it so that it runs with different inputs. In SDK v1, this feature was known as Published Pipelines. Batch endpoints offer a similar yet more powerful way to handle multiple assets running under a durable API. For this reason, the Published Pipelines functionality is now part of pipeline component deployments in batch endpoints.

Batch endpoints decouple the interface (endpoint) from the actual implementation (deployment). They enable you to decide which deployment serves as the default implementation of the endpoint. Pipeline component deployments in batch endpoints allow you to deploy pipeline components instead of pipelines, which makes better use of reusable assets for organizations looking to streamline their MLOps practice.

The following table shows a comparison of each of the concepts:

| Concept | SDK v1 | SDK v2 |
|---|---|---|
| Pipeline's REST endpoint for invocation | Pipeline endpoint | Batch endpoint |
| Pipeline's specific version under the endpoint | Published pipeline | Pipeline component deployment |
| Pipeline's arguments on invocation | Pipeline parameter | Job inputs |
| Job generated from a published pipeline | Pipeline job | Batch job |

For specific guidance about how to migrate to batch endpoints, see Upgrade pipeline endpoints to SDK v2.

For more information, see the following documentation: