Поделиться через


Анализ сложных типов данных в Azure Synapse Analytics

This article is relevant for Parquet files and containers in Azure Synapse Link for Azure Cosmos DB. Spark или SQL можно использовать для чтения или преобразования данных со сложными схемами, такими как массивы или вложенные структуры. Результатом следующего примера является один документ, который можно легко масштабировать в миллиарды документов с помощью Spark или SQL. Код, прилагаемый к этой статье, использует PySpark (Python).

Вариант использования

Сложные типы данных встречаются все чаще и представляют трудности для специалистов по обработке данных. При анализе вложенной схемы и массивов обычно используются длительно выполняющиеся и сложные SQL-запросы. Кроме того, переименовать или перевести тип данных вложенных столбцов может оказаться очень сложно. При работе с глубоко вложенными объектами также могут возникнуть проблемы с производительностью.

Специалистам по обработке данных необходимо понять, как организовать эффективную обработку сложных типов данных и сделать их доступными для всех. В следующем примере для чтения и преобразования объектов в плоскую структуру через кадры данных используется Spark в Azure Synapse Analytics. Используйте бессерверную модель SQL в Azure Synapse Analytics, чтобы напрямую запрашивать такие объекты и возвращать эти результаты в виде обычной таблицы.

Что собой представляют массивы и вложенные структуры?

Следующий объект взят из Application Insights. В этом объекте есть вложенные структуры и массивы, содержащие вложенные структуры.

{
    "id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
    "context": {
        "data": {
            "eventTime": "2020-06-10T13:43:34.553Z",
            "samplingRate": "100.0",
            "isSynthetic": "false"
        },
        "session": {
            "isFirst": "false",
            "id": "38619c14-7a23-4687-8268-95862c5326b1"
        },
        "custom": {
            "dimensions": [
                {
                    "customerInfo": {
                        "ProfileType": "ExpertUser",
                        "RoomName": "",
                        "CustomerName": "diamond",
                        "UserName": "[email protected]"
                    }
                },
                {
                    "customerInfo": {
                        "ProfileType": "Novice",
                        "RoomName": "",
                        "CustomerName": "topaz",
                        "UserName": "[email protected]"
                    }
                }
            ]
        }
    }
}

Пример схемы массивов и вложенных структур

При печати схемы кадра данных объекта (называется df) с помощью команды df.printschema вы увидите следующее представление:

  • Желтый цвет обозначает вложенные структуры.
  • Зеленый цвет обозначает массив с двумя элементами.

Код с выделениями желтого и зеленого цвета, показывающий источник схемы

_rid, _ts и _etag добавлены в систему во время приема документа в хранилище транзакций Azure Cosmos DB.

Приведенный кадр данных подсчитывает только 5 столбцов и одну строку. После преобразования кадр курированных данных будет содержать 13 столбцов и 2 строки в табличном формате.

Flatten nested structures and explode arrays

С помощью Spark в Azure Synapse Analytics можно с легкостью преобразовывать вложенные структуры в столбцы, а элементы массива — в несколько строк. Для реализации выполните следующие шаги.

Блок-схема с шагами для преобразований с помощью Spark

Определение функции для преобразования вложенной схемы в плоскую структуру

Эту функцию можно использовать без изменения. Создайте ячейку в записной книжке PySpark, используя следующую функцию:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

Use the function to flatten the nested schema

In this step, you flatten the nested schema of the data frame (df) into a new data frame (df_flat):

from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))

Функция отображения должна вернуть 10 столбцов и 1 строку. The array and its nested elements are still there.

Преобразование массива

В этом случае вы преобразуете массив context_custom_dimensions в кадре данных df_flat в новый кадр данных df_flat_explode. В следующем коде вы также определите, какой столбец следует выбрать:

from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))

Функция отображения должна вернуть 10 столбцов и 2 строки. Следующим шагом будет преобразование в плоскую структуру вложенных схем с помощью функции, определенной на шаге 1.

Use the function to flatten the nested schema

Finally, you use the function to flatten the nested schema of the data frame df_flat_explode, into a new data frame, df_flat_explode_flat:

df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))

Функция отображения должна показать 13 столбцов и 2 строки.

Функция printSchema кадра данных df_flat_explode_flat возвращает следующий результат:

Код, демонстрирующий окончательную схему

Read arrays and nested structures directly

С помощью бессерверной модели SQL можно запрашивать и создавать представления и таблицы на таких объектах.

Во первых, в зависимости от способа хранения данных пользователи должны использовать следующую таксономию. Все, что указано прописными буквами, относится к вашему варианту использования:

Bulk Формат
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' 'Parquet' (ADLSg2)
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/;account=ACCOUNTNAME;database=DATABASENAME;collection=COLLECTIONNAME;region=REGIONTOQUERY', SECRET='YOURSECRET' CosmosDB (Azure Synapse Link)

Замените каждое поле следующим образом:

  • 'YOUR BULK ABOVE' is the connection string of the data source you connect to.
  • 'YOUR TYPE ABOVE' is the format you use to connect to the source.
select *
FROM
openrowset(
    BULK 'YOUR BULK ABOVE',
    FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
        contextdataeventTime varchar(50) '$.context.data.eventTime',
        contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
        contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
        contextsessionisFirst varchar(50) '$.context.session.isFirst',
        contextsessionid varchar(50) '$.context.session.id',
        contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q 
cross apply openjson (contextcustomdimensions) 
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
            RoomName varchar(50) '$.customerInfo.RoomName',
            CustomerName varchar(50) '$.customerInfo.CustomerName',
            UserName varchar(50) '$.customerInfo.UserName'
    )

Существует два разных типа операций:

  • Первый тип операции указывается в следующей строке кода, определяющей столбец с именем contextdataeventTime, который ссылается на вложенный элемент Context.Data.eventTime.

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    Эта строка определяет столбец с именем contextdataeventTime, который ссылается на вложенный элемент Context>Data>eventTime.

  • Второй тип операции использует cross apply, чтобы создать строки для каждого элемента в массиве. Затем он определяет каждый вложенный объект.

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    Если массив содержит 5 элементов с 4 вложенными структурами, то бессерверная модель SQL возвращает 5 строк и 4 столбца. Бессерверная модель SQL может выполнять запросы на месте, сопоставлять массив в двух строках и отображать все вложенные структуры в столбцах.

Следующие шаги