Перенос данных пользовательских моделей на каталог Unity

Это важно

Эта функция доступна в бета-версии. Она не включена автоматически для всех клиентов, и функциональность может измениться. Чтобы запросить доступ, обратитесь к группе учетной записи Azure Databricks.

Узнайте, как настроить телеметрию конечных точек для сохранения журналов, трассировок и метрик OpenTelemetry из конечных точек, обслуживающих пользовательские модели , в таблицы каталога Unity. Используйте сохраненные данные телеметрии для выполнения анализа первопричин, мониторинга работоспособности конечной точки и соответствия требованиям к стандартным запросам SQL.

Требования

  • Рабочая область должна быть активирована для Unity Catalog. Хранилище по умолчанию (Arclight) не поддерживается.

  • У вас должны быть разрешения USE CATALOG, USE SCHEMA, CREATE TABLE и MODIFY на каталог Unity Catalog и схему, где хранятся журналы.

  • Существующая пользовательская модель, обслуживающая конечную точку или разрешения для создания.

  • Рабочая область должна находиться в поддерживаемом регионе:

    • canadacentral
    • westus
    • westus2
    • southcentralus
    • eastus
    • eastus2
    • centralus
    • northcentralus
    • swedencentral
    • westeurope
    • northeurope
    • uksouth
    • australiaeast
    • southeastasia

Шаг 1. Подготовка кода модели к измерениям

Добавьте инструментирование в код модели для записи данных телеметрии.

  1. Добавьте ведение журнала приложений в модель. Данные телеметрии конечной точки автоматически фиксируют стандартные выходные данные Python logging . Для простой регистрации логов не требуется инструментирование SDK OpenTelemetry.

    import logging
    
    class MyCustomModel(mlflow.pyfunc.PythonModel):
        def predict(self, context, model_input):
            # This log will be persisted to the <prefix>_otel_logs table
            logging.warning("Received inference request")
    
            try:
                # Your model logic here
                result = model_input * 2
                return result
            except Exception as e:
                # Error logs are also captured with severity 'ERROR'
                logging.error(f"Inference failed: {e}")
                raise e
    

    Для корневого уровня ведения журнала задано значение WARNING. Смотрите раздел "Устранение неполадок", чтобы изменить уровень ведения журнала.

  2. (Необязательно) Настройка специфицированных метрик и трассировок с помощью OpenTelemetry. Чтобы записать пользовательские метрики и трассировки за пределами базового ведения журнала, добавьте инструментирование пакета SDK OpenTelemetry в модель. Разверните следующий раздел для полного примера, в который показано, как создавать счетчики, диапазоны записей и присоединять настраиваемые атрибуты.

    Значок квадратных скобок. Пример: пользовательские метрики, диапазоны и ведение журнала моделей с помощью OpenTelemetry

    Замечание

    Из-за ограничений сериализации модели необходимо записать модель в отдельный файл, прежде чем начать ведение журнала, чтобы избежать ошибок, как показано ниже %%writefile return_input_model.py.

    %%writefile return_input_model.py
    import os
    
    import mlflow
    from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
    from opentelemetry.metrics import get_meter, set_meter_provider
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    from opentelemetry.trace import get_tracer, set_tracer_provider
    
    # highlight-start
    # ---- OTel initialization (per-worker) ----
    resource = Resource.create({
        "worker.pid": str(os.getpid()),
    })
    
    otlp_trace_exporter = OTLPSpanExporter()
    tracer_provider = TracerProvider(resource=resource)
    tracer_provider.add_span_processor(BatchSpanProcessor(otlp_trace_exporter))
    set_tracer_provider(tracer_provider)
    
    otlp_metric_exporter = OTLPMetricExporter()
    metric_reader = PeriodicExportingMetricReader(otlp_metric_exporter)
    meter_provider = MeterProvider(metric_readers=[metric_reader], resource=resource)
    set_meter_provider(meter_provider)
    
    _tracer = get_tracer(__name__)
    _meter = get_meter(__name__)
    _prediction_counter = _meter.create_counter(
        name="prediction_count",
        description="Number of predictions made",
        unit="1"
    )
    # highlight-end
    
    
    class ReturnInputModel(mlflow.pyfunc.PythonModel):
        def load_context(self, context):
            # highlight-start
            self.tracer = _tracer
            self.prediction_counter = _prediction_counter
            # highlight-end
    
        def predict(self, context, model_input):
            # highlight-next-line
            with self.tracer.start_as_current_span("ReturnInputModel.predict") as span:
                # highlight-next-line
                span.set_attribute("input_shape", str(model_input.shape))
                # highlight-next-line
                span.set_attribute("input_columns", str(list(model_input.columns)))
                # highlight-next-line
                self.prediction_counter.add(1)
                return model_input
    
    mlflow.models.set_model(ReturnInputModel())
    
  3. Логируйте и зарегистрируйте модель.

    import pandas as pd
    import mlflow
    from mlflow.models import infer_signature
    
    # Prepare tabular input/output for signature (pyfunc expects DataFrame)
    input_df = pd.DataFrame({"inputs": ["hello world"]})
    output_df = input_df.copy()  # model returns input unchanged
    
    # Log the model with OpenTelemetry dependencies (using code-based logging to avoid serialization issues)
    with mlflow.start_run():
        signature = infer_signature(input_df, output_df)
    
        model_info = mlflow.pyfunc.log_model(
            name="model",
            python_model="return_input_model.py",
            signature=signature,
            input_example=input_df,
            pip_requirements=[
                "mlflow==3.1",
                # highlight-next-line
                "opentelemetry-sdk",
                # highlight-next-line
                "opentelemetry-exporter-otlp-proto-http",
            ],
        )
    
    # Register with express deployment environment packing
    # Use Unity Catalog name: catalog.schema.model_name
    registered = mlflow.register_model(
        model_info.model_uri,
        MODEL_NAME,
        env_pack="databricks_model_serving"
    )
    

Шаг 2. Подготовка места назначения каталога Unity

Перед созданием конечной точки убедитесь, что у вас есть каталог и схема, готовые к получению данных телеметрии. Azure Databricks автоматически создает необходимые таблицы в этой схеме, если они еще не существуют.

  1. В обозревателе каталогов перейдите к каталогу и схеме, которую вы хотите использовать (например, my_catalog.observability).

Шаг 3. Включение телеметрии конечной точки

Можно включить данные телеметрии при создании новой конечной точки или добавить ее в существующую.

Новая конечная точка

Чтобы включить данные телеметрии в пользовательском интерфейсе, выполните следующие действия.

  1. Перейдите к обслуживанию в левой боковой панели.
  2. Нажмите кнопку "Создать конечную точку обслуживания".
  3. В разделе телеметрии конечной точки (помеченная предварительная версия) разверните параметры конфигурации.
  4. Расположение каталога Unity: выберите целевой каталог и схему , подготовленную на шаге 2.
  5. (Необязательно) Префикс таблицы: введите префикс для созданных таблиц. Если осталось пустым, префикс отсутствует. Таблицы называются <prefix>_otel_logs, <prefix>_otel_spansи <prefix>_otel_metrics.
  6. Завершите оставшуюся часть конфигурации конечной точки (выбор модели, параметры вычислений) и нажмите кнопку "Создать".

Для этого с помощью API сделайте следующее:

Значок квадратных скобок. Включение телеметрии с помощью API
curl -X POST -H "Authorization: Bearer <your-token>" \
https://<workspace-url>/api/2.0/serving-endpoints \
-d '{
  "name": "my-custom-logging-endpoint",
  "config": {
    "served_entities": [
      {
        "name": "my-model",
        "entity_name": "my-model",
        "entity_version": "1",
        "workload_size": "Small",
        "scale_to_zero_enabled": true
      }
    ]
  },
  "telemetry_config": {
    "table_names": {
      "logs_table": "my_catalog.observability.custom_endpoint_logs",
      "metrics_table": "my_catalog.observability.custom_endpoint_metrics",
      "traces_table": "my_catalog.observability.custom_endpoint_spans"
    }
  }
}'

Существующая конечная точка

Замечание

Обновление активирует новое развертывание. Изменения вступили в силу после завершения развертывания.

Чтобы включить данные телеметрии в пользовательском интерфейсе, выполните следующие действия.

  1. На странице представления конечной точки на правой боковой панели в разделе телеметрии конечной точки нажмите кнопку "Добавить".
  2. Расположение каталога Unity: выберите целевой каталог и схему , подготовленную на шаге 2.
  3. (Необязательно) Префикс таблицы: введите префикс для созданных таблиц. Если осталось пустым, префикс отсутствует. Таблицы называются <prefix>_otel_logs, <prefix>_otel_spansи <prefix>_otel_metrics.
  4. Нажмите кнопку "Обновить".

Шаг 4. Проверка и запрос данных телеметрии

Когда конечная точка получает трафик, данные телеметрии передаются в настроенные таблицы каталога Unity.

  1. Перейдите в обозреватель каталога или редактор SQL.

  2. Найдите таблицу с именем <prefix>_otel_logs в настроенной схеме.

  3. Выполните запрос для проверки потока данных:

    SELECT * FROM <catalog>.<schema>.<prefix>_otel_logs
    LIMIT 10;
    

Запросы к данным телеметрии

В следующих примерах показаны распространенные запросы.

Чтобы просмотреть полную схему любой таблицы телеметрии, выполните следующую команду:

DESCRIBE TABLE <catalog>.<schema>.<prefix>_otel_logs;

Используйте эти столбцы для фильтрации и сопоставления данных телеметрии:

  • timestamp
  • severity_text
  • body
  • trace_id
  • span_id
  • attributes — карта, содержащая метаданные, относящиеся к событиям.

Проверка ошибок за последний час

SELECT
  timestamp,
  severity_text,
  body,
  attributes
FROM <catalog>.<schema>.<prefix>_otel_logs
WHERE
  severity_text = 'ERROR'
  AND timestamp > current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC;

Troubleshooting

Журналы не отображаются в таблице: корневой уровень ведения журнала по умолчанию настроен на WARNING, чтобы снизить нагрузку на систему. Чтобы записать логи с более низкой степенью серьезности, измените уровень в программном коде модели.

class MyModel(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        root = logging.getLogger()
        root.setLevel(logging.DEBUG)
        for handler in root.handlers:
            handler.setLevel(logging.DEBUG)

Ограничения

Следующие ограничения применяются к телеметрии конечных точек:

  • Эволюция схемы в целевой таблице не поддерживается.

  • Поддерживаются только управляемые таблицы Delta. Внешнее хранилище и хранилище Arclight по умолчанию не поддерживаются.

  • Расположение таблицы должно находиться в том же регионе, что и рабочая область.

  • Поддерживаются только имена таблиц с буквами ASCII, цифрами и символами подчеркивания.

  • Повторное создание целевой таблицы не поддерживается.

  • Поддерживается только стойкость данных в одной зоне доступности (single-az).

  • Доставка может быть выполнена хотя бы один раз. Подтверждение с сервера означает, что запись надежно сохранена и находится в таблице Delta.

  • Записи должны быть меньше 10 МБ.

  • Запросы должны составлять менее 30 МБ.

  • Строки журнала должны быть меньше 1 МБ.

  • Задержка телеметрии снижается за пределами 2500 QPS.

  • Журналы отображаются в таблице каталога Unity через несколько секунд после их выдачи.