Поделиться через


Руководство по Azure Data Lake Storage, Azure Databricks и Spark

В этом руководстве показано, как подключить кластер Azure Databricks к данным, хранящимся в учетной записи хранения Azure с включенным Azure Data Lake Storage. Благодаря этому подключению можно непосредственно выполнять запросы и аналитику на ваших данных из вашего кластера.

При работе с этим руководством вы сделаете следующее:

  • Загрузка неструктурированных данных в учетную запись хранения
  • Выполняйте аналитическую обработку данных в хранилище BLOB-объектов.

Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.

Предварительные условия

Создание рабочей области Azure Databricks и записной книжки

  1. Создайте рабочую область Azure Databricks. См. статью "Создание рабочей области Azure Databricks".

  2. Создайте записную книжку. См. статью "Создание записной книжки". Выберите Python в качестве языка записной книжки по умолчанию.

Оставьте записную книжку открытой. Вы используете это в следующих разделах.

Скачивание данных о рейсах

В этом руководстве используются данные о своевременности выполнения рейсов за январь 2016 года из Бюро статистики транспорта США, чтобы продемонстрировать, как осуществлять операцию ETL. Для завершения обучающего руководства нужно скачать эти данные.

  1. Скачайте файл On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip. Этот файл содержит данные о рейсах.

  2. Распакуйте содержимое ZIP-файла и запомните имя файла и путь к файлу. Они понадобятся для выполнения последующего шага.

Если вы хотите узнать информацию о данных об эффективности своевременной отчетности, вы можете просмотреть описания полей на веб-сайте Бюро статистики транспорта.

Прием данных

В этом разделе вы загружаете данные .csv полета в свою учетную запись Azure Data Lake Storage, а затем монтируете учетную запись хранения к своему кластеру Databricks. Наконец, вы используете Databricks для чтения данных о полетах из файла .csv и записи их обратно в хранилище в формате Apache Parquet.

Отправка данных о полете в учетную запись хранения

Используйте AzCopy, чтобы скопировать файл .csv в учетную запись Azure Data Lake Storage. Вы используете команду azcopy make для создания контейнера в вашей учетной записи хранения. Затем вы используете azcopy copy команду, чтобы скопировать данные CSV , которые вы только что скачали в каталог в этом контейнере.

На следующих этапах вам нужно будет ввести имена для создаваемого контейнера, а также имя каталога и идентификатор BLOB-объекта, куда вы хотите загрузить данные о полете в контейнер. Вы можете использовать предлагаемые имена на каждом шаге или указать собственные, соблюдая соглашения об именовании для контейнеров, каталогов и объектов BLOB.

  1. Откройте окно командной строки и введите следующую команду, чтобы войти в Azure Active Directory, чтобы получить доступ к учетной записи хранения.

    azcopy login
    

    Для аутентификации учетной записи пользователя следуйте инструкциям, появляющимся в окне командной строки.

  2. Чтобы создать контейнер в учетной записи хранения для хранения данных о полете, введите следующую команду:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • Замените значение заполнителя <storage-account-name> именем вашей учетной записи хранения.

    • <container-name> Замените заполнитель именем контейнера, который вы хотите создать для хранения csv-данных, например flight-data-container.

  3. Чтобы отправить (копировать) csv-данные в учетную запись хранения, введите следующую команду.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • Замените значение заполнителя <csv-folder-path> путем указания пути к файлу .csv.

    • Замените значение заполнителя <storage-account-name> именем вашей учетной записи хранения.

    • Замените <container-name> заполнитель именем контейнера в вашей учетной записи хранения.

    • Замените <directory-name> заполнитель именем каталога для хранения данных в контейнере, например jan2016.

Подключение учетной записи хранения к кластеру Databricks

В этом разделе описано, как подключить облачное хранилище объектов Azure Data Lake Storage к файловой системе Databricks (DBFS). Вы используете объект службы Azure AD, созданный ранее для аутентификации с учетной записью хранения данных. Дополнительные сведения см. в статье Подключение облачного хранилища объектов в Azure Databricks.

  1. Подключите записную книжку к кластеру.

    1. В созданной ранее записной книжке нажмите кнопку "Подключиться " в правом верхнем углу панели инструментов записной книжки. Эта кнопка открывает селектор вычислительных операций. (Если вы уже подключили записную книжку к кластеру, имя этого кластера отображается в тексте кнопки, а не Подключение).

    2. В раскрывающемся меню кластера выберите любой созданный ранее кластер.

    3. Обратите внимание, что текст в селекторе кластера изменяется на запуск. Дождитесь завершения запуска кластера и появления имени кластера в кнопке, прежде чем продолжить.

  2. Скопируйте и вставьте следующий блок кода в первую ячейку, но не запускайте этот код.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. В этом блоке кода:

    • В configs замените значения заполнителей <appId>, <clientSecret>, и <tenantId> идентификатором приложения, секретом клиента и идентификатором клиента, которые вы скопировали при создании субъекта-службы в предварительных требованиях.

    • В URI source замените значения-заполнители <storage-account-name>, <container-name> и <directory-name> на имя вашей учетной записи хранения Azure Data Lake Storage, а также на имя контейнера и директории, которые вы указали при загрузке данных о полете в эту учетную запись хранения.

      Примечание.

      Идентификатор схемы в URI abfss сообщает Databricks о необходимости использовать драйвер файловой системы Azure Blob с протоколом TLS (Transport Layer Security). Дополнительные сведения о URI см. в статье Использование URI в хранилище Azure Data Lake.

  4. Прежде чем продолжить работу, убедитесь, что кластер завершит работу.

  5. Нажмите клавиши SHIFT + ВВОД, чтобы запустить код в этом блоке.

Контейнер и каталог, где вы загрузили данные о полете в учетную запись хранения, теперь доступны в записной книжке через точку монтирования /mnt/flightdata.

Используйте Databricks Notebook для преобразования CSV в Parquet.

Теперь, когда данные о полетах в формате csv доступны через монтажную точку DBFS, вы можете использовать DataFrame Apache Spark для загрузки их в вашу рабочую область и записать обратно в объектное хранилище Azure Data Lake Storage в формате Apache Parquet.

  • Кадр данных Spark — это двухмерная структура данных со столбцами потенциально различных типов. Фрейм данных можно использовать для упрощения чтения и записи данных в различных поддерживаемых форматах. С помощью кадра данных можно загружать данные из облачного хранилища объектов и выполнять анализ и преобразования в кластере вычислений, не влияя на базовые данные в облачном хранилище объектов. Дополнительные сведения см. в статье «Работа с DataFrames PySpark в Azure Databricks».

  • Apache Parquet — это формат столбцов с оптимизацией, которая ускоряет запросы. Это более эффективный формат файла, чем CSV или JSON. Дополнительные сведения см. в разделе "Файлы Parquet".

В записной книжке добавьте новую ячейку и вставьте в нее следующий код.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Нажмите клавиши SHIFT + ВВОД, чтобы запустить код в этом блоке.

Прежде чем перейти к следующему разделу, убедитесь, что все данные в формате parquet были записаны, и "Готово" отображается в выходных данных.

Изучение данных

В этом разделе вы используете служебную программу файловой системы Databricks для изучения хранилища объектов Azure Data Lake Storage с помощью точки подключения DBFS, созданной в предыдущем разделе.

В новой ячейке вставьте следующий код, чтобы получить список файлов в точке подключения. Первая команда выводит список файлов и каталогов. Вторая команда отображает выходные данные в табличном формате для упрощения чтения.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Нажмите клавиши SHIFT + ВВОД, чтобы запустить код в этом блоке.

Обратите внимание, что каталог Parquet отображается в списке. Вы сохранили данные полетов в формате parquet в каталог parquet/flights в предыдущем разделе. Чтобы получить список файлов в каталоге parquet/flight, вставьте следующий код в новую ячейку и запустите его:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Чтобы создать файл и перечислить его, вставьте следующий код в новую ячейку и запустите его:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Так как файл 1.txt в этом руководстве не требуется, вы можете вставить следующий код в ячейку и запустить его для рекурсивного удаления mydirectory. Параметр True указывает рекурсивное удаление.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

В качестве удобства можно использовать команду справки для получения подробных сведений о других командах.

dbutils.fs.help("rm")

В этих примерах кода вы изучили иерархический характер HDFS с помощью данных, хранящихся в учетной записи хранения с включенным Azure Data Lake Storage.

Запрос данных

Теперь вы можете выполнять запросы к данным, загруженным в вашу учетную запись хранения. Введите каждый из следующих блоков кода в новую ячейку и нажмите клавиши SHIFT+ ВВОД , чтобы запустить скрипт Python.

Фреймы данных предоставляют богатый набор функций (выборка столбцов, фильтрация, соединение, агрегация), которые позволяют эффективно решать распространенные задачи анализа данных.

Чтобы загрузить DataFrame из ранее сохраненных данных о перелётах в формате parquet и изучить некоторые поддерживаемые функции, введите этот сценарий в новую ячейку и запустите её.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected columns for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Введите этот скрипт в новую ячейку, чтобы выполнить некоторые базовые запросы анализа данных. Можно выбрать запуск всего скрипта (SHIFT+ВВОД), выделить каждый запрос и запустить его отдельно с помощью CTRL+SHIFT+ВВОД или ввести каждый запрос в отдельную ячейку и запустить его там.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Итоги

Изучив это руководство, вы:

  • Создали ресурсы Azure, включая учетную запись хранения Azure Data Lake Storage и служебный принципал Azure AD, и назначили разрешения для доступа к учетной записи хранения.

  • Создана рабочая область Azure Databricks и записная книжка.

  • Используется AzCopy для отправки неструктурированных .csv данных о полете в учетную запись хранения Azure Data Lake Storage.

  • Используйте утилиты файловой системы Databricks для монтирования вашей учетной записи хранения Azure Data Lake Storage и исследования её иерархической файловой системы.

  • С помощью Apache Spark DataFrames можно преобразовать данные .csv в формат Apache Parquet и сохранить их обратно в учетную запись хранения Azure Data Lake Storage.

  • Использовали DataFrames для изучения данных о полетах и выполнения простого запроса.

  • Используется Apache Spark SQL для запроса данных о рейсах для общего количества рейсов для каждой авиакомпании в январе 2016 года, аэропортов Техаса, авиакомпаний, которые летают из Техаса, средняя задержка прибытия в минутах для каждой авиакомпании на национальном уровне, а также процент рейсов каждой авиакомпании, которые задерживают вылеты или прибытия.

Очистка ресурсов

Если вы хотите сохранить записную книжку и вернуться к ней позже, рекомендуется завершить работу кластера, чтобы избежать дополнительных затрат. Чтобы завершить работу кластера, выберите его в селекторе вычислений, расположенном в правом верхнем углу панели инструментов записной книжки, выберите " Завершить " в меню и подтвердите выбор. (По умолчанию кластер будет автоматически завершаться через 120 минут бездействия.)

Если вы хотите удалить отдельные ресурсы рабочей области, такие как записные книжки и кластеры, можно сделать это с левой боковой панели рабочей области. Подробные инструкции см. в разделе "Удаление кластера " или "Удаление записной книжки".

Удалите группу ресурсов и все связанные с ней ресурсы, когда надобность в них отпадет. Чтобы сделать это в портал Azure, выберите группу ресурсов для учетной записи хранения и рабочей области и нажмите кнопку "Удалить".

Следующие шаги