3.2 Workflow with Kubeflow Pipelines

Let’s look at how to orchestrate and automate ML workflows with Kubeflow Pipelines — an open framework that makes it easier for data scientists, ML engineers, and developers to build, deploy, and operate complex chains of steps. Automation saves time and ensures reproducibility and stability — the foundation of reliable ML systems. We start by setting up the SDK and the “building blocks” of pipelines.

First, import the required modules from the Kubeflow Pipelines SDK. These modules are the building blocks for defining pipelines.

# Import the DSL (domain‑specific language) and the compiler from the Kubeflow Pipelines SDK
from kfp import dsl
from kfp import compiler

Here, dsl provides decorators and classes for describing components and structure, and compiler compiles a pipeline to an executable format for the Kubeflow engine.

The libraries evolve quickly, so warnings about upcoming changes or deprecations are common. To keep output uncluttered during learning or demos, you can selectively hide them (but it’s wise to review release notes regularly):

# Suppress FutureWarning originating from the Kubeflow Pipelines SDK
import warnings
warnings.filterwarnings("ignore", category=FutureWarning, module='kfp.*')

This uses the standard warnings module to filter FutureWarning from kfp.*, helping you focus on important messages.

Keep in mind: follow Kubeflow Pipelines releases and suppress warnings selectively — fully silencing them can hide real problems.

For details, keep the Kubeflow Pipelines docs and MLOps guides handy (for example, Google Cloud materials on continuous delivery and automated pipelines). Mastering them markedly improves the efficiency and reliability of ML workflows.

Kubeflow structures an ML workflow into reusable components and pipelines: components are isolated steps (preprocessing, training, deployment, etc.), and a pipeline is the composition in which outputs of one step become inputs to subsequent ones, forming an end‑to‑end process.

As a reference point, start with a simple “greeting” component that takes a name and returns a string. This is a basic demonstration of defining a component with the Kubeflow Pipelines SDK:

# Import the DSL module to define components and pipelines
from kfp import dsl

# Define a simple component using the @dsl.component decorator
@dsl.component
def greet_person(name: str) -> str:
    # Form a greeting by combining "Hello" with the input name
    greeting_message = f'Hello, {name}!'

    # Return the constructed greeting message
    return greeting_message

The @dsl.component decorator marks the function as a pipeline component; greet_person accepts name and forms a greeting you can pass downstream in a real pipeline.

Keep input/output interfaces clear, and design components so they can be reused across pipelines.
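
As a small sketch of that principle, the hypothetical component below (not part of the pipeline we build later) declares an explicit type for every input and output, so it can be dropped into any pipeline that supplies matching data:

# A hypothetical reusable component with an explicit, typed interface
from kfp import dsl

@dsl.component
def format_full_name(first_name: str, last_name: str) -> str:
    # Combine the two typed inputs into a single output string
    return f"{first_name} {last_name}"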

When working with components, understand outputs and PipelineTask: a function marked with @dsl.component, when called inside a pipeline, doesn’t return “ready” data. It returns a PipelineTask object representing the step execution and acting as the link for passing data further.

# Assign the result of calling the component function to a variable
hello_task = greet_person(name="Erwin")
print(hello_task)

The component returns a PipelineTask, not a string.

Accessing data via .output

To use a component’s output inside a pipeline, refer to the .output attribute of the PipelineTask object. It lets you feed the result of one step into the next, organizing the pipeline’s dataflow.

# Access the component’s output via the .output attribute
print(hello_task.output)

The .output attribute has a built‑in data type (String/Integer/Float/Boolean/List/Dict) compatible across pipeline components.
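
The same mechanism covers the other supported types. As an illustration, the hypothetical component below returns an integer, and its .output would be wired downstream exactly as with strings:

# A hypothetical component returning an integer instead of a string
from kfp import dsl

@dsl.component
def count_characters(text: str) -> int:
    # Return the number of characters in the input text
    return len(text)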

Named arguments only

Important: all component parameters are passed by name (keyword arguments). This increases clarity and prevents errors, especially when a component has multiple inputs.

# This will raise an error because it uses a positional argument
# hello_task = greet_person("Erwin")

# Correct: call with a named argument
hello_task = greet_person(name="Erwin")

Tips

  • Parameter names: always call components with named arguments only.
  • Component outputs: plan data hand‑off between steps via PipelineTask.output.

Wiring components: passing outputs

Building on components, let’s create a pipeline where one component’s output serves as another’s input — a core capability of Kubeflow Pipelines.

A dependent component

Define a second component that accepts the first component’s greeting and appends a follow‑up question. This shows how one pipeline step can depend on the previous step’s result.

# Import DSL to define components
from kfp import dsl

# Define a component that depends on another component’s output
@dsl.component
def ask_about_wellbeing(greeting_message: str) -> str:
    # Form a new message that includes the greeting and a follow‑up question
    follow_up_message = f"{greeting_message}. How are you?"

    # Return the new message
    return follow_up_message

Passing outputs between components

Now pass the output of the first component (greet_person) as the input to the second (ask_about_wellbeing). This is the key step in wiring components and organizing the pipeline’s dataflow.

# Create a task for the first component and keep its output
greeting_task = greet_person(name="Erwin")

# Feed the first component’s output into the second component
wellbeing_task = ask_about_wellbeing(greeting_message=greeting_task.output)
print(wellbeing_task)
print(wellbeing_task.output)

Here, greeting_task.output is passed as greeting_message to the second component, demonstrating how data flows between pipeline steps.

A common mistake: passing PipelineTask instead of .output

When wiring components, be sure to pass the PipelineTask.output attribute — not the PipelineTask object itself. Passing the task object will fail because the component expects a built‑in data type, not a task object.

# Incorrect: passing a PipelineTask instead of its output — this will error
# wellbeing_task = ask_about_wellbeing(greeting_message=greeting_task)

# Correct: pass the task’s .output attribute
wellbeing_task = ask_about_wellbeing(greeting_message=greeting_task.output)

Practical tips

  • Always pass .output for dependencies: when wiring components, make sure to pass the predecessor task’s .output.
  • Test components individually: validate each component before integrating, to catch issues early (see the sketch after this list).
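
Because @dsl.component wraps a plain Python function, a lightweight way to test a component in isolation is to call that function directly. Recent KFP 2.x releases expose the wrapped function through the component's python_func attribute (treat this as an SDK detail that may vary by version):

# Call the underlying Python functions directly, bypassing orchestration
# (python_func is the original function wrapped by @dsl.component)
assert greet_person.python_func(name="Erwin") == "Hello, Erwin!"
assert ask_about_wellbeing.python_func(greeting_message="Hello, Erwin!") == "Hello, Erwin!. How are you?"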

Mastering component wiring in Kubeflow Pipelines lets you construct modular, readable, and flexible ML workflows. It also improves collaboration and encourages reuse across projects, accelerating development.

Building and understanding pipelines in Kubeflow

Kubeflow Pipelines orchestrate complex workflows. A pipeline links multiple components, letting data flow from one to another to form an end‑to‑end process. Here’s how to define a simple pipeline using the components above.

Defining a pipeline

We’ll create a pipeline that chains greet_person and ask_about_wellbeing. It accepts a name, uses it to greet the person, then asks a follow‑up. This shows how to define a pipeline and handle component outputs correctly.

# Import DSL to define pipelines
from kfp import dsl

# Define a pipeline that orchestrates the greeting and follow‑up components
@dsl.pipeline
def hello_and_wellbeing_pipeline(recipient_name: str) -> str:
    # Task for the greet_person component
    greeting_task = greet_person(name=recipient_name)

    # Task for ask_about_wellbeing, using greeting_task’s output
    wellbeing_task = ask_about_wellbeing(greeting_message=greeting_task.output)

    # Return the final message produced by wellbeing_task
    return wellbeing_task.output

The recipient_name parameter is passed to greet_person. Its output (greeting_task.output) becomes the input to ask_about_wellbeing. The pipeline returns wellbeing_task.output, illustrating dataflow through the pipeline.

Executing and handling output

When you “run” the pipeline definition in code, you might expect the final string directly (for example, "Hello, Erwin. How are you?"). But because of how Kubeflow Pipelines work, the pipeline function itself returns a PipelineTask, not raw output data.

# Run the pipeline with a recipient name
pipeline_output = hello_and_wellbeing_pipeline(recipient_name="Erwin")
print(pipeline_output)

This highlights a key point: a pipeline function describes a workflow; actual execution happens in the Kubeflow Pipelines environment, where data is passed between components and outputs are handled according to the pipeline graph.

Error handling: wrong return types

If you try to return a PipelineTask itself rather than its .output, the pipeline will fail. The pipeline’s return must be the data type produced by the final component, matching expected outputs.

# Incorrect pipeline that returns a PipelineTask object
@dsl.pipeline
def hello_and_wellbeing_pipeline_with_error(recipient_name: str) -> str:
    greeting_task = greet_person(name=recipient_name)
    wellbeing_task = ask_about_wellbeing(greeting_message=greeting_task.output)

    # Incorrect: returning the PipelineTask itself; this will error
    return wellbeing_task

Practical tips

  • Return types: ensure the pipeline’s return type matches the data type produced by its final component. This is critical for correct execution and output handling.
  • Pipeline execution: calling the pipeline definition in a script or notebook prepares the workflow. Actual execution happens in Kubeflow Pipelines, where the infrastructure runs the pipeline.

This example shows how to define a simple yet effective pipeline in Kubeflow. It underscores the importance of understanding component outputs, dataflow, and Kubeflow’s orchestration features. These concepts are foundational for building scalable, reliable ML workflows.

Implementing and Running a Kubeflow Pipeline

Implementing a Kubeflow Pipeline involves key steps: define components, orchestrate them into a pipeline, compile the pipeline to an executable format, and finally run it in a suitable environment. We illustrate these using hello_and_wellbeing_pipeline.

Compile the pipeline

Kubeflow Pipelines use YAML for the executable specification. Compilation converts the Python definition into a static configuration describing the pipeline DAG, components, and dataflow.

# Import the compiler from the Kubeflow Pipelines SDK
from kfp import compiler

# Compile the pipeline to a YAML file
compiler.Compiler().compile(hello_and_wellbeing_pipeline, 'pipeline.yaml')

This generates pipeline.yaml, a compiled representation of the pipeline. That YAML is what you deploy to the runtime.

Inspect the compiled pipeline

Viewing the YAML helps understand how the structure is captured. Optional but useful for learning and debugging.

# Inspect the compiled pipeline YAML
!cat pipeline.yaml
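
Outside a notebook, where the ! shell syntax is unavailable, a short Python snippet can do the same; this sketch assumes the PyYAML package is installed:

# Load the compiled specification and list its top-level sections
import yaml

with open("pipeline.yaml") as f:
    pipeline_spec = yaml.safe_load(f)

print(list(pipeline_spec.keys()))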

Run the pipeline

Use Vertex AI Pipelines (a managed, serverless environment on Google Cloud) to run the compiled pipeline without managing infrastructure.

First, define pipeline arguments — inputs that parameterize runs:

# Define pipeline arguments
pipeline_arguments = {
    "recipient_name": "World!",
}

Then use google.cloud.aiplatform.PipelineJob to configure and submit the run:

from google.cloud.aiplatform import PipelineJob

# Configure the run: the compiled template, a display name for the run,
# the pipeline arguments, the region, and the root path for pipeline artifacts
job = PipelineJob(
    template_path="pipeline.yaml",
    display_name="hello_and_wellbeing_ai_pipeline",
    parameter_values=pipeline_arguments,
    location="us-central1",
    pipeline_root="./",
)

# Submit the run and print its current state
job.submit()
print(job.state)

Note: due to environment constraints, we don’t execute this submission here; run it in your own Google Cloud project.
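
When you do run it yourself, the Vertex AI client needs project and region context before the PipelineJob is constructed and submitted. A minimal sketch, with the project ID shown as a placeholder and assuming application-default credentials are configured:

from google.cloud import aiplatform

# Initialize the Vertex AI client with your project and region (placeholder values)
aiplatform.init(project="your-project-id", location="us-central1")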

Summary

We covered implementing a Kubeflow Pipeline: defining components and a pipeline, compiling it to a deployable format, and running it in a managed environment. With these steps, you can automate and scale ML workflows effectively.

Automating and Orchestrating a Fine‑Tuning Pipeline with Kubeflow

As a practical example, automate and orchestrate a parameter‑efficient fine‑tuning (PEFT) pipeline for Google’s PaLM 2 using Kubeflow Pipelines. Reusing existing pipelines significantly reduces development time and preserves best practices.

Reusing existing pipelines for efficiency

Reusing a provided pipeline accelerates experimentation and deployment, especially with large models. Here we focus on Google’s PEFT pipeline for PaLM 2, which lets us fine‑tune a base model on our dataset without starting from scratch.

Data preparation and model versioning

The training and evaluation data are provided as two JSONL files; keeping their filenames fixed (without timestamps) ensures all collaborators work from the same data version. The model, by contrast, is versioned by appending the current date and time to its display name.

# Paths to the training and evaluation datasets (JSONL format)
TRAINING_DATA_URI = "./tune_data_stack_overflow_python_qa.jsonl"
EVALUATION_DATA_URI = "./tune_eval_data_stack_overflow_python_qa.jsonl"

import datetime

# Version the model name with the current date and time (hour:day:month:year)
date = datetime.datetime.now().strftime("%H:%d:%m:%Y")
MODEL_NAME = f"deep-learning-ai-model-{date}"

Set the core hyperparameters: the number of tuning steps and how often evaluation runs:

# Number of tuning steps
TRAINING_STEPS = 200

# Run evaluation every 20 steps
EVALUATION_INTERVAL = 20

Authenticate and set project context (example helper):

# authenticate() is a project-specific helper that returns credentials and the project ID
from utils import authenticate
credentials, PROJECT_ID = authenticate()

# Region where the pipeline will run
REGION = "us-central1"

Define pipeline arguments:

pipeline_arguments = {
    "model_display_name": MODEL_NAME,
    "location": REGION,
    "large_model_reference": "text-bison@001",
    "project": PROJECT_ID,
    "train_steps": TRAINING_STEPS,
    "dataset_uri": TRAINING_DATA_URI,
    "evaluation_interval": EVALUATION_INTERVAL,
    "evaluation_data_uri": EVALUATION_DATA_URI,
}

Submit the job via PipelineJob (enable caching to reuse unchanged step outputs):

from google.cloud.aiplatform import PipelineJob

# Root path where pipeline artifacts will be stored
pipeline_root = "./"

# template_path must point to the reusable PEFT tuning pipeline specification
# (for example, a compiled YAML file or an Artifact Registry URI for the provided template)
template_path = "tune_large_model_pipeline.yaml"  # placeholder: substitute the actual template location

job = PipelineJob(
    template_path=template_path,
    display_name=f"deep_learning_ai_pipeline-{date}",
    parameter_values=pipeline_arguments,
    location=REGION,
    pipeline_root=pipeline_root,
    enable_caching=True,
)

job.submit()
print(job.state)
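
If you prefer the script to block until the run finishes rather than returning right after submission, recent versions of the google-cloud-aiplatform SDK also provide a wait() method on the job object; a small optional sketch:

# Optionally block until the pipeline run reaches a terminal state,
# then print the final state
job.wait()
print(job.state)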

Conclusion

This example illustrates automating and orchestrating a fine‑tuning pipeline for a base model with Kubeflow Pipelines. By reusing an existing pipeline, specifying key parameters, and executing in a managed environment, you can efficiently fine‑tune large models like PaLM 2 on specific datasets. This approach accelerates development and embeds MLOps best practices such as versioning, reproducibility, and efficient resource use.

Theory Questions

  1. Describe the role of Kubeflow Pipelines in automating ML workflows and ensuring reproducibility.
  2. Explain the functions of the dsl and compiler modules in the SDK.
  3. How can you manage FutureWarning output while keeping logs readable and without missing important changes?
  4. Why do clear interfaces and reuse improve the modularity and efficiency of components?
  5. What is the purpose of the @dsl.component decorator?
  6. What does the PipelineTask object represent when a component is called, and why is it useful?
  7. How do you pass one component’s output as another’s input?
  8. Why do components accept only named arguments?
  9. How do you wire components together, and what is the role of the .output attribute?
  10. How is a pipeline defined, and what should you watch for to return the correct value?
  11. What are the steps for compiling, inspecting, and running a pipeline, and what is the role of YAML?
  12. How does reusing existing pipelines (e.g., PEFT for PaLM 2) speed up work and preserve best practices?
  13. Why should you version data and models in MLOps? Give an example of a version identifier.
  14. How do you specify pipeline arguments for model fine‑tuning?
  15. What are the pros and cons of automating and orchestrating complex workflows in Kubeflow for large models?

Practical Tasks

  1. Import dsl and compiler from the Kubeflow SDK and suppress FutureWarning from kfp.*.
  2. Define a component add_numbers(a: int, b: int) -> int with @dsl.component.
  3. Suppress DeprecationWarning from any modules (via warnings).
  4. Create two components: one returns a number, the other doubles it; wire them in a pipeline.
  5. Compile a simple pipeline to YAML using compiler.
  6. Show how calling a component returns a PipelineTask and how to access .output.
  7. Demonstrate the error from returning a PipelineTask from a pipeline function, then fix it with comments.
  8. Write a JSON‑to‑JSON preprocessing script (filter/map) that mimics a preprocessing component.
  9. Add a function for versioning: append current date/time to a base model name.
  10. Provide arguments and submit the compiled YAML to a runtime (pseudo‑API).