Распределенное обучение моделей XGBoost с помощью xgboost.spark

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

Пакет Python xgboost>=1.7 содержит новый модуль xgboost.spark. Этот модуль включает оценщики PySpark xgboost xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifier и xgboost.spark.SparkXGBRanker. Эти новые классы поддерживают включение оценщиков XGBoost в конвейеры SparkML. Дополнительные сведения об API см. в документации по API Spark для XGBoost.

Требования

Databricks Runtime 12.0 ML и более поздних версий.

параметры xgboost.spark

Оценки, определенные в модуле xgboost.spark , поддерживают большинство одинаковых параметров и аргументов, используемых в стандартном XGBoost.

  • Параметры конструктора класса, fit метода и predict метода в значительной степени идентичны параметрам в модуле xgboost.sklearn .
  • Именование, значения и значения по умолчанию в основном идентичны тем, которые описаны в параметрах XGBoost.
  • Исключения — это несколько неподдерживаемых параметров (напримерgpu_id, , nthreadsample_weight, eval_set) и pyspark определенных параметров оценки ,которые были добавлены (напримерfeaturesCol, labelCol, , ). use_gpuvalidationIndicatorCol Дополнительные сведения см. в документации по API Spark для XGBoost.

Распределенное обучение

Оценки PySpark, определенные в модуле xgboost.spark , поддерживают распределенное обучение XGBoost с помощью num_workers параметра. Чтобы использовать распределенное обучение, создайте классификатор или регрессию и задайте num_workers количество параллельных задач Spark во время распределенного обучения. Чтобы использовать все слоты задач Spark, задайте параметр num_workers=sc.defaultParallelism.

Рассмотрим пример.

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

Примечание.

  • Нельзя использовать mlflow.xgboost.autolog с распределенным XGBoost. Чтобы регистрировать модель Spark xgboost с помощью MLflow, используйте mlflow.spark.log_model(spark_xgb_model, artifact_path).
  • Вы не можете использовать распределенный XGBoost в кластере с включенным автомасштабированием. Новые рабочие узлы, начинающиеся в этой парадигме эластичного масштабирования, не могут получать новые наборы задач и оставаться бездействующими. Инструкции по отключению автомасштабирования см. в разделе "Включение автомасштабирования".

Включить оптимизацию для обучения на разреженном наборе данных признаков

Оценщики PySpark, определенные в модуле xgboost.spark, поддерживают оптимизацию для обучения на наборах данных с разреженными признаками. Чтобы включить оптимизацию разреженных наборов компонентов, необходимо предоставить набор данных методу fit, содержащему столбец признаков, состоящий из значений типа pyspark.ml.linalg.SparseVector, и задать параметр оценки enable_sparse_data_optim, равным True. Кроме того, необходимо установить параметр missing в 0.0.

Рассмотрим пример.

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

Обучение GPU

Оценки PySpark, определенные в модуле xgboost.spark , поддерживают обучение на GPU. Установите параметр use_gpu на True, чтобы включить обучение на GPU.

Примечание.

Для каждой задачи Spark, используемой в распределенном обучении XGBoost, используется только один GPU при установке аргумента use_gpuTrue. Databricks рекомендует использовать значение 1 по умолчанию для конфигурации spark.task.resource.gpu.amountкластера Spark. В противном случае дополнительные графические процессоры, выделенные этой задачей Spark, неактивны.

Рассмотрим пример.

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

Устранение неполадок

Во время обучения с несколькими узлами, если вы столкнулись с NCCL failure: remote process exited or there was a network error сообщением, это обычно указывает на проблему с сетевым обменом данными между графическими процессорами. Эта проблема возникает, когда NCCL (библиотека коллективных коммуникаций NVIDIA) не может использовать определенные сетевые интерфейсы для взаимодействия с GPU.

Чтобы устранить эту проблему, задайте для кластера параметр sparkConf на spark.executorEnv.NCCL_SOCKET_IFNAMEeth. Это по сути задает переменную среды NCCL_SOCKET_IFNAME_eth для всех рабочих в узле.

Пример записной книжки

В этой записной книжке показано использование пакета xgboost.spark Python с Spark MLlib.

Записная книжка PySpark-XGBoost

Получить ноутбук

Руководство по миграции для устаревшего sparkdl.xgboost модуля

  • Замените from sparkdl.xgboost import XgboostRegressor на from xgboost.spark import SparkXGBRegressor и замените from sparkdl.xgboost import XgboostClassifier на from xgboost.spark import SparkXGBClassifier.
  • Измените все имена параметров в конструкторе оценщика с верблюжьего стиля на стиль snake_case. Например, измените XgboostRegressor(featuresCol=XXX) на SparkXGBRegressor(features_col=XXX).
  • Параметры use_external_storage и external_storage_precision удалены. xgboost.spark Средства оценки используют API итерации данных DMatrix для более эффективного использования памяти. Больше не требуется использовать неэффективный внешний режим хранения. Для очень больших наборов данных Databricks рекомендует увеличить num_workers параметр, что делает так, что каждая задача обучения разделяет данные на меньшие, более управляемые разделы данных. Рассмотрим параметр num_workers = sc.defaultParallelism, который задает num_workers общее количество слотов задач Spark в кластере.
  • Для оценщиков, определенных в xgboost.spark, параметр num_workers=1 выполняет обучение модели с помощью одной задачи Spark. Это использует количество ядер ЦП, указанных параметром spark.task.cpusконфигурации кластера Spark, которое по умолчанию равно 1. Чтобы использовать больше ядер ЦП для обучения модели, увеличьте num_workers или spark.task.cpus. Нельзя задать параметр nthread или параметр n_jobs для оценщиков, определенных в xgboost.spark. Это поведение отличается от предыдущего поведения оценщиков, определенных в устаревшем пакете sparkdl.xgboost .

Преобразование sparkdl.xgboost модели в xgboost.spark модель

sparkdl.xgboost модели сохраняются в другом формате, чем xgboost.spark модели, и имеют разные параметры параметров. Используйте следующую служебную функцию для преобразования модели:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

Если у вас есть модель pyspark.ml.PipelineModel, содержащая модель sparkdl.xgboost на последнем этапе, вы можете заменить этап модели sparkdl.xgboost преобразованной моделью xgboost.spark.

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)