
3.2 Mastering LLM Workflows with Kubeflow Pipelines

In this section, we will delve into how to orchestrate and automate machine learning workflows using Kubeflow Pipelines. Kubeflow Pipelines is an open-source framework designed to simplify the construction and management of machine learning pipelines. This powerful tool enables data scientists, ML engineers, and developers to define, deploy, and manage complex workflows efficiently.

Before we begin, it's important to understand the significance of automating workflows in machine learning projects. Automation not only saves time but also ensures consistency and repeatability in experiments, which are crucial for developing robust machine learning models.

Setting Up Kubeflow Pipelines

First, we need to import the necessary modules from the Kubeflow Pipelines SDK. The SDK provides us with the building blocks needed to define our pipeline.

# Importing the Domain-Specific Language (DSL) and compiler modules from Kubeflow Pipelines SDK
from kfp import dsl
from kfp import compiler
  • The dsl module contains the decorators and classes for defining the components and structure of the pipeline.
  • The compiler module is used to compile the pipeline into a format that can be executed by the Kubeflow Pipelines engine.

Handling Warnings

Machine learning projects often involve using libraries and frameworks that are under active development. This means you may occasionally encounter warnings about future changes or deprecations in the tools you're using. While it's important to be aware of these warnings, they can clutter your output and distract from more immediate concerns. For the purposes of our guidebook, we'll demonstrate how to suppress specific types of warnings, particularly those related to future changes in Kubeflow Pipelines.

# Suppressing FutureWarnings that originate from the Kubeflow Pipelines SDK
import warnings
warnings.filterwarnings("ignore", category=FutureWarning, module='kfp.*')
  • We import the warnings module, which provides functions to manage warnings issued by Python programs.
  • The warnings.filterwarnings function is used to ignore specific categories of warnings. In this case, we're ignoring FutureWarning messages that originate from modules matching the pattern kfp.*. This keeps the output cleaner and focuses attention on warnings that are more relevant to our immediate development tasks.

Practical Tips

  • Stay Informed: While suppressing warnings can be useful for readability, it's important to periodically check the documentation or release notes of the Kubeflow Pipelines project. This ensures you're aware of any significant changes that may affect your pipelines.
  • Selective Suppression: Apply warning suppression selectively. Suppressing all warnings indiscriminately can hide important issues that need attention. Focus on warnings that you've evaluated and determined to be non-critical for your current project phase (see the sketch below for one way to keep the suppression narrowly scoped).
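
For example, here is a minimal sketch of keeping the suppression narrowly scoped with a context manager, so that only FutureWarning messages from kfp modules are hidden, and only around the noisy imports:

# A minimal sketch: suppress FutureWarning from kfp only while importing,
# so warnings raised elsewhere in your code still surface.
import warnings

with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=FutureWarning, module='kfp.*')
    from kfp import dsl, compiler  # noisy imports happen inside the suppressed scope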

Further Reading

For those interested in exploring more about Kubeflow Pipelines and how it fits into the broader landscape of machine learning operations (MLOps), the official Kubeflow Pipelines documentation and the Vertex AI Pipelines documentation are good starting points.

By understanding and utilizing Kubeflow Pipelines, you can significantly enhance the efficiency and reliability of your machine learning workflows.

Understanding Components and Pipelines in Kubeflow

Kubeflow Pipelines is a versatile framework that structures machine learning workflows into manageable, reusable components and pipelines. This structure is fundamental for building scalable and maintainable machine learning systems. Let's break down these key concepts:

  • Components: These are self-contained sets of code that perform specific tasks within your machine learning workflow. Each component can be thought of as a building block that does one part of the job. For example, one component might preprocess your data, another could train a model, and yet another might deploy the model to a production environment.
  • Pipelines: A pipeline is a collection of components arranged in a specific order to form an end-to-end machine learning workflow. The pipeline defines how the outputs of one component can be used as inputs to another, facilitating a seamless flow of data through the entire process.

Simple Pipeline Example: Creating a "Say Hello" Component

To illustrate how components work within Kubeflow Pipelines, let's start with a simple example. We'll create a component that takes a name as input and returns a greeting message. This example, while basic, demonstrates how to define a component using the Kubeflow Pipelines SDK.

# Importing the DSL module from Kubeflow Pipelines to define components and pipelines
from kfp import dsl

# Defining a simple component using the @dsl.component decorator
@dsl.component
def greet_person(name: str) -> str:
    # Constructing a greeting message by combining the string "Hello" with the input name
    greeting_message = f'Hello, {name}!'

    # The component returns the constructed greeting message
    return greeting_message
  • The @dsl.component decorator is used to define a function as a pipeline component. This tells Kubeflow Pipelines to treat this function as a self-contained step in a pipeline.
  • The function greet_person takes a single argument, name, which it expects to be a string. It uses this name to construct a greeting message.
  • The function then returns this greeting message, also as a string. In a more complex pipeline, this output could be passed to other components for further processing.

Practical Tips

  • Define Clear Component Interfaces: When designing components, clearly define the inputs and outputs. This clarity is crucial for integrating the component into pipelines and ensuring compatibility with other components.
  • Reusability: Design components with reusability in mind. A well-designed component can be a valuable asset in multiple pipelines or different projects.

Further Reading

To deepen your understanding of Kubeflow Pipelines and how to construct more complex workflows, explore the component and pipeline authoring guides in the official Kubeflow Pipelines documentation.

This simple component example lays the groundwork for understanding how tasks in machine learning workflows can be encapsulated and automated using Kubeflow Pipelines. As we progress, we'll explore how to chain multiple components together to form comprehensive pipelines capable of handling sophisticated machine learning tasks.

Working with Kubeflow Pipeline Components: Outputs and Task Objects

After defining a Kubeflow Pipeline component with the @dsl.component decorator, it's crucial to understand how the function behaves within the context of a pipeline. Unlike typical Python functions that return specific data types directly, a function decorated with @dsl.component behaves differently when called within a pipeline.

Understanding PipelineTask Objects

When you invoke a function defined as a Kubeflow Pipeline component, instead of returning the expected data directly (like a string or integer), it returns an instance of a PipelineTask object. This object represents the execution of the component within a pipeline and can be used to pass data to subsequent components.

# Assigning the result of the component function to a variable
hello_task = greet_person(name="Erwin")
print(hello_task)

In this code snippet, greet_person(name="Erwin") doesn't return a greeting message directly. Instead, it returns a PipelineTask object, which we assign to hello_task. Printing hello_task will not show the greeting message, but rather information about the PipelineTask instance.

Accessing Output Data with PipelineTask.output

To utilize the output of a component within a pipeline, we access the .output attribute of the PipelineTask object. This attribute allows the output data of one component to be passed as input to another, facilitating data flow through the pipeline.

# Accessing the output of the component
print(hello_task.output)

The .output attribute of a PipelineTask refers to output data of one of the built-in parameter types that the Kubeflow Pipelines framework recognizes: String, Integer, Float, Boolean, List, or Dict. This ensures compatibility and straightforward data exchange between components in the pipeline.
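
As an illustrative sketch (the component below is hypothetical, not part of the KFP SDK), components can also declare richer built-in types for their inputs and outputs:

# Hypothetical example: a component that consumes a list of scores
# and returns a dictionary of summary statistics.
from typing import Dict, List

from kfp import dsl

@dsl.component
def summarize_scores(scores: List[float]) -> Dict[str, float]:
    # Compute simple summary statistics from the input list
    return {
        "count": float(len(scores)),
        "mean": sum(scores) / len(scores),
    }

Within a pipeline, summarize_scores(scores=[0.8, 0.9]).output could then be passed to a downstream component, just like the string output in the example above.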

Keyword Arguments in Component Functions

It's important to note that when passing values to a component function, you must use keyword arguments. This requirement ensures clarity and prevents errors that could arise from positional arguments, especially in complex pipelines where components have multiple inputs.

# This will result in an error
# hello_task = greet_person("Erwin")

# Correct way to call the component function, using keyword arguments
hello_task = greet_person(name="Erwin")

Attempting to call a component function with positional arguments will result in an error. This design encourages explicit specification of parameters, enhancing code readability and reducing the likelihood of mistakes.

Practical Tips

  • Explicit Argument Naming: Always use keyword arguments when calling component functions. This practice improves code clarity and ensures that your pipeline definitions are easy to read and maintain.
  • Managing Outputs: Remember that component outputs are accessed via the PipelineTask.output attribute. Plan your pipeline's data flow carefully, considering how data will be passed between components.

By understanding these aspects of Kubeflow Pipeline components, you're better equipped to build complex and efficient machine learning workflows. These principles ensure that your pipelines are robust, maintainable, and scalable.

Chaining Components in Kubeflow Pipelines: Passing Outputs

Building upon our understanding of Kubeflow Pipeline components, we'll now explore how to create a pipeline where one component's output serves as input to another. This process exemplifies the power of Kubeflow Pipelines in orchestrating complex workflows.

Defining a Dependent Component

Let's define a second component that takes the output of the first component (our greeting message) and appends a question to it, asking how the person is doing. This example illustrates how to define components that depend on the output of preceding components in a pipeline.

# Importing the DSL module from Kubeflow Pipelines to define components
from kfp import dsl

# Defining a component that depends on the output of another component
@dsl.component
def ask_about_wellbeing(greeting_message: str) -> str:
    # Constructing a new message that includes the greeting and a follow-up question
    follow_up_message = f"{greeting_message}. How are you?"

    # The component returns the new message
    return follow_up_message

Passing Component Outputs

After defining the second component, we demonstrate how to pass the output of the first component (greet_person) as an input to the second component (ask_about_wellbeing). This step is crucial for chaining components together in a workflow.

# Creating a task for the first component and storing its output
greeting_task = greet_person(name="Erwin")

# Passing the output of the first component to the second component
wellbeing_task = ask_about_wellbeing(greeting_message=greeting_task.output)
print(wellbeing_task)
print(wellbeing_task.output)

In this code, greeting_task.output is passed as the greeting_message input to ask_about_wellbeing. This showcases how data flows from one component to another in a Kubeflow Pipeline.

Common Mistakes: Passing the Correct Object Type

It's important to pass the .output attribute of a PipelineTask object rather than the PipelineTask object itself. Passing the wrong type will lead to errors, as the component expects a built-in data type, not a PipelineTask.

# Incorrect usage: passing the PipelineTask object instead of its output
# This will result in an error
# wellbeing_task = ask_about_wellbeing(greeting_message=greeting_task)

# Correct usage: passing the output of the PipelineTask object
wellbeing_task = ask_about_wellbeing(greeting_message=greeting_task.output)

Practical Tips

  • Always Pass .output for Dependency: When connecting components, ensure you're passing the .output attribute of the preceding component's task object. This mistake is common but easy to avoid with careful code review.
  • Test Components Individually: Before integrating components into a larger pipeline, test them individually to ensure they work as expected. This approach helps identify and fix issues early in the development process (one lightweight way to do this is sketched below).
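
For a quick local sanity check of a component's logic, one option (a sketch; it assumes your KFP v2 SDK version exposes the wrapped Python function on the component object as .python_func) is to call the underlying function directly:

# Quick local check of the component's underlying Python function,
# without compiling or running anything on a Kubeflow cluster.
# Assumes the KFP v2 SDK exposes the original function as `.python_func`.
raw_greet = greet_person.python_func
assert raw_greet(name="Erwin") == "Hello, Erwin!"
print("greet_person logic behaves as expected")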

By mastering the chaining of components in Kubeflow Pipelines, you can construct sophisticated machine learning workflows that are modular, easy to read, and flexible. This methodology not only enhances collaboration among team members but also facilitates the reuse of components across different projects, significantly speeding up the development process.

Constructing and Understanding Pipelines in Kubeflow

The essence of Kubeflow Pipelines lies in its ability to orchestrate complex workflows. A pipeline in Kubeflow is a high-level structure that connects multiple components, allowing data to flow from one component to another and creating an end-to-end workflow. This section demonstrates how to define a simple pipeline that utilizes our previously defined components.

Defining a Pipeline

We'll create a pipeline that strings together the greet_person and ask_about_wellbeing components. The pipeline takes a name, uses it to greet the person, and then asks how they are doing. This example showcases how to define a pipeline and the correct way to handle component outputs within it.

# Importing the DSL module to define pipelines
from kfp import dsl

# Defining a pipeline that orchestrates our greeting and follow-up components
@dsl.pipeline
def hello_and_wellbeing_pipeline(recipient_name: str) -> str:
    # Creating a task for the greet_person component
    greeting_task = greet_person(name=recipient_name)

    # Creating a task for the ask_about_wellbeing component, using the output of the greeting_task
    wellbeing_task = ask_about_wellbeing(greeting_message=greeting_task.output)

    # Correctly returning the output of the wellbeing_task, which is the final message
    return wellbeing_task.output

In this pipeline, the recipient_name parameter is passed directly to the greet_person component. The output of greet_person (greeting_task.output) is then passed as an input to the ask_about_wellbeing component. The pipeline returns the output of the wellbeing_task, demonstrating how data flows through the pipeline.

Execution and Output Handling

When you run the pipeline, you might expect to directly receive the final string output ("Hello, Erwin!. How are you?", given how the two components format their messages). However, due to the nature of Kubeflow Pipelines, calling the pipeline function returns a PipelineTask object, not the raw output data.

# Executing the pipeline with a specified recipient name
pipeline_output = hello_and_wellbeing_pipeline(recipient_name="Erwin")
print(pipeline_output)

This behavior underscores a key point: in Kubeflow Pipelines, the execution of a pipeline defines a workflow. The actual running of this workflow happens within a Kubeflow Pipelines environment, where the data flows between components as specified, and outputs are handled according to the pipeline's structure.

Error Handling: Incorrect Return Types

Attempting to return a PipelineTask object directly from a pipeline, rather than its .output, results in an error. The pipeline's return value must match its declared return type, which is the type of data produced by the final component.

# Attempting to define a pipeline that incorrectly returns a PipelineTask object
@dsl.pipeline
def hello_and_wellbeing_pipeline_with_error(recipient_name: str) -> str:
    greeting_task = greet_person(name=recipient_name)
    wellbeing_task = ask_about_wellbeing(greeting_message=greeting_task.output)

    # Incorrectly returning the PipelineTask object itself
    # (this will result in an error)
    return wellbeing_task

Practical Tips

  • Return Types: Ensure that the return type of a pipeline matches the type of data produced by its final component. This is crucial for the correct execution and output handling of the pipeline.
  • Pipeline Execution: Remember that executing a pipeline definition in a script or notebook prepares the workflow. Actual execution occurs within the Kubeflow Pipelines environment, where the infrastructure to run the pipeline is available.

Through this example, you've seen how to define a simple yet effective pipeline in Kubeflow. This process highlights the importance of understanding component outputs, the flow of data, and the orchestration capabilities of Kubeflow Pipelines. These concepts are foundational for building scalable and efficient machine learning workflows.

Implementing and Executing a Kubeflow Pipeline

Implementing a Kubeflow Pipeline involves several key steps: defining the pipeline components, orchestrating these components into a pipeline, compiling the pipeline into an executable format, and finally, executing the pipeline in a suitable environment. Here, we will focus on these steps using the hello_and_wellbeing_pipeline as our example.

Compiling the Pipeline

To deploy and run our pipeline, we must first compile it into a format that the execution environment understands. Kubeflow Pipelines uses YAML files for this purpose. The compilation process translates our Python-defined pipeline into a static configuration that outlines the pipeline's structure, components, and data flow.

# Importing the compiler module from Kubeflow Pipelines SDK
from kfp import compiler

# Compiling the pipeline into a YAML file
compiler.Compiler().compile(hello_and_wellbeing_pipeline, 'pipeline.yaml')

This step generates a file named pipeline.yaml, which contains the compiled version of our pipeline. This YAML file is what we will deploy to our execution environment.

Viewing the Compiled Pipeline

After compiling the pipeline, you might want to inspect the YAML file to understand how the pipeline is structured in this format. While this step is optional, it can provide valuable insights into the compilation process and the resulting pipeline configuration.

# Viewing the contents of the compiled pipeline YAML file
!cat pipeline.yaml

This command prints the contents of pipeline.yaml to your screen, allowing you to review the compiled pipeline's structure, components, and configurations.
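
As an alternative to printing the raw file, here is a small sketch (assuming the PyYAML package is available) that loads the compiled specification in Python and lists its top-level sections:

# Loading the compiled pipeline specification to inspect its structure.
# Assumes the PyYAML package is installed in the environment.
import yaml

with open("pipeline.yaml") as f:
    pipeline_spec = yaml.safe_load(f)

# Print the top-level keys of the compiled specification
# (component definitions, the pipeline's root DAG, metadata, and so on).
print(list(pipeline_spec.keys()))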

Executing the Pipeline

To execute the compiled pipeline, we'll use Vertex AI Pipelines, a managed, serverless environment provided by Google Cloud. This environment allows you to run your pipeline without worrying about the underlying infrastructure.

First, we need to define the pipeline arguments. These arguments represent the inputs to our pipeline, allowing us to customize its behavior for different runs.

# Defining the pipeline arguments
pipeline_arguments = {
    "recipient_name": "World!",
}

Next, we use the PipelineJob class from the google.cloud.aiplatform module to configure and submit our pipeline for execution.

# Importing PipelineJob from the Google Cloud AI Platform SDK
from google.cloud.aiplatform import PipelineJob

# Configuring the pipeline job
job = PipelineJob(
        # Path to the compiled pipeline YAML file
        template_path="pipeline.yaml",
        # Display name for the pipeline job
        display_name="hello_and_wellbeing_ai_pipeline",
        # Pipeline arguments
        parameter_values=pipeline_arguments,
        # Region where the pipeline will be executed
        location="us-central1",
        # Root location for the pipeline's artifacts and outputs
        # (typically a Cloud Storage URI in a real deployment)
        pipeline_root="./",
)

# Submitting the pipeline job for execution
job.submit()

# Checking the status of the pipeline job
print(job.state)

This script configures a pipeline job with our compiled pipeline.yaml, sets the display name, specifies the input parameters, and defines the execution region and temporary file storage location. The job.submit() method submits the pipeline for execution in Vertex AI Pipelines.

Note on Execution

Due to restrictions in some notebook or classroom environments, the actual execution of this pipeline in Vertex AI Pipelines cannot be demonstrated here. However, by running the provided code in your own Google Cloud environment, you can deploy and execute the pipeline.
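
If you do run it in your own project, a minimal monitoring sketch (assuming the google-cloud-aiplatform SDK, whose PipelineJob objects expose a blocking wait() method) looks like this:

# Minimal monitoring sketch, to be run in your own Google Cloud project.
# Assumes `job` is the PipelineJob submitted above.
job.wait()        # blocks until the pipeline run completes or fails
print(job.state)  # inspect the final state of the run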

Summary

This guide has walked you through the process of implementing a Kubeflow Pipeline, from defining components and orchestrating them into a pipeline, to compiling the pipeline into a deployable format, and finally, executing the pipeline in a managed environment. By mastering these steps, you can leverage Kubeflow Pipelines to automate and scale your machine learning workflows efficiently.

Automating and Orchestrating a Supervised Tuning Pipeline with Kubeflow

In this example, we explore how to automate and orchestrate a Supervised Tuning Pipeline for a foundation model called PaLM 2 by Google, using Kubeflow Pipelines. This approach emphasizes the practicality of reusing existing pipelines to accelerate the development and deployment of machine learning models, especially when dealing with large, complex models like PaLM 2.

Reusing Existing Pipelines for Efficiency

Reusing an existing pipeline significantly reduces the development time and effort by leveraging pre-built workflows. This not only speeds up the experimentation process but also ensures that best practices embedded in the original pipeline are maintained. In this scenario, we focus on a Parameter-Efficient Fine-Tuning (PEFT) pipeline for PaLM 2, provided by Google. This allows us to fine-tune the model on our specific dataset without starting from scratch.

Preparing the Data and Model Versioning

For fine-tuning, we use two JSONL files that contain the training and evaluation data, respectively. Timestamps have been removed from the filenames so that everyone works with the same, consistent files, which also simplifies the data preparation process.

TRAINING_DATA_URI = "./tune_data_stack_overflow_python_qa.jsonl"
EVALUATION_DATA_URI = "./tune_eval_data_stack_overflow_python_qa.jsonl"

Versioning the model is a critical practice in machine learning operations (MLOps), enabling reproducibility, auditing, and rollbacks. In this example, we append the current date and time to the model name to create a unique version identifier.

import datetime

# Timestamp formatted as hour:day:month:year, used to make the model name unique
date = datetime.datetime.now().strftime("%H:%d:%m:%Y")
MODEL_NAME = f"deep-learning-ai-model-{date}"

Configuring the Pipeline

We then specify key parameters for the PaLM model tuning:

  • TRAINING_STEPS: Defines the number of training steps. For extractive QA, a range of 100-500 is recommended.
  • EVALUATION_INTERVAL: Sets how frequently the model is evaluated during training. The default is every 20 steps.

TRAINING_STEPS = 200
EVALUATION_INTERVAL = 20

Authentication and project setup are necessary steps to access Google Cloud resources and services. The authenticate function from a utility script provides the credentials and project ID needed to configure the pipeline execution environment.

from utils import authenticate
credentials, PROJECT_ID = authenticate()
REGION = "us-central1"

Defining Pipeline Arguments

The next step involves defining the pipeline arguments. These arguments specify the inputs and configurations for the pipeline, tailoring the fine-tuning process to our specific requirements.

pipeline_arguments = {
    "model_display_name": MODEL_NAME,
    "location": REGION,
    "large_model_reference": "text-bison@001",
    "project": PROJECT_ID,
    "train_steps": TRAINING_STEPS,
    "dataset_uri": TRAINING_DATA_URI,
    "evaluation_interval": EVALUATION_INTERVAL,
    "evaluation_data_uri": EVALUATION_DATA_URI,
}

Executing the Pipeline

Finally, we set up and submit the pipeline job for execution using the PipelineJob class. This step includes specifying the path to the reusable pipeline YAML file, the display name for the job, pipeline arguments, execution region, and pipeline root for temporary files. Enabling caching optimizes the execution by reusing outputs of components that have not changed.

pipeline_root = "./"

job = PipelineJob(
        template_path=template_path,
        display_name=f"deep_learning_ai_pipeline-{date}",
        parameter_values=pipeline_arguments,
        location=REGION,
        pipeline_root=pipeline_root,
        enable_caching=True,
)

job.submit()
print(job.state)

Conclusion

This example illustrates the process of automating and orchestrating a Supervised Tuning Pipeline for fine-tuning a foundation model using Kubeflow Pipelines. By reusing an existing pipeline, specifying key parameters, and executing the pipeline in a managed environment, we can efficiently fine-tune large models like PaLM 2 on specific datasets. This approach not only accelerates the model development process but also incorporates best practices for MLOps, including versioning, reproducibility, and efficient resource utilization.

Theory questions:

  1. Explain the role of Kubeflow Pipelines in automating machine learning workflows and its significance in ensuring consistency and repeatability in experiments.
  2. Describe the function of the dsl and compiler modules within the Kubeflow Pipelines SDK.
  3. How can future warnings related to Kubeflow Pipelines be managed to maintain output readability without overlooking important updates?
  4. Why is it important to design machine learning pipeline components with clear interfaces and reusability in mind?
  5. Explain the purpose of the @dsl.component decorator in defining a Kubeflow Pipeline component.
  6. Describe the behavior and utility of the PipelineTask object when a function decorated with @dsl.component is called within a pipeline.
  7. How can outputs from one Kubeflow Pipeline component be passed as inputs to another component?
  8. Discuss the significance of using keyword arguments when invoking Kubeflow Pipeline component functions.
  9. Outline the process of chaining components in Kubeflow Pipelines and the importance of passing the .output attribute for data flow.
  10. How is a pipeline in Kubeflow defined, and what are the key considerations for ensuring its correct execution and output handling?
  11. Describe the steps involved in compiling, viewing, and executing a Kubeflow Pipeline, including the role of YAML files in the compilation process.
  12. Explain how reusing existing pipelines, like the Supervised Tuning Pipeline for PaLM 2, can enhance efficiency and best practices in machine learning projects.
  13. Discuss the importance of data and model versioning in machine learning operations (MLOps) and provide an example of creating a unique model version identifier.
  14. How are pipeline arguments defined and used in the configuration of a Kubeflow Pipeline for model tuning?
  15. Reflect on the benefits and challenges of automating and orchestrating complex machine learning workflows using Kubeflow Pipelines, especially in the context of fine-tuning large models like PaLM 2.

Practice questions:

  1. Setup Kubeflow Pipelines SDK: Write a Python script to import the necessary modules (dsl and compiler) from the Kubeflow Pipelines SDK. Suppress FutureWarning warnings originating from the Kubeflow Pipelines SDK using the warnings module.

  2. Define a Simple Pipeline Component: Using the Kubeflow Pipelines SDK, define a simple component named add_numbers that takes two integers as input and returns their sum. Use the @dsl.component decorator to define this component.

  3. Suppress Specific Warnings: Modify the Python script provided for suppressing warnings so that it suppresses warnings of the category DeprecationWarning originating from any module. Use the warnings module for this task.

  4. Chain Components in a Pipeline: Create two Kubeflow Pipeline components where the first component generates a number (e.g., returns a fixed integer), and the second component receives this number and doubles it. Define a pipeline that chains these two components, demonstrating how to pass outputs from one component to another.

  5. Compile and Prepare a Pipeline for Execution: Given a simple pipeline definition (like the one from the previous task), write a Python script to compile this pipeline into a YAML file using Kubeflow Pipeline's compiler module.

  6. Handling PipelineTask Objects: Write a Python function that demonstrates how to call a Kubeflow Pipeline component, capture its return value (a PipelineTask object), and access its output. This function does not need to be part of a pipeline; it should simply illustrate the concept with a theoretical component.

  7. Error Handling in Pipeline Definitions: Write a Python script that tries to define a Kubeflow Pipeline in an incorrect manner (for example, by attempting to return a PipelineTask object directly from the pipeline function) and then corrects the mistake. Include comments explaining why the initial attempt would fail and how the correction fixes the issue.

  8. Automating Data Preparation for Model Training: Create a Python script that simulates the process of preparing data for a machine learning model. The script should read data from a JSON file, perform a simple transformation (e.g., filtering or mapping), and save the result to another JSON file. This task mimics the kind of component that might be used in a data preprocessing step in a Kubeflow Pipeline.

  9. Implementing Model Versioning in a Pipeline: Write a Python function that generates a unique model name by appending the current date and time to a base model name string. This function illustrates a practice for versioning models in machine learning operations.

  10. Parameterize and Execute a Kubeflow Pipeline: Assuming the existence of a compiled Kubeflow Pipeline YAML file, write a Python script that defines pipeline arguments (such as recipient_name for a greeting pipeline) and demonstrates how you would submit this pipeline for execution using a hypothetical execution environment's API.