Сравнение Spark Connect с классической версией Spark

Spark Connect — это протокол на основе gRPC в Apache Spark, который указывает, как клиентское приложение может взаимодействовать с удаленным сервером Spark. Это позволяет удаленно выполнять задачи Spark с помощью API DataFrame.

Spark Connect используется в следующих случаях:

  • Записные книжки Scala с Databricks Runtime версии 13.3 и более поздних версий на стандартных вычислительных узлах
  • Ноутбуки Python с Databricks Runtime версии 14.3 и выше на стандартных вычислительных ресурсах
  • Бессерверные вычисления
  • Сервис Databricks Connect

Хотя и Spark Connect и Spark Classic используют отложенное выполнение для преобразований, существуют важные различия, чтобы избежать непредвиденных проблем с поведением и производительностью при переносе существующего кода из Spark Classic в Spark Connect или при написании кода, который должен работать с обоими.

Ленивый и жадный

Ключевое различие между Spark Connect и Spark Classic заключается в том, что Spark Connect откладывает анализ и разрешение имен на время выполнения, как описано в следующей таблице.

Аспект Классическая версия Spark Spark Connect
Выполнение запросов Ленивый Ленивый
Анализ схемы Стремящийся Ленивый
Доступ к схеме Local Активирует RPC и кэширует схему при первом доступе
Временные представления Встроенный план Поиск имен
Сериализация UDF При создании При выполнении

Выполнение запросов

И Spark Classic, и Spark Connect следуют одной и той же ленивой модели выполнения для выполнения запросов.

В классической версии Spark преобразования DataFrame (например, filter и limit) являются ленивыми. Это означает, что они не выполняются немедленно, но кодируются в логическом плане. Фактические вычисления активируются только с действием (напримерshow(), ). collect()

Spark Connect следует аналогичной модели отложенной оценки. Преобразования создаются на стороне клиента и передаются на сервер в виде неразрешенных планов. Затем сервер выполняет необходимый анализ и выполнение при вызове действия.

Аспект Классическая версия Spark Spark Connect
Преобразования: df.filter(...), df.select(...)df.limit(...) Отложенное выполнение Отложенное выполнение
Sql-запросы: spark.sql("select …") Отложенное выполнение Отложенное выполнение
Действия: df.collect(), df.show() Нетерпеливое выполнение Нетерпеливое выполнение
Команды SQL: spark.sql("insert …"), spark.sql("create …") Нетерпеливое выполнение Нетерпеливое выполнение

Анализ схемы

Классический Spark выполняет анализ незамедлительно во время построения логического плана. Этот этап анализа преобразует неразрешенный план в полностью разрешенный логический план и проверяет, может ли операция выполняться Spark. Одним из ключевых преимуществ выполнения этой работы с энтузиазмом является то, что пользователи сразу получают обратную связь, когда совершают ошибку. Например, выполнение spark.sql("select 1 as a, 2 as b").filter("c > 1") сразу приведет к ошибке, указывая, что столбец c не найден.

Spark Connect отличается от классической, так как клиент создает неразрешенные планы во время преобразования и откладывает их анализ. Любая операция, требующая разрешенного плана( например, доступа к схеме, объяснения плана, сохранения кадра данных или выполнения действия), приводит к отправке клиентом неразрешенных планов на сервер через RPC. Затем сервер выполняет полный анализ, чтобы получить определённый логический план и выполнить операцию. Например, spark.sql("select 1 as a, 2 as b").filter("c > 1") не вызовет никаких ошибок, так как неразрешенный план находится только на стороне клиента, но с df.columns или df.show() будет вызвана ошибка, так как неразрешенный план отправляется на сервер для анализа.

В отличие от выполнения запросов, классический и Spark Connect отличаются по времени анализа схем.

Аспект Классическая версия Spark Spark Connect
Преобразования: df.filter(...), df.select(...)df.limit(...) Стремящийся Ленивый
Доступ к схеме: df.columns, df.schema, df.isStreaming Стремящийся Стремящийся
Отправляет запрос RPC для аналитического анализа, в отличие от классического Spark.
Действия: df.collect(), df.show() Стремящийся Стремящийся
Зависимое состояние сеансов DataFrames: UDFs, временные представления, конфиги Стремящийся Ленивый
Оценивается во время выполнения плана DataFrame
Состояние зависимого сеанса временных представлений: UDFs, другие временные представления, конфигурации Стремящийся Стремящийся
Анализ активируется с нетерпением при создании временного представления

Лучшие практики

Разница между отложенным и жадным анализом означает, что существуют некоторые рекомендации, чтобы избежать непредвиденных проблем с поведением и производительностью, в частности тех, которые вызваны перезаписью временных имен представлений, захватом внешних переменных в определяемых пользователем функциях, отложенным обнаружением ошибок и чрезмерным доступом к схеме на новых DataFrame.

Создайте уникальные временные имена представлений

В Spark Connect DataFrame хранит только ссылку на временное представление по имени. В результате, если временное представление будет заменено позже, данные в DataFrame также изменятся, так как во время выполнения он ссылается на представление по имени.

Это поведение отличается от Spark Classic, где логический план временного представления интегрируется в план кадра данных на этапе создания. Любая последующая замена временного представления не влияет на исходный кадр данных.

Чтобы устранить разницу, всегда создавайте уникальные имена временных представлений. Например, включите идентификатор UUID в имя представления. Это позволяет избежать воздействия на существующие DataFrames, которые ссылаются на ранее зарегистрированное временное представление.

Питон

import uuid
def create_temp_view_and_create_dataframe(x):
  temp_view_name = f"`temp_view_{uuid.uuid4()}`"  # Use a random name to avoid conflicts.
  spark.range(x).createOrReplaceTempView(temp_view_name)
  return spark.table(temp_view_name)

df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10

df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10  # It works as expected now.
assert len(df100.collect()) == 100

Scala

import java.util.UUID

def createTempViewAndDataFrame(x: Int) = {
  val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
  spark.range(x).createOrReplaceTempView(tempViewName)
  spark.table(tempViewName)
}

val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)

val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)

Обертка определений UDF

Как правило, считается плохой практикой, чтобы функции, определяемые пользователем, зависели от изменяемых внешних переменных, так как это вводит неявные зависимости, может приводить к недетерминированному поведению и уменьшать композируемость. Тем не менее, если у вас есть такой шаблон, помните о следующем подвохе:

В Spark Connect определяемые пользователем функции Python являются ленивыми. Их сериализация и регистрация откладываются до времени выполнения. В следующем примере UDF сериализуется и передается в кластер Spark для выполнения при вызове show().

from pyspark.sql.functions import udf

x = 123

@udf("INT")
def foo():
  return x


df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456

Это поведение отличается от классической модели Spark, где определяемые пользователем функции создаются с нетерпением. В Spark Classic значение x во время создания UDF фиксируется, поэтому последующие изменения x не влияют на уже созданный UDF.

Если необходимо изменить значение внешних переменных, от которых зависит UDF, используйте фабрику функций (замыкание с ранней привязкой) для правильного захвата значений переменных. В частности, оберните создание UDF во вспомогательную функцию, чтобы зафиксировать значение зависимой переменной.

Питон

from pyspark.sql.functions import udf

def make_udf(value):
  def foo():
    return value
  return udf(foo)


x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected

Scala

def makeUDF(value: Int) = udf(() => value)

var x = 123
val fooUDF = makeUDF(x)  // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected

Завернув определение UDF внутри другой функции (make_udf), мы создадим новую область, в которой текущее значение x передается в качестве аргумента. Это гарантирует, что каждый созданный UDF имеет собственную копию поля, привязанную во время создания UDF.

Запуск немедленного анализа для обнаружения ошибок

Следующая обработка ошибок полезна в классической версии Spark, так как она выполняет ранний анализ, что позволяет быстро выбрасывать исключения. Однако в Spark Connect этот код не вызывает никаких проблем, так как он создает только локальный неразрешенный план без активации какого-либо анализа.

df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])

try:
  df = df.select("name", "age")
  df = df.withColumn(
      "age_group",
      when(col("age") < 18, "minor").otherwise("adult"))
  df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
  print(f"Error: {repr(e)}")

Если код использует исключение анализа и вы хотите поймать его, можно активировать активный анализ, например с df.columns, df.schema или df.collect().

Питон

try:
  df = ...
  df.columns # This will trigger eager analysis
except Exception as e:
  print(f"Error: {repr(e)}")

Scala

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")

try {
  val df2 = df.select("name", "age")
    .withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
    .filter(col("age_with_typo") > 6)
  df2.columns // Trigger eager analysis to catch the error
} catch {
  case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}

Избегайте слишком большого количества запросов на анализ

Производительность можно улучшить, если избежать запросов анализа на большое количество DataFrame.

Создание новых DataFrame пошагово и доступ к их схемам на каждой итерации

При создании большого количества новых кадров данных избегайте чрезмерного использования вызовов, запускающих немедленный анализ на них (например, df.columns, df.schema). Вы можете получить доступ к схеме одного кадра данных несколько раз, но активация анализа во многих только что созданных кадрах данных влияет на производительность.

Например, при итеративном добавлении столбцов в кадр данных внутри цикла и проверке того, существует ли каждый столбец перед добавлением, вызов df.columns каждого созданного кадра данных активирует запрос на анализ при каждой итерации. Чтобы избежать этого, следует сохранить набор для отслеживания имен столбцов вместо многократного доступа к схеме Кадра данных.

Питон
df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
  new_column_name = str(i)
  # if new_column_name not in df.columns:  # Bad practice. The `df.columns` call causes an analysis request on the newly created DataFrame in every iteration.
  if new_column_name not in columns:  # Check the set without triggering analysis
    df = df.withColumn(new_column_name, F.col("id") + i)
    columns.add(new_column_name)
df.show()
Scala
import org.apache.spark.sql.functions._

var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
  val newColumnName = i.toString
  // if (!df.columns.contains(newColumnName)) {  // Bad practice. The `df.columns` call causes an analysis request on the newly created DataFrame in every iteration.
  if (!columns.contains(newColumnName)) {  // Check the set without triggering analysis
    df = df.withColumn(newColumnName, col("id") + i)
    columns.add(newColumnName)
  }
}
df.show()

Избегайте доступа к схемам для большого количества промежуточных кадров данных

Другим аналогичным случаем является создание большого количества ненужных промежуточных кадров данных и их анализа. В следующем случае для извлечения имен полей из каждого столбца типа структуры получите StructType сведения о поле непосредственно из схемы Кадра данных вместо создания промежуточных кадров данных.

Питон
from pyspark.sql.types import StructType

df = ...
struct_column_fields = {
    # column_schema.name: df.select(column_schema.name + ".*").columns  # Bad practice. This creates an intermediate DataFrame and triggers an analysis request for each StructType column.
    column_schema.name: [f.name for f in column_schema.dataType.fields]  # Access StructType fields directly from the schema, avoiding analysis on intermediate DataFrames.
    for column_schema in df.schema
    if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)
Scala
import org.apache.spark.sql.types.StructType

df = ...
val structColumnFields = df.schema.fields
  .filter(_.dataType.isInstanceOf[StructType])
  .map { field =>
    // field.name -> df.select(field.name + ".*").columns  // Bad practice. This creates an intermediate DataFrame and triggers analysis for each StructType column.
    field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name)  // Access StructType fields directly from the schema, avoiding analysis on intermediate DataFrames.
  }
  .toMap
println(structColumnFields)