Загрузка данных с помощью Mosaic Streaming

В этой статье описывается, как использовать Mosaic Streaming для преобразования данных из Apache Spark в формат, поддерживаемый PyTorch.

Mosaic Streaming — это библиотека загрузки данных с открытым исходным кодом. Он обеспечивает одноузловую или распределенную подготовку и оценку моделей глубокого обучения из наборов данных, которые уже загружены в качестве кадров данных Apache Spark. Mosaic Streaming в основном поддерживает Mosaic Composer, но также интегрируется с PyTorch, PyTorch Lightning и TorchDistributor. Потоковая передача Mosaic предоставляет ряд преимуществ перед традиционными загрузчиками данных PyTorch, включая:

  • Совместимость с любым типом данных, включая изображения, текст, видео и многомодальные данные.
  • Поддержка крупных поставщиков облачных хранилищ (AWS, OCI, GCS, Azure, Databricks UC Volume и любых объектных хранилищ, совместимых с S3, таких как Cloudflare R2, Coreweave, Backblaze B2 и т. д.)
  • Максимизация гарантий правильности, производительности, гибкости и простоты использования. Дополнительные сведения см. на странице основных функций .

Общие сведения о потоковой передаче Мозаики см. в документации по API потоковой передачи.

Примечание.

Мозаичная потоковая передача предварительно установлена во всех версиях Databricks Runtime 15.2 ML и выше.

Загрузка данных из Spark DataFrames с использованием Mosaic Streaming

Mosaic Streaming предоставляет простой рабочий процесс для преобразования из Apache Spark в формат Mosaic Data Shard (MDS), который затем можно загрузить для использования в распределенной среде.

Рекомендуемый рабочий процесс:

  1. Используйте Apache Spark для загрузки и, при необходимости, предварительной обработки данных.
  2. Используйте streaming.base.converters.dataframe_to_mds для сохранения фрейма данных на диск для временного хранения и/или в каталог Unity для постоянного хранения. Эти данные будут храниться в формате MDS и могут быть оптимизированы с поддержкой сжатия и хэширования. Дополнительные варианты использования также могут включать предварительную обработку данных с помощью пользовательских функций. Для получения дополнительной информации см. руководство по Spark DataFrame в MDS.
  3. Используется streaming.StreamingDataset для загрузки необходимых данных в память. StreamingDataset — это версия PyTorch IterableDataset, которая включает эластично детерминированное перемешивание, обеспечивающее быстрое возобновление в середине эпохи. Дополнительные сведения см. в документации StreamingDataset.
  4. Используется streaming.StreamingDataLoader для загрузки необходимых данных для обучения, оценки и тестирования. StreamingDataLoader — это версия DataLoader из PyTorch, которая предоставляет дополнительный интерфейс контрольных точек и возобновления, с помощью которого отслеживается количество примеров, обработанных моделью на этом ранге.

Полный пример см. в следующей записной книжке:

Упростите загрузку данных из Spark в PyTorch с помощью записной книжки Mosaic Streaming

Получить записную книжку

Устранение неполадок

Ошибка проверки подлинности

Если при загрузке данных из тома каталога Unity с помощью StreamingDatasetотображается следующая ошибка, настройте переменные среды, как показано ниже.

ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.

Примечание.

Если эта ошибка возникает при выполнении распределенного обучения с помощью TorchDistributor, необходимо также задать переменные среды на рабочих узлах.

db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods

def your_training_function():
  import os
  os.environ['DATABRICKS_HOST'] = db_host
  os.environ['DATABRICKS_TOKEN'] = db_token

# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)

Проблемы с разделяемой памятью в Python 3.11

Из-за проблем с реализацией общей памяти в Python 3.11, в Databricks Runtime 15.4 LTS для машинного обучения могут возникать временные проблемы. Эти проблемы можно избежать, выполнив обновление до Databricks Runtime 16.4 LTS для машинного обучения, так как Python 3.12 устраняет эти проблемы.