Поделиться через


cloud_files_state табличное значение функции

Область применения:флажок Databricks SQL флажок Databricks Runtime 11.3 LTS и выше

Возвращает состояние автозагрузчика или read_files потока на уровне файла.

Синтаксис

cloud_files_state( { TABLE ( table_name ) | checkpoint } )

Аргументы

  • table_name: идентификатор потоковой таблицы, записываемой в read_files. Имя не должно содержать временную спецификацию. Доступно в Databricks Runtime 13.3 LTS и более поздних версиях.
  • checkpoint: литерал STRING . Каталог контрольной точки для потока с источником Auto Loader. См. статью об автозагрузчике.

Возвраты

Возвращает таблицу со следующей схемой:

  • path STRING NOT NULL PRIMARY KEY

    Путь к файлу.

  • size BIGINT NOT NULL

    Размер файла в байтах.

  • create_time TIMESTAMP NOT NULL

    Время создания файла.

  • discovery_time TIMESTAMP NOT NULL

    Применяется к:отмечено да Databricks SQL отмечено да Databricks Runtime 16.4 и более поздние версии

    Время обнаружения файла.

  • processed_time TIMESTAMP NOT NULL

    Применяется к:установлен флажок да Databricks SQL установлен флажок да Databricks Runtime 16.4 и выше, когда cloudFiles.cleanSource включен. См. параметры автозагрузчика.

    Время обработки файла. Если пакет встречает сбой и выполняется повторно, файл может обрабатываться несколько раз. При выполнении повторных попыток это поле содержит последнее время обработки.

  • commit_time TIMESTAMP

    Применяется к:установлен флажок да Databricks SQL установлен флажок да Databricks Runtime 16.4 и выше, когда cloudFiles.cleanSource включен. См. параметры автозагрузчика.

    Время фиксации файла в контрольной точке после обработки. NULL если файл еще не обработан. Не существует гарантированной задержки для маркировки файла как зафиксированного; Файл может обрабатываться, но помечен как зафиксированный произвольно позже. Пометка файла как зафиксированного означает, что автозагрузчик не требует повторной обработки файла.

  • archive_time TIMESTAMP

    Применяется к:установлен флажок да Databricks SQL установлен флажок да Databricks Runtime 16.4 и выше, когда cloudFiles.cleanSource включен. См. параметры автозагрузчика.

    Время архивирования файла. NULL если файл не был архивирован.

  • archive_mode STRING

    Применяется к:установлен флажок да Databricks SQL установлен флажок да Databricks Runtime 16.4 и выше, когда cloudFiles.cleanSource включен. См. параметры автозагрузчика.

    Если MOVE был установлен на cloudFiles.cleanSource, когда файл был архивирован, MOVE.

    Если DELETE был установлен на cloudFiles.cleanSource, когда файл был архивирован, DELETE.

    NULL Значение cloudFiles.cleanSource if было задано OFF (по умолчанию).

  • move_location STRING

    Применяется к:установлен флажок да Databricks SQL установлен флажок да Databricks Runtime 16.4 и выше, когда cloudFiles.cleanSource включен. См. параметры автозагрузчика.

    Полный путь, куда файл был перемещен во время архивной операции, когда cloudFiles.cleanSource было установлено на MOVE.

    NULL если файл не был архивирован или cloudFiles.cleanSource является одним из DELETE или OFF.

  • source_id STRING

    Идентификатор источника автозагрузчика в потоковом запросе. Это значение '0' для потоков, которые получают данные из одного расположения облачного хранилища объектов.

  • flow_name STRING

    Область применения:отмечено Databricks SQL отмечено Databricks Runtime 13.3 и выше

    Представляет определенный поток потоковой передачи в декларативных конвейерах Lakeflow, содержащих один или несколько источников облачных файлов. NULL если имя_таблицы не было задано.

  • ingestion_state STRING

    Применяется к:установлен флажок да Databricks SQL установлен флажок да Databricks Runtime 16.4 и выше, когда cloudFiles.cleanSource включен. См. параметры автозагрузчика.

    Указывает, был ли файл принят, что обозначается переходом в одно из следующих состояний:

    • NULL: файл еще не обработан или состояние файла невозможно определить автоматическим загрузчиком.
    • PROCESSING: обрабатывается файл.
    • SKIPPED_CORRUPTED: файл не был загружен, потому что он был поврежден.
    • SKIPPED_MISSING: файл не был приемлен, так как он не найден во время обработки.
    • INGESTED: файл обрабатывается приемником по крайней мере один раз. Он может обрабатываться повторно неидемпотентными приемниками, как foreachBatch в случае сбоев в потоке. Обработку завершили только файлы без значения NULL в поле commit_time, которые находятся в состоянии INGESTED.
    • NOT_RECOGNIZED_BY_DBR: зарезервировано для совместимости версий. Это состояние будет отображаться для состояний, представленных в более поздних версиях Databricks Runtime, которые не распознаются более ранними версиями Databricks Runtime.

Разрешения

Вам нужно:

  • OWNER привилегии в таблице потоковой передачи при использовании идентификатора потоковой таблицы.
  • READ FILESпривилегии на месте контрольной точки, если предоставляется контрольная точка во внешнем местоположении.

Примеры

-- Simple example from checkpoint
> SELECT path FROM CLOUD_FILES_STATE('/some/checkpoint');
  /some/input/path
  /other/input/path

-- Simple example from source subdir
> SELECT path FROM CLOUD_FILES_STATE('/some/checkpoint/sources/0');
  /some/input/path
  /other/input/path

-- Simple example from streaming table
> SELECT path FROM CLOUD_FILES_STATE(TABLE(my_streaming_table));
  /some/input/path
  /other/input/path

Ограничения

  • Табличное значение функции нельзя использовать в конвейере DLT для чтения состояния таблицы потоковой передачи , созданной другим конвейером.
  • Запрос не может ссылаться более чем на одну табличную функцию cloud_files_state на разных потоковых таблицах.

Чтобы обойти первое ограничение, можно создать задание Databricks и использовать табличную функцию в задаче записной книжки для извлечения и загрузки состояния потоковой таблицы на уровне файлов. Рассмотрим пример.

  1. На боковой панели рабочей области щелкните "Рабочие процессы".

  2. Щелкните задание.

  3. Создайте новую задачу конвейера DLT, которая обеспечивает создание потоковой таблицы. Рассмотрим пример.

    CREATE OR REFRESH STREAMING LIVE TABLE <table-name>
    AS SELECT <select clause expressions>
    FROM STREAM read_files('/path/to/files', format => '<format>', ...)
    
  4. Создайте задачу "Записная книжка ", которая считывает состояние потоковой таблицы на уровне файла с помощью cloud_files_state функции с табличным значением и загружает ее в таблицу. Рассмотрим пример.

    spark
      .sql("""SELECT * FROM cloud_files_state(TABLE(<table-name>)""")
      .write
      .saveAsTable('<table-file-state>')
    
  5. Настройте поле "Зависит от " задачи "Блокнот " и выберите задачу DLT-пайплайн .