Определяемые пользователем скалярные функции — Scala

В этой статье приведены примеры определяемых пользователем функций Scala. Здесь показано, как регистрировать UDF, как вызывать UDF, и предупреждения, касающиеся порядка вычисления подвыражений в Spark SQL. Дополнительные сведения см. в статье о внешних скалярных функциях, определяемых пользователем (UDF).

Требования

  • Для вычислительных ресурсов с поддержкой Unity Catalog функции Scala UDF со стандартным режимом доступа требуется Databricks Runtime 14.2 или более поздняя версия.

  • Для поддержки Scala UDFs на экземплярах ARM в кластерах с поддержкой каталога Unity требуется Databricks Runtime версии 15.2 или выше.

Зарегистрируйте функцию как UDF

val squared = (s: Long) => {
  s * s
}
spark.udf.register("square", squared)

Вызовите UDF в Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test

Использование UDF с DataFrames

import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))

Порядок вычисления и проверка на NULL

В Spark SQL (включая SQL, а также API DataFrame и Dataset) не гарантируется определённый порядок вычисления подвыражений. В частности, входные данные оператора или функции не обязательно вычисляются слева направо или в любом другом фиксированном порядке. Например, логические выражения AND и OR не имеют семантики "краткозамыкания" слева направо.

Таким образом, не следует полагаться на побочные эффекты или порядок вычисления логических выражений, а также порядок предложений WHERE и HAVING, так как этот порядок и правила применения предложений могут изменяться в результате оптимизации или планирования запросов. В частности, если UDF использует семантику короткого замыкания в SQL для проверки на null, нет гарантии, что эта проверка будет выполнена до вызова UDF. Например,

spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee

В этом примере выражение WHERE не гарантирует, что определяемая пользователем функция strlen будет вызываться после фильтрации значений NULL.

Для правильной проверки значений NULL рекомендуется выполнить одно из следующих действий:

  • Сделайте саму UDF поддерживающей значения NULL и выполняйте проверку на NULL внутри неё.
  • Используйте выражения IF или CASE WHEN для проверки значения NULL и вызова UDF в условном блоке.
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

API типизированного набора данных

Примечание.

Эта функция поддерживается кластерами с включенным каталогом Unity и стандартным режимом доступа в Databricks Runtime 15.4 и выше.

API типизированного набора данных позволяют выполнять преобразования, такие как сопоставление, фильтрация и агрегаты для результирующего набора данных с определяемой пользователем функцией.

Например, следующее приложение Scala использует map() API для изменения числа в столбце результатов на префиксированную строку.

spark.range(3).map(f => s"row-$f").show()

Хотя в этом примере используется map() API, это также относится к другим типизированным API набора данных, таким как filter(), , mapPartitions()foreach(), foreachPartition()reduce()и flatMap().

Совместимость функций Scala UDF и Databricks Runtime

Следующие функции Scala требуют минимальных версий среды выполнения Databricks при использовании в кластерах с поддержкой каталога Unity в стандартном (общем) режиме доступа.

Функция Минимальная версия среды выполнения Databricks
Определяемые пользователем скалярные функции Databricks Runtime версия 14.2
Dataset.map, , Dataset.mapPartitionsDataset.filter, Dataset.reduceDataset.flatMap Databricks Runtime 15.4
KeyValueGroupedDataset.flatMapGroups, KeyValueGroupedDataset.mapGroups Databricks Runtime 15.4
(Потоковая передача) foreachWriter Sink Databricks Runtime 15.4
(Потоковая передача) foreachBatch Databricks Runtime 16.1
(Потоковая передача) KeyValueGroupedDataset.flatMapGroupsWithState Databricks Runtime 16.2