Поделиться через


Файл Avro

Apache Avro — это система сериализации данных. Avro предоставляет следующие возможности:

  • Обширные структуры данных.
  • Компактный и быстрый двоичный формат данных.
  • Файл контейнера для хранения постоянных данных.
  • Удаленный вызов процедуры (RPC).
  • Простая интеграция с динамическими языками. Создание кода не требуется для чтения или записи файлов данных, а также для использования или реализации протоколов RPC. Создание кода в качестве дополнительной оптимизации, имеет смысл реализовать только для языков со статической типизацией.

Источник данных Avro поддерживает:

  • Преобразование схемы: автоматическое преобразование между Apache Spark SQL и записями Avro.
  • Секционирование: простое чтение и запись секционированных данных без дополнительной настройки.
  • Сжатие: сжатие, используемое при записи Avro на диск. Поддерживаемые типы данных:uncompressed, snappy и deflate. Можно также указать уровень сжатия.
  • Имена записей: имя записи и пространство имен путем передачи схемы параметров с recordName и recordNamespace.

См. также как читать и записывать потоковые данные Avro.

Настройка

Поведение источника данных Avro можно изменить с помощью различных параметров конфигурации.

Чтобы игнорировать файлы без расширения .avro при чтении, можно задать параметр avro.mapred.ignore.inputs.without.extension в конфигурации Hadoop. Значение по умолчанию — false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Чтобы настроить сжатие при записи, задайте следующие свойства Spark:

  • Кодек сжатия: spark.sql.avro.compression.codec. Поддерживаемые кодеки: snappy и deflate. Кодек по умолчанию — snappy.
  • Если используется кодек сжатия deflate, можно задать уровень сжатия следующим образом: spark.sql.avro.deflate.level. По умолчанию используется уровень -1.

Эти свойства можно задать в конфигурации Spark кластера или во время выполнения с помощью spark.conf.set(). Например:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Для Databricks Runtime 9.1 LTS и более поздних версий можно изменить поведение вывода схемы по умолчанию в Avro, указав параметр mergeSchema при чтении файлов. Если для mergeSchema задано значение true, то схема будет выводиться из набора файлов Avro в целевом каталоге и объединяться, вместо того чтобы выводиться из одного файла.

Поддерживаемые типы для Avro: > преобразование Spark SQL

Эта библиотека поддерживает чтение всех типов Avro. В ней используется следующее сопоставление типов Avro с типами Spark SQL:

Тип Avro Тип Spark SQL
булево BooleanType
INT Целочисленный тип
длинный ДлинныйТип
плавающий Тип с плавающей запятой
двойной ДвойнойТип
байт БинарныйТип
строка Тип строки
запись ТипСтруктуры
перечисление Тип строки
массив Тип массива
карта Тип карты
исправлено БинарныйТип
союз См Типы объединения.

Типы объединения

Источник данных Avro поддерживает чтение типов union. Avro рассматривает следующие три типа как типы union:

  • union(int, long) соответствует LongType.
  • union(float, double) соответствует DoubleType.
  • union(something, null), где something — любой поддерживаемый тип Avro. Он сопоставляется с тем же типом Spark SQL, что и something, при этом параметру nullable присваивается значение true.

Все остальные типы union являются сложными типами. Они сопоставляются с StructType, где именами полей являются member0, member1 и т. д. в соответствии с членами union. Это согласуется с поведением при преобразовании форматов Avro и Parquet.

Логические типы

Источник данных Avro поддерживает чтение следующих логических типов Avro:

Логический тип Avro Тип Avro Тип Spark SQL
Дата INT ТипДата
timestamp-millis (временная метка в миллисекундах) длинный Тип временной метки
микросекундные метки времени длинный Тип временной метки
десятичный исправлено ДесятичныйТип
десятичный байт ДесятичныйТип

Примечание.

Источник данных Avro игнорирует документы, псевдонимы и другие свойства, имеющиеся в файле Avro.

Поддерживаемые типы для преобразования Avro - Spark SQL >

Эта библиотека поддерживает запись всех типов Spark SQL в Avro. Для большинства типов сопоставление типов Spark с типами Avro происходит просто (например, IntegerType преобразуется в int). Ниже приведен список некоторых особых случаев.

Тип Spark SQL Тип Avro Логический тип Avro
ByteType INT
ShortType INT
БинарныйТип байт
ДесятичныйТип исправлено десятичный
Тип временной метки длинный метка времени-микросекунд
ТипДата INT Дата

Можно также указать всю выходную схему Avro с помощью параметра avroSchema, чтобы типы Spark SQL можно было преобразовать в другие типы Avro. Следующие преобразования не применяются по умолчанию, и для них требуется определенная пользователем схема Avro:

Тип Spark SQL Тип Avro Логический тип Avro
ByteType исправлено
Тип строки перечисление
ДесятичныйТип байт десятичный
Тип временной метки длинный timestamp-millis (временная метка в миллисекундах)

Примеры

В этих примерах используется файл episodes.avro.

язык программирования Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

В этом примере показана пользовательская схема Avro:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

В этом примере показаны параметры сжатия Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

В этом примере показаны секционированные записи Avro:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

В этом примере показаны имя записи и пространство имен:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Питон

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Чтобы запросить данные Avro в SQL, зарегистрируйте файл данных как таблицу или временное представление:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Пример записной книжки: чтение и запись файлов Avro

В следующей записной книжке показано, как читать и записывать файлы Avro.

Чтение и запись файлов Avro в ноутбуке

Получить ноутбук