Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Структурированная потоковая передача — это масштабируемый отказоустойчивый обработчик обработки потоков, построенный на Spark. Он обрабатывает поток динамических данных как таблицу, к которым постоянно добавляются новые строки. Структурированная потоковая передача поддерживает встроенные источники файлов, такие как CSV, JSON, ORC и Parquet, а также службы обмена сообщениями, такие как Kafka и Центры событий Azure.
В этой статье описывается настройка источника потоковой передачи, такого как Центры событий Azure, прием потоковой передачи данных в таблицу Delta lakehouse, оптимизация производительности записи с помощью секционирования и пакетирования событий и надежное выполнение заданий потоковой передачи в рабочей среде.
Настройка источника потоковой передачи
Чтобы передавать данные в lakehouse, сначала настройте подключение к источнику потоковой передачи. Центры событий Azure — это распространенный выбор. Используйте соединитель Центров событий Azure для Apache Spark для подключения приложения Spark к Центрам событий Azure.
Для базовой конфигурации Центров событий требуется имя пространства имен Центров событий, имя концентратора, имя общего ключа доступа и группа потребителей.
Группа потребителей — это представление всего концентратора событий. Группы потребителей позволяют нескольким потребляющим приложениям иметь отдельное представление потока событий и читать поток независимо по собственному темпу и с собственными смещениями.
Секции в Центрах событий позволяют параллельно обрабатывать большие объемы событий. Один процессор имеет ограниченную емкость для обработки событий в секунду, а несколько процессоров могут работать параллельно между секциями.
Если используется слишком много разделов с низкой скоростью приема данных, читатели разделов обрабатывают небольшую часть данных, что приводит к неоптимальной нагрузке. Идеальное количество секций зависит от требуемой скорости обработки. При увеличении количества единиц пропускной способности в пространстве имен может потребоваться дополнительные секции, чтобы одновременные читатели могли достичь максимальной пропускной способности.
Проверьте оптимальное количество разделов для сценария пропускной способности. Сценарии с высокой пропускной способностью обычно используют 32 или более секций.
Разностная таблица в качестве приемника потоковой передачи
Delta Lake — это слой хранения с открытым исходным кодом, который предоставляет транзакции ACID (атомарность, согласованность, изоляция и устойчивость) на вершине хранилища озера данных. В службе "Проектирование данных Fabric" Delta Lake поддерживает добавление и обновление, сжатие данных, временные переходы, эволюцию схемы и хранилище с открытым форматом.
При использовании delta в качестве формата вывода в writeStream, потоковые данные направляются непосредственно в таблицу Delta. В следующем примере данные читаются из Центров событий, анализируется тело сообщения и записываются в таблицу Delta.
import pyspark.sql.functions as f
from pyspark.sql.types import *
df = (
spark.readStream
.format("eventhubs")
.options(**ehConf)
.load()
)
Schema = StructType([
StructField("<column_name_01>", StringType(), False),
StructField("<column_name_02>", StringType(), False),
StructField("<column_name_03>", DoubleType(), True),
StructField("<column_name_04>", LongType(), True),
StructField("<column_name_05>", LongType(), True),
])
rawData = (
df
.withColumn("bodyAsString", f.col("body").cast("string"))
.select(f.from_json("bodyAsString", Schema).alias("events"))
.select("events.*")
.writeStream
.format("delta")
.option("checkpointLocation", "Files/checkpoint")
.outputMode("append")
.toTable("deltaeventstable")
)
В коде format("delta") задает delta в качестве выходного формата, outputMode("append") записывает только новые строки в таблицу и toTable("deltaeventstable") сохраняет потоковые данные в управляемую таблицу Delta.
Оптимизация производительности потоковой передачи
После работы базового приема потоковой передачи можно улучшить пропускную способность и организацию файлов с помощью методов оптимизации в следующих разделах.
Данные секционирования для записи
Чтобы оптимизировать пропускную способность, эффективно секционируйте данные. Секционирование повышает пропускную способность записи и производительность нижестоящего запроса. Можно разделять данные в памяти, на диске или на обоих.
На диске — используйте partitionBy() для упорядочивания данных в подкаталогах на основе значений столбцов. Выберите столбцы с хорошей кардинальностью, которые приводят к оптимальному размеру файлов. Избегайте столбцов, создающих слишком много очень маленьких разделов или очень немного больших.
В памяти — перед записью используйте repartition() или coalesce() распределяйте данные между рабочими узлами:
-
repartition()увеличивает или уменьшает разделы с полной перетасовкой, равномерно распределяя данные. -
coalesce()уменьшает только секции, минимизируя перемещение данных.
Объединение обоих подходов хорошо подходит для сценариев высокой пропускной способности. В следующем примере данные разделены на 48 секций в памяти (соответствующие доступные ядра ЦП), а затем секции на диске по двум столбцам:
rawData = (
df
.withColumn("bodyAsString", f.col("body").cast("string"))
.select(f.from_json("bodyAsString", Schema).alias("events"))
.select("events.*")
.repartition(48)
.writeStream
.format("delta")
.option("checkpointLocation", "Files/checkpoint")
.outputMode("append")
.partitionBy("<column_name_01>", "<column_name_02>")
.toTable("deltaeventstable")
)
Используйте оптимизированную запись
В качестве альтернативы ручному разбиению оптимизированная запись объединяет или разделяет разделы перед записью, максимизируя пропускную способность диска без вызовов repartition() или coalesce(). Включите его с помощью конфигурации Spark:
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)
С включенной оптимизированной записью можно удалить repartition() или coalesce() из кода и разрешить Spark обрабатывать размер секций. Вы по-прежнему можете использовать partitionBy() для организации уровня диска.
Пакетные события с триггерами
Для дальнейшей оптимизации производительности записи события следует собирать в пакеты перед записью на диск. По умолчанию Spark обрабатывает каждый микробатч сразу после завершения предыдущего. Настройка интервала срабатывания триггера накапливает данные в течение заданного времени и затем записывает их в меньшем числе более объемных операций. Большие пакеты создают Delta-файлы большего размера и сокращают издержки на небольшие файлы.
В следующем примере события обрабатываются в течение одной минуты:
rawData = (
df
.withColumn("bodyAsString", f.col("body").cast("string"))
.select(f.from_json("bodyAsString", Schema).alias("events"))
.select("events.*")
.writeStream
.format("delta")
.option("checkpointLocation", "Files/checkpoint")
.outputMode("append")
.partitionBy("<column_name_01>", "<column_name_02>")
.trigger(processingTime="1 minute")
.toTable("deltaeventstable")
)
Проанализируйте объем входящих данных и выберите интервал обработки, который создает файлы Parquet подходящего размера в таблице Delta.
Выполнение заданий потоковой передачи в рабочей среде
Записные книжки Spark — это эффективное средство для разработки и тестирования логики потоковой передачи. Однако для рабочих нагрузок, которые должны выполняться непрерывно, используйте вместо этого определения заданий Spark. Определения заданий Spark — это неинтерактивные, ориентированные на код задачи, которые выполняются в кластере Spark и обеспечивают большую надежность и доступность.
Инфраструктура, выполняющая задание потоковой передачи, может столкнуться с проблемами, которые останавливают задание, например сбои оборудования или исправление инфраструктуры. Политика повторных попыток автоматически перезапускает задание при неожиданной остановке. Настройте политику повторных попыток в определении задания Spark, чтобы указать, сколько раз перезапустить задание (до бесконечных повторных попыток) и интервал времени между повторными попытками. Если включена политика повторных запусков, задание потоковой передачи продолжает выполняться до тех пор, пока вы его явно не остановите.
Концентратор мониторинга Fabric включает вкладку структурированной потоковой передачи с метриками, включая скорость ввода, скорость обработки, входные строки, длительность пакетной службы и длительность операции.
Связанный контент
- Получите потоковые данные в хранилище данных и доступ к ним через конечную точку аналитики SQL
- Получение потоковых данных в Lakehouse с помощью событий