3.2 Воркфлоу с Kubeflow Pipelines
Рассмотрим, как оркестрировать и автоматизировать ML‑воркфлоу с помощью Kubeflow Pipelines — открытого фреймворка, упрощающего создание, развертывание и эксплуатацию сложных цепочек шагов для дата‑сайентистов, ML‑инженеров и разработчиков. Автоматизация экономит время, обеспечивает воспроизводимость и стабильность результатов — фундамент надёжных ML‑систем. Начнём с настройки SDK и «строительных блоков» пайплайнов:
Для начала подключим необходимые модули из SDK Kubeflow Pipelines. Эти модули являются «строительными блоками» для определения пайплайнов.
# Импортируем модули предметно-ориентированного языка (DSL) и компилятора из Kubeflow Pipelines SDK
from kfp import dsl
from kfp import compiler
Здесь dsl
предоставляет декораторы и классы для описания компонентов и структуры, а compiler
компилирует пайплайн в исполняемый формат для движка Kubeflow.
Библиотеки активно развиваются, поэтому предупреждения о будущих изменениях или устаревших функциях — не редкость; чтобы не перегружать вывод при обучении или демонстрации, их иногда выборочно скрывают (при этом разумно периодически проверять релиз‑ноты):
# Подавление предупреждений FutureWarning, исходящих из Kubeflow Pipelines SDK
import warnings
warnings.filterwarnings("ignore", category=FutureWarning, module='kfp.*')
Здесь используем стандартный модуль warnings
: фильтрация FutureWarning
из kfp.*
помогает сосредоточиться на важных сообщениях.
Полезно помнить: следите за релизами Kubeflow Pipelines и скрывайте предупреждения выборочно — полное подавление способно скрыть серьёзные проблемы.
За подробностями держите под рукой документацию Kubeflow Pipelines и руководства по MLOps (например, материалы Google Cloud по непрерывной доставке и автоматизированным пайплайнам): освоив их, вы заметно повысите эффективность и надёжность ML‑воркфлоу.
Kubeflow структурирует ML‑воркфлоу в повторно используемые компоненты и пайплайны: компоненты — изолированные шаги (препроцессинг, обучение, деплой и т. п.), а пайплайн — композиция, где выходы одного шага становятся входами следующих, формируя сложный процесс.
Для ориентира начнём с простого компонента «приветствие», который принимает имя и возвращает строку; это базовая демонстрация определения компонента в SDK Kubeflow Pipelines:
# Импортируем модуль DSL из Kubeflow Pipelines для определения компонентов и пайплайнов
from kfp import dsl
# Определяем простой компонент с помощью декоратора @dsl.component
@dsl.component
def greet_person(name: str) -> str:
# Формируем приветственное сообщение, объединяя строку "Hello" с входным именем
greeting_message = f'Hello, {name}!'
# Компонент возвращает сформированное приветственное сообщение
return greeting_message
@dsl.component
помечает функцию как компонент пайплайна; greet_person
принимает аргумент name
и формирует приветствие, которое в реальном пайплайне можно передать дальше.
Старайтесь держать интерфейсы входов/выходов ясными и проектируйте компоненты так, чтобы их было удобно переиспользовать в разных пайплайнах.
Для углубления разборов полезны разделы «Создание компонентов и пайплайнов» в документации и исследования по проектированию ML‑систем. Этот простой пример закладывает основу понимания инкапсуляции задач и автоматизации; дальше мы свяжем несколько компонентов в комплексные пайплайны для сложных ML‑кейсов.
При работе с компонентами важно понимать Outputs и PipelineTask
: функция, помеченная @dsl.component
, при вызове внутри пайплайна не возвращает «готовые» данные, а отдаёт объект PipelineTask
, представляющий выполнение шага и служащий связующим звеном для передачи данных дальше.
# Присваиваем результат вызова функции компонента переменной
hello_task = greet_person(name="Erwin")
print(hello_task)
Компонент возвращает не строку, а PipelineTask
— объект, представляющий выполнение шага в пайплайне.
Доступ к данным через .output
Чтобы использовать вывод компонента внутри пайплайна, необходимо обращаться к атрибуту .output
объекта PipelineTask
. Он позволяет передать результат одного шага на вход следующему, организуя таким образом поток данных в пайплайне.
# Получаем вывод компонента через атрибут .output
print(hello_task.output)
Атрибут .output
имеет встроенный тип данных (String/Integer/Float/Boolean/List/Dict), совместимый между компонентами пайплайна.
Только именованные аргументы
Важно: все параметры компонента передаются только по именам (с использованием ключевых аргументов). Это повышает ясность кода и предотвращает потенциальные ошибки, особенно когда у компонента несколько входов.
# Этот вызов приведет к ошибке, так как используется позиционный аргумент
# hello_task = greet_person("Erwin")
# Корректный способ вызова функции компонента с использованием именованных аргументов
hello_task = greet_person(name="Erwin")
Вызов с позиционными аргументами приведет к ошибке; использование именованных параметров повышает читаемость кода и снижает вероятность ошибок.
Советы
- Имена параметров: При вызове компонентов всегда используйте только именованные аргументы.
- Выводы компонентов: Планируйте передачу данных между шагами пайплайна через
PipelineTask.output
.
Понимание этих аспектов упрощает построение сложных и эффективных ML-пайплайнов, делая их устойчивыми, поддерживаемыми и масштабируемыми.
Связывание компонентов: передача выводов
Опираясь на наше понимание компонентов Kubeflow Pipeline, мы теперь рассмотрим, как создать пайплайн, в котором вывод одного компонента служит входом для другого. Этот процесс демонстрирует мощь Kubeflow Pipelines в оркестровке сложных рабочих процессов.
Компонент с зависимостью
Определим второй компонент, который принимает вывод первого (приветствие) и добавляет уточняющий вопрос. Этот пример иллюстрирует, как один шаг пайплайна может зависеть от результата предыдущего.
# Импортируем модуль DSL из Kubeflow Pipelines для определения компонентов
from kfp import dsl
# Определяем компонент, который зависит от вывода другого компонента
@dsl.component
def ask_about_wellbeing(greeting_message: str) -> str:
# Формируем новое сообщение, включающее приветствие и уточняющий вопрос
follow_up_message = f"{greeting_message}. How are you?"
# Компонент возвращает новое сообщение
return follow_up_message
Передача выходов между компонентами
Передадим вывод первого компонента (greet_person
) на вход второму (ask_about_wellbeing
). Это ключевой шаг в связывании компонентов и организации потока данных в пайплайне.
# Создаем задачу для первого компонента и сохраняем его вывод
greeting_task = greet_person(name="Erwin")
# Передаем вывод первого компонента в качестве входа второму компоненту
wellbeing_task = ask_about_wellbeing(greeting_message=greeting_task.output)
print(wellbeing_task)
print(wellbeing_task.output)
Здесь greeting_task.output
передается как greeting_message
во второй компонент, демонстрируя, как организуется поток данных между шагами пайплайна.
Распространенная ошибка: передача PipelineTask
вместо .output
При связывании компонентов важно передавать именно атрибут PipelineTask.output
, а не сам объект PipelineTask
. Попытка передать объект PipelineTask
приведет к ошибке, так как компонент ожидает встроенный тип данных, а не объект задачи.
# Некорректное использование: передача объекта PipelineTask вместо его вывода
# Этот вызов приведет к ошибке
# wellbeing_task = ask_about_wellbeing(greeting_message=greeting_task)
# Корректное использование: передача атрибута .output объекта PipelineTask
wellbeing_task = ask_about_wellbeing(greeting_message=greeting_task.output)
Практические советы
- Всегда передавайте
.output
для зависимости: При соединении компонентов убедитесь, что вы передаете атрибут.output
объекта задачи предшествующего компонента. Это распространенная ошибка, но ее легко избежать при тщательном обзоре кода. - Тестируйте компоненты индивидуально: Перед интеграцией компонентов в больший пайплайн тестируйте их по отдельности, чтобы убедиться, что они работают как ожидается. Этот подход помогает выявлять и исправлять проблемы на раннем этапе процесса разработки.
Овладев связыванием компонентов в Kubeflow Pipelines, вы сможете конструировать сложные рабочие процессы машинного обучения, которые будут модульными, легко читаемыми и гибкими. Эта методология не только улучшает сотрудничество между членами команды, но и облегчает повторное использование компонентов в различных проектах, значительно ускоряя процесс разработки.
Конструирование и понимание пайплайнов в Kubeflow
Суть Kubeflow Pipelines заключается в его способности оркестрировать сложные рабочие процессы. Пайплайн в Kubeflow — это высокоуровневая структура, которая связывает несколько компонентов, позволяя данным передаваться от одного компонента к другому и создавая сквозной воркфлоу. В этом разделе демонстрируется, как определить простой пайплайн, использующий наши ранее определенные компоненты.
Определение пайплайна
Мы создадим пайплайн, который связывает компоненты greet_person
и ask_about_wellbeing
. Этот пайплайн принимает имя, использует его для приветствия человека, а затем задает уточняющий вопрос. Данный пример показывает, как определить пайплайна и как правильно обрабатывать выводы компонентов внутри него.
# Импортируем модуль DSL для определения пайплайнов
from kfp import dsl
# Определяем пайплайн, который оркестрирует наши компоненты приветствия и уточняющего вопроса
@dsl.pipeline
def hello_and_wellbeing_pipeline(recipient_name: str) -> str:
# Создаем задачу для компонента greet_person
greeting_task = greet_person(name=recipient_name)
# Создаем задачу для компонента ask_about_wellbeing, используя вывод greeting_task
wellbeing_task = ask_about_wellbeing(greeting_message=greeting_task.output)
# Корректно возвращаем вывод wellbeing_task, который является финальным сообщением
return wellbeing_task.output
В этом пайплайне параметр recipient_name
передается напрямую в компонент greet_person
. Вывод greet_person
(greeting_task.output
) затем передается как вход в компонент ask_about_wellbeing
. Пайплайн возвращает вывод wellbeing_task
, демонстрируя, как данные протекают через пайплайн.
Выполнение и обработка вывода
При запуске пайплайна вы можете ожидать получить финальный строковый вывод напрямую ("Hello, Erwin. How are you?"). Однако из-за природы Kubeflow Pipelines сама функция пайплайна возвращает объект PipelineTask
, а не сырые данные вывода.
# Выполняем пайплайн с указанным именем получателя
pipeline_output = hello_and_wellbeing_pipeline(recipient_name="Erwin")
print(pipeline_output)
Это поведение подчеркивает ключевой момент: в Kubeflow Pipelines определение пайплайна описывает рабочий процесс. Фактическое выполнение этого рабочего процесса происходит в среде Kubeflow Pipelines, где данные передаются между компонентами, как указано, и выводы обрабатываются согласно структуре пайплайна.
Обработка ошибок: неправильные типы возврата
Попытка вернуть объект PipelineTask
напрямую из пайплайна, а не его .output
, приведет к ошибке. Это происходит потому, что возвращаемое значение пайплайна должно быть типом данных, производимых финальным компонентом, соответствующим ожидаемым выводам.
# Попытка определить пайплайн, который некорректно возвращает объект PipelineTask
@dsl.pipeline
def hello_and_wellbeing_pipeline_with_error(recipient_name: str) -> str:
greeting_task = greet_person(name=recipient_name)
wellbeing_task = ask_about_wellbeing(greeting_message=greeting_task.output)
# Некорректный возврат самого объекта PipelineTask
return wellbeing_task
# Этот вызов приведет к ошибке
Практические советы
- Типы возвращаемых значений: Убедитесь, что тип возвращаемого значения пайплайна соответствует типу данных, производимых его финальным компонентом. Это критически важно для корректного выполнения и обработки вывода пайплайна.
- Выполнение пайплайна: Помните, что выполнение определения пайплайна в скрипте или Jupyter-ноутбуке подготавливает рабочий процесс. Фактическое выполнение происходит в среде Kubeflow Pipelines, где доступна инфраструктура для запуска пайплайна.
На этом примере вы увидели, как определить простой, но эффективный пайплайн в Kubeflow. Этот процесс подчеркивает важность понимания выводов компонентов, потока данных и возможностей оркестровки Kubeflow Pipelines. Эти концепции являются основополагающими для построения масштабируемых и эффективных рабочих процессов машинного обучения.
Реализация и выполнение Kubeflow Pipeline
Реализация Kubeflow Pipeline включает несколько ключевых шагов: определение компонентов пайплайна, оркестрация этих компонентов в пайплайн, компиляция пайплайна в исполняемый формат и, наконец, запуск пайплайна в подходящей среде. Здесь мы сосредоточимся на этих шагах на примере hello_and_wellbeing_pipeline
.
Компиляция пайплайна
Чтобы развернуть и запустить наш пайплайн, сначала нужно скомпилировать его в формат, понятный среде выполнения. Для этого Kubeflow Pipelines использует файлы YAML. Процесс компиляции переводит определенный на Python пайплайн в статическую конфигурацию, описывающую структуру пайплайна, его компоненты и поток данных.
# Импорт модуля компилятора из Kubeflow Pipelines SDK
from kfp import compiler
# Компиляция пайплайна в YAML‑файл
compiler.Compiler().compile(hello_and_wellbeing_pipeline, 'pipeline.yaml')
Этот шаг создает файл pipeline.yaml
, содержащий скомпилированную версию нашего пайплайна. Именно этот YAML будет развернут в среде выполнения.
Просмотр скомпилированного пайплайна
После компиляции вы можете просмотреть YAML‑файл, чтобы понять, как в нем представлена структура пайплайна. Этот шаг необязателен, но помогает лучше разобраться в процессе компиляции и итоговой конфигурации пайплайна.
# Просмотр содержимого скомпилированного YAML‑файла пайплайна
!cat pipeline.yaml
Эта команда выводит содержимое pipeline.yaml
на экран, позволяя изучить структуру, компоненты и настройки скомпилированного пайплайна.
Запуск пайплайна
Для запуска скомпилированного пайплайна мы воспользуемся Vertex AI Pipelines — управляемой безсерверной средой от Google Cloud. Эта среда позволяет выполнять пайплайн, не заботясь о базовой инфраструктуре.
Сначала необходимо определить аргументы пайплайна. Эти аргументы представляют входные данные, позволяющие настраивать поведение пайплайна для разных запусков.
# Определение аргументов пайплайна
pipeline_arguments = {
"recipient_name": "World!",
}
Затем мы используем класс PipelineJob
из модуля google.cloud.aiplatform
, чтобы сконфигурировать и отправить пайплайн на выполнение.
# Импорт PipelineJob из SDK Google Cloud AI Platform
from google.cloud.aiplatform import PipelineJob
# Конфигурирование задания пайплайна
job = PipelineJob(
# Путь к скомпилированному YAML‑файлу пайплайна
template_path="pipeline.yaml",
# Отображаемое имя задания пайплайна
display_name="hello_and_wellbeing_ai_pipeline",
# Аргументы пайплайна
parameter_values=pipeline_arguments,
# Регион, в котором будет выполняться пайплайн
location="us-central1",
# Директория для хранения временных файлов во время выполнения
pipeline_root="./",
)
# Отправка задания пайплайна на выполнение
job.submit()
# Проверка статуса задания пайплайна
print(job.state)
Этот скрипт настраивает задачу пайплайна с нашим скомпилированным pipeline.yaml
, устанавливает отображаемое имя, указывает входные параметры и определяет регион выполнения и местоположение для хранения временных файлов. Метод job.submit()
отправляет пайплайн для выполнения в Vertex AI Pipelines.
Примечание о выполнении
Из-за ограничений среды класса или ноутбука, фактическое выполнение этого пайплайна в Vertex AI Pipelines не может быть продемонстрировано здесь. Однако, запустив предоставленный код в вашей собственной среде Google Cloud, вы можете развернуть и выполнить пайплайн.
Резюме
Это руководство провело вас через процесс реализации Kubeflow Pipeline: от определения компонентов и их оркестрации в пайплайн до компиляции в развертываемый формат и, наконец, выполнения в управляемой среде. Освоив эти шаги, вы сможете эффективно использовать Kubeflow Pipelines для автоматизации и масштабирования ваших ML-воркфлоу.
Автоматизация и оркестрация пайплайна тонкой настройки с Kubeflow
В этом примере мы изучаем, как автоматизировать и оркестрировать пайплайн тонкой настройки для базовой модели PaLM 2 от Google, используя Kubeflow Pipelines. Этот подход подчеркивает практичность повторного использования существующих пайплайнов для ускорения разработки и развертывания моделей машинного обучения, особенно при работе с большими, сложными моделями, такими как PaLM 2.
Повторное использование существующих пайплайнов для эффективности
Повторное использование существующего пайплайна значительно сокращает время и усилия на разработку, так как используются предварительно построенные воркфлоу. Это не только ускоряет процесс экспериментирования, но и обеспечивает сохранение лучших практик, заложенных в оригинальный пайплайн. В этом сценарии мы фокусируемся на пайплайне параметрически-эффективной тонкой настройки (PEFT) для PaLM 2, предоставленном Google. Это позволяет нам точно настроить модель на нашем конкретном наборе данных, не начиная с нуля.
Подготовка данных и версионирование модели
Для тонкой настройки мы используем два JSONL файла, которые содержат данные для обучения и оценки соответственно. Удаление временных меток из файлов обеспечивает согласованность для всех участников и упрощает процесс подготовки данных.
TRAINING_DATA_URI = "./tune_data_stack_overflow_python_qa.jsonl"
EVALUATION_DATA_URI = "./tune_eval_data_stack_overflow_python_qa.jsonl"
Версионирование модели — это критически важная практика в операциях машинного обучения (MLOps), которая обеспечивает воспроизводимость, аудит и возможность отката. В этом примере мы добавляем текущую дату и время к имени модели для создания уникального идентификатора версии.
import datetime
date = datetime.datetime.now().strftime("%H:%d:%m:%Y")
MODEL_NAME = f"deep-learning-ai-model-{date}"
Настройка пайплайна
Затем мы указываем ключевые параметры для настройки модели PaLM:
TRAINING_STEPS
: Определяет количество шагов обучения. Для экстрактивного QA рекомендуется диапазон 100-500.EVALUATION_INTERVAL
: Устанавливает, как часто модель оценивается во время обучения. По умолчанию каждые 20 шагов.
TRAINING_STEPS = 200
EVALUATION_INTERVAL = 20
Аутентификация и настройка проекта - это необходимые шаги для доступа к ресурсам и сервисам Google Cloud. Функция authenticate
из утилитарного скрипта предоставляет учетные данные и ID проекта, необходимых для настройки среды выполнения пайплайна.
from utils import authenticate
credentials, PROJECT_ID = authenticate()
REGION = "us-central1"
Определение аргументов пайплайна
Следующий шаг включает определение аргументов пайплайна. Эти аргументы указывают входы и конфигурации для пайплайна, адаптируя процесс тонкой настройки к нашим конкретным требованиям.
pipeline_arguments = {
"model_display_name": MODEL_NAME,
"location": REGION,
"large_model_reference": "text-bison@001",
"project": PROJECT_ID,
"train_steps": TRAINING_STEPS,
"dataset_uri": TRAINING_DATA_URI,
"evaluation_interval": EVALUATION_INTERVAL,
"evaluation_data_uri": EVALUATION_DATA_URI,
}
Выполнение пайплайна
Наконец, мы настраиваем и отправляем задачу пайплайна для выполнения, используя класс PipelineJob
. Этот шаг включает указание пути к многократно используемому YAML-файлу пайплайна, отображаемого имени для задачи, аргументов пайплайна, региона выполнения и корневой директории пайплайна для временных файлов. Включение кэширования оптимизирует выполнение за счет повторного использования выводов компонентов, которые не изменились.
pipeline_root = "./"
job = PipelineJob(
template_path=template_path,
display_name=f"deep_learning_ai_pipeline-{date}",
parameter_values=pipeline_arguments,
location=REGION,
pipeline_root=pipeline_root,
enable_caching=True,
)
job.submit()
print(job.state)
Заключение
Этот пример иллюстрирует процесс автоматизации и оркестрации пайплайна тонкой настройки для базовой модели с использованием Kubeflow Pipelines. Повторно используя существующий пайплайн, указывая ключевые параметры и выполняя пайплайн в управляемой среде, мы можем эффективно донастроить большие модели, такие как PaLM 2, на конкретных наборах данных. Этот подход не только ускоряет процесс разработки модели, но и включает лучшие практики MLOps, такие как версионирование, воспроизводимость и эффективное использование ресурсов.
Теоретические вопросы
- Роль Kubeflow Pipelines в автоматизации рабочих процессов машинного обучения (ML-воркфлоу) и обеспечении воспроизводимости.
- Функции модулей
dsl
иcompiler
в SDK. - Как управлять
FutureWarning
, сохраняя читаемость и не пропуская важные изменения? - Почему важны четкие интерфейсы и повторное использование компонентов?
- Назначение декоратора
@dsl.component
. - Что представляет собой объект
PipelineTask
при вызове компонента и какова его польза? - Как передать вывод одного компонента на вход другому?
- Почему компоненты принимают только именованные аргументы?
- Как связать компоненты и какова роль атрибута
.output
? - Как определяется пайплайн и на что следует обратить внимание для корректного вывода?
- Шаги компиляции, просмотра и запуска пайплайна, а также роль YAML.
- Как повторное использование пайплайнов (например, PEFT для PaLM 2) ускоряет работу и сохраняет лучшие практики?
- Зачем версионировать данные и модели в MLOps, и приведите пример идентификатора версии.
- Как задаются аргументы пайплайна для тонкой настройки модели?
- Плюсы и минусы автоматизации и оркестрации сложных рабочих процессов в Kubeflow (для больших моделей).
Практические задания
- Импортируйте
dsl
иcompiler
из SDK Kubeflow и подавитеFutureWarning
изkfp.*
. - Определите компонент
add_numbers(a:int, b:int)->int
с помощью@dsl.component
. - Подавите
DeprecationWarning
из любых модулей (черезwarnings
). - Создайте два компонента: один возвращает число, второй удваивает его; свяжите их в пайплайне.
- Скомпилируйте простой пайплайн в YAML с помощью
compiler
. - Покажите, как получить
PipelineTask
при вызове компонента и обратиться к его атрибуту.output
. - Продемонстрируйте ошибку возврата
PipelineTask
из функции пайплайна и исправьте её, добавив комментарии. - Напишите скрипт предобработки JSON в JSON (фильтрация/маппинг), имитирующий компонент препроцессинга.
- Функция версионирования: добавьте текущие дату и время к базовому имени модели.
- Задайте аргументы и отправьте скомпилированный YAML в среду исполнения (псевдо‑API).