Основы PySpark

В этой статье рассматриваются простые примеры использования PySpark. Предполагается, что вы понимаете основные понятия Apache Spark и выполняете команды в записной книжке Azure Databricks, подключенной к вычислениям. Вы создаете кадры данных с помощью примеров данных, выполняете основные преобразования, включая операции строк и столбцов с данными, объединяете несколько кадров данных и объединяете эти данные, визуализируете эти данные, а затем сохраняете их в таблице или файле.

Отправка данных

Некоторые примеры в этой статье используют образцы данных, предоставленные Databricks, для демонстрации загрузки, преобразования и сохранения данных с помощью DataFrames. Если вы хотите использовать собственные данные, которые еще не находятся в Databricks, вы можете сначала загрузить их и создать DataFrame из них. См. статьи "Создание или изменение таблицы с помощью загрузки файлов" и "Работа с файлами в томах каталога Unity".

Сведения о примерах данных Databricks

Databricks предоставляет примеры данных в каталоге samples и в каталоге /databricks-datasets .

  • Чтобы получить доступ к примеру данных в каталоге samples , используйте формат samples.<schema-name>.<table-name>. В этой статье используются таблицы схемы samples.tpch , содержащие данные из вымышленного бизнеса. Таблица customer содержит сведения о клиентах и orders содержит сведения о заказах, размещенных этими клиентами.
  • Используйте dbutils.fs.ls для изучения данных в /databricks-datasets. Используйте Spark SQL или DataFrames для выполнения запросов к данным в этом расположении через пути к файлам. Дополнительные сведения о примерах данных, предоставленных Databricks, см. в разделе "Примеры наборов данных".

Импорт типов данных

Многие операции PySpark требуют использования функций SQL или взаимодействия с собственными типами Spark. Либо напрямую импортируйте только необходимые функции и типы, либо чтобы избежать переопределения встроенных функций Python, импортируйте эти модули с помощью общего псевдонима.

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

# import modules using an alias
import pyspark.sql.types as T
import pyspark.sql.functions as F

Полный список типов данных см. в разделе "Типы данных PySpark".

Полный список функций PySpark SQL см. в разделе "Функции PySpark".

Создайте кадр данных

Существует несколько способов создать DataFrame. Обычно вы определяете кадр данных для источника данных, например таблицы или коллекции файлов. Затем, как описано в разделе основных понятий Apache Spark, используйте действие, напримерdisplay, для активации преобразований для выполнения. Метод display выводит DataFrame.

Создать DataFrame с указанными значениями

Чтобы создать DataFrame с указанными значениями, используйте метод createDataFrame, где строки выражены как список кортежей.

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

Обратите внимание на вывод, где типы данных столбцов df_children автоматически определяются. Кроме того, можно указать типы, добавив схему. Схемы определяются с помощью StructType, который состоит из StructFields, определяющих имя, тип данных и логический флаг, указывающий, содержит ли он значение NULL или нет. Необходимо импортировать типы данных из pyspark.sql.types.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Создать DataFrame из таблицы в каталоге Unity

Чтобы создать кадр данных из таблицы в каталоге Unity, используйте table метод, определяющий таблицу с помощью формата <catalog-name>.<schema-name>.<table-name>. Щелкните каталог на левой панели навигации, чтобы использовать обозреватель каталогов для перехода к таблице. Щелкните по нему, затем выберите Копировать путь таблицы, чтобы вставить путь в записную книжку.

В следующем примере загружается таблица samples.tpch.customer, но можно также указать путь к собственной таблице.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

Создание DataFrame из загруженного файла

Чтобы создать DataFrame из файла, загруженного в тома Unity Catalog, используйте свойство read. Этот метод возвращает значение DataFrameReader, которое можно использовать для чтения соответствующего формата. Щелкните параметр каталога на небольшой боковой панели слева и используйте браузер каталога для поиска файла. Выберите том, а затем нажмите Копировать путь к файлу тома.

В приведенном ниже *.csv примере осуществляется чтение из файла, но DataFrameReader поддерживает загрузку файлов в многих других форматах. См. методы DataFrameReader.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Дополнительные сведения о томах каталога Unity см. в разделе "Что такое тома каталога Unity?".

Создайте DataFrame из ответа JSON

Чтобы создать DataFrame из JSON-ответа, возвращаемого REST API, используйте пакет Python requests для выполнения запроса и анализа ответа. Необходимо импортировать пакет для его использования. В этом примере используются данные из базы данных заявок на лекарственные средства Управления по санитарному надзору за качеством пищевых продуктов и медикаментов США.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Сведения о работе с JSON и другими частично структурированными данными в Databricks см. в разделе "Модель полуструктурированных данных".

Выбор поля или объекта JSON

Чтобы выбрать определенное поле или объект из преобразованного JSON, используйте [] нотацию. Например, чтобы выбрать products поле, которое является массивом продуктов:

display(df_drugs.select(df_drugs["products"]))

Также можно связать вызовы методов для обхода нескольких полей. Например, чтобы вывести название бренда первого продукта в приложении лекарственного средства:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

Создание DataFrame из файла

Чтобы продемонстрировать создание DataFrame из файла, в этом примере данные CSV загружаются из каталога /databricks-datasets.

Чтобы перейти к примерам наборов данных, можно использовать команды файловой системы Databricks Utilties . Следующий пример используется dbutils для перечисления наборов данных, доступных в /databricks-datasets:

display(dbutils.fs.ls('/databricks-datasets'))

Кроме того, можно использовать %fs для доступа к командам файловой системы Databricks CLI, как показано в следующем примере:

%fs ls '/databricks-datasets'

Чтобы создать кадр данных из файла или каталога файлов, укажите путь в методе load :

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

Преобразование данных с помощью DataFrames

DataFrame упрощает преобразование данных с помощью встроенных методов для сортировки, фильтрации и агрегации данных. Многие преобразования не указываются как методы в фреймах данных, но вместо этого они предоставляются в пакете pyspark.sql.functions. См. сведения о функциях SQL Databricks PySpark.

Операции столбцов

Spark предоставляет множество основных операций с колонками.

Tip

Чтобы вывести все имеющиеся столбцы в кадре данных, используйте columns, например df_customer.columns.

Выбор столбцов

Вы можете выбрать определенные столбцы с помощью select и col. Функция col находится в подмодуле pyspark.sql.functions .

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

Можно также ссылаться на столбец с помощью expr, который принимает выражение, определенное как строка.

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

Вы также можете использовать selectExpr, который принимает выражения SQL:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

Чтобы выбрать столбцы с помощью строкового литерала, сделайте следующее:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

Чтобы явно выбрать столбец из определенного кадра данных, можно использовать [] оператор или . оператор. (Оператор . не может использоваться для выбора столбцов, начиная с целого числа, или тех, которые содержат пробел или специальный символ.) Это может быть особенно полезно при присоединении к кадрам данных, где некоторые столбцы имеют то же имя.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

Создание столбцов

Чтобы создать новый столбец, используйте withColumn метод. В следующем примере создается новый столбец, содержащий логическое значение, основанное на том, превышает ли баланс учетной записи клиента c_acctbal1000.

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

Переименование столбцов

Чтобы переименовать столбец, используйте withColumnRenamed метод, который принимает существующие и новые имена столбцов:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

Этот alias метод особенно полезен, если вы хотите переименовать столбцы в рамках агрегаций.

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

Типы столбцов приведения

В некоторых случаях может потребоваться изменить тип данных для одного или нескольких столбцов в DataFrame. Для этого используйте cast метод для преобразования между типами данных столбцов. В следующем примере показано, как преобразовать столбец из целого числа в тип строки, используя col метод для ссылки на столбец:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

Удаление столбцов

Чтобы удалить столбцы, можно не включать их при выборке или воспользоваться методом select(*) except или drop.

df_customer_flag_renamed.drop("balance_flag_renamed")

Вы также можете одновременно удалить несколько столбцов:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

Строковые операции

Spark предоставляет множество основных операций строк:

Фильтрация строк

Чтобы отфильтровать строки, используйте filter метод или where метод в кадре данных, чтобы возвращать только определенные строки. Чтобы определить столбец для фильтрации, используйте col метод или выражение, которое вычисляет столбец.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

Чтобы отфильтровать несколько условий, используйте логические операторы. Например, & и | дают возможность вам AND и OR условия соответственно. В следующем примере выполняется фильтрация строк, в которых c_nationkey значение равно 20 и c_acctbal больше 1000.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

Удаление повторяющихся строк

Для удаления дубликатов строк используйте distinct, который возвращает уникальные строки.

df_unique = df_customer.distinct()

Обработка значений NULL

Чтобы обрабатывать значения NULL, удалите строки, содержащие значения NULL с помощью na.drop метода. Этот метод позволяет указать, нужно ли удалять строки, any содержащие значения NULL или all значения NULL.

Чтобы удалить значения NULL, используйте любой из следующих примеров.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

Если вместо этого требуется отфильтровать только строки, содержащие все значения NULL, используйте следующее:

df_customer_no_nulls = df_customer.na.drop("all")

Это можно применить для подмножества столбцов, указав это, как показано ниже:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

Чтобы заполнить отсутствующие значения, используйте fill метод. Вы можете применить это ко всем столбцам или подмножества столбцов. В приведенном ниже примере балансы учетных записей с пустым значением для баланса c_acctbal учетной записи заполняются 0.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

Чтобы заменить строки другими значениями, используйте метод replace. В приведенном ниже примере все пустые строки адресов заменяются словом UNKNOWN:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

Добавление строк

Чтобы добавить строки, используйте метод union для создания нового DataFrame. В следующем примере кадр данных, созданный ранее и df_that_one_customer объединенный, возвращает кадр данных df_filtered_customer с тремя клиентами:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Note

Вы также можете объединить DataFrames, записав их в таблицу, а затем добавив новые строки. Для рабочих нагрузок добавочная обработка источников данных в целевую таблицу может значительно сократить задержку и затраты на вычисления по мере увеличения размера данных. См. статью "Стандартные соединители" в Lakeflow Connect.

Сортировка строк

Important

Сортировка может быть дорогостоящей на большом масштабе, и если вы храните отсортированные данные и при перезагрузке данных с помощью Spark, порядок не гарантируется. Убедитесь, что вы намеренно используете сортировку.

Чтобы отсортировать строки по одному или нескольким столбцам, используйте sort или orderBy метод. По умолчанию эти методы сортируются по возрастанию:

df_customer.orderBy(col("c_acctbal"))

Чтобы отфильтровать по убыванию, используйте desc:

df_customer.sort(col("c_custkey").desc())

В следующем примере показано, как сортировать по двум столбцам:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

Чтобы ограничить количество строк, возвращаемых после сортировки DataFrame, используйте метод limit. В следующем примере отображаются только первые 10 результаты:

display(df_sorted.limit(10))

Объединение DataFrame

Чтобы объединить два или более фреймов данных, используйте join. Вы можете указать, как нужно объединить фреймы данных в параметрах how (тип соединения) и on (на каких столбцах основывать соединение). Распространенные типы соединения включают:

  • inner: это тип соединения по умолчанию, который возвращает кадр данных, который сохраняет только строки, в которых есть совпадение для on параметра в кадрах данных.
  • left: это сохраняет все строки первого указанного кадра данных и только строки из второго указанного кадра данных, которые имеют совпадение с первым.
  • outer: Внешнее соединение сохраняет все строки из обоих DataFrame вне зависимости от соответствия.

Подробные сведения о соединениях см. в статье "Работа с соединениями в Azure Databricks". Список соединений, поддерживаемых в PySpark, см. в разделе "Соединения с кадрами данных".

В следующем примере возвращается один DataFrame, в котором каждая строка orders DataFrame присоединена к соответствующей строке из customers DataFrame. Используется внутреннее соединение, так как ожидается, что каждый заказ соответствует ровно одному клиенту.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

Чтобы присоединиться к нескольким условиям, используйте логические операторы, такие как & и | для указания AND и ORсоответственно. В следующем примере добавляется дополнительное условие, фильтрация только к строкам, имеющим o_totalprice больше 500,000:

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

Статистические данные

Чтобы агрегировать данные в DataFrame, аналогичном GROUP BY в SQL, используйте метод groupBy, чтобы указать столбцы для группировки и метод agg, чтобы указать агрегации. Импорт общих агрегатов, включая avg, sum, max, и min из pyspark.sql.functions. В следующем примере показан средний баланс клиентов по сегменту рынка:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

Некоторые агрегации являются действиями, что означает, что они запускают вычисления. В этом случае вам не нужно использовать другие действия для вывода результатов.

Чтобы посчитать количество строк в DataFrame, воспользуйтесь методом count.

df_customer.count()

Цепочка вызовов

Методы, которые преобразуют DataFrames, возвращают DataFrames, и Spark не выполняет преобразования, пока не будут вызваны действия. Эта отложенная оценка означает, что можно объединять несколько методов для удобства и читаемости. В следующем примере показано, как выполнять фильтрацию, агрегирование и упорядочение:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

Визуализируйте ваш DataFrame

Чтобы визуализировать кадр данных в записной книжке, щелкните + знак рядом с таблицей слева от кадра данных, а затем выберите визуализацию , чтобы добавить одну или несколько диаграмм на основе кадра данных. Дополнительные сведения о визуализациях см. в разделе "Визуализации" в записных книжках Databricks и редакторе SQL.

display(df_order)

Для выполнения дополнительных визуализаций Databricks рекомендует использовать API pandas для Spark. Элемент .pandas_api() позволяет привести к соответствующему API pandas для кадра данных Spark. Дополнительные сведения см. в разделе API Pandas в Spark.

Сохранение данных

После преобразования данных его можно сохранить с помощью DataFrameWriter методов. Полный список этих методов можно найти в DataFrameWriter. ** В следующих разделах показано, как сохранить DataFrame в качестве таблицы и в виде набора файлов данных.

Сохраните кадр данных как таблицу

Чтобы сохранить кадр данных в качестве таблицы в каталоге Unity, используйте write.saveAsTable метод и укажите путь в формате <catalog-name>.<schema-name>.<table-name>.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

Запишите DataFrame как CSV

Чтобы записать DataFrame в формат *.csv, используйте метод write.csv, указав формат и параметры. По умолчанию, если данные существуют по указанному пути, операция записи завершается ошибкой. Чтобы выполнить другое действие, можно указать один из следующих режимов:

  • overwrite перезаписывает все существующие данные в целевом пути данными из DataFrame.
  • append добавляет содержимое DataFrame к данным в целевом пути.
  • ignore Автоматически завершается сбоем записи, если данные существуют в целевом пути.

В следующем примере показано, как перезаписать данные, используя содержимое DataFrame в виде CSV-файлов.

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

Дальнейшие шаги

Чтобы использовать больше возможностей Spark на Databricks, см. подробнее: