Поделиться через


Запросы к базам данных с помощью JDBC

Это важно

Устаревшая документация по федерации запросов была прекращена и может не обновляться. Продукты, услуги или технологии, упомянутые в этом контенте, официально не поддерживаются или проверяются Databricks. См. раздел "Что такое федерация Lakehouse?".

Azure Databricks поддерживает подключение к внешним базам данных с помощью JDBC. В этой статье представлен базовый синтаксис для настройки и использования этих подключений с примерами в Python, SQL и Scala.

Это важно

Конфигурации, описанные в этой статье, являются экспериментальными. Экспериментальные функции предоставляются в имеющемся виде и не поддерживаются Databricks в рамках технической поддержки клиентов. Чтобы получить полную поддержку федерации запросов, следует использовать Lakehouse Federation, которая позволяет пользователям Azure Databricks воспользоваться синтаксисом Unity Catalog и средствами управления данными.

Partner Connect предоставляет оптимизированные интеграции для синхронизации данных со многими внешними источниками данных. См. статью Что такое Databricks Partner Connect?.

Это важно

Примеры в этой статье не содержат имен пользователей и паролей в URL-адресах JDBC. Databricks рекомендует использовать секреты для хранения ваших учётных данных базы данных. Рассмотрим пример.

Питон

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Чтобы ссылаться на секреты Databricks с помощью SQL, необходимо настроить свойство конфигурации Spark во время инициализации кластера.

Полный пример управления секретами см. в руководстве по созданию и использованию секрета Databricks.

Чтение данных с помощью JDBC

Необходимо настроить ряд параметров для чтения данных с помощью JDBC. Обратите внимание, что для каждой базы данных используется другой формат <jdbc-url>.

Питон

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark автоматически считывает схему из таблицы базы данных и сопоставляет типы с типами SPARK SQL.

Питон

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Вы можете выполнять запросы к этой таблице JDBC:

Питон

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Запись данных с помощью JDBC

Сохранение данных в таблицы с помощью JDBC использует аналогичные конфигурации для чтения. См. следующий пример.

Питон

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Поведение по умолчанию пытается создать новую таблицу и выдает ошибку, если таблица с этим именем уже существует.

Можно добавить данные в существующую таблицу с помощью следующего синтаксиса:

Питон

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Вы можете перезаписать существующую таблицу с помощью следующего синтаксиса:

Питон

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Управление параллелизмом для запросов JDBC

По умолчанию драйвер JDBC запрашивает исходную базу данных только с одним потоком. Чтобы повысить производительность операций чтения, необходимо указать ряд параметров для управления количеством одновременных запросов Azure Databricks в базу данных. Для небольших кластеров установка numPartitions параметра равно количеству ядер исполнителя в кластере гарантирует, что все узлы запрашивают данные параллельно.

Предупреждение

Установка numPartitions высокого значения в большом кластере может привести к отрицательной производительности для удаленной базы данных, так как слишком много одновременных запросов может перегружать службу. Это особенно сложно для баз данных приложений. Будьте осторожны, устанавливая это значение выше 50.

Примечание.

Ускоряйте запросы, выбрав столбец с индексом, вычисляемым в исходной базе данных для partitionColumn.

В следующем примере кода демонстрируется настройка параллелизма для кластера с восемью ядрами:

Питон

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Примечание.

Azure Databricks поддерживает все параметры Apache Spark для настройки JDBC.

При записи в базы данных с помощью JDBC Apache Spark использует количество секций в памяти для управления параллелизмом. Перед записью можно переразбивать данные для управления параллелизмом. Избегайте большого количества секций в больших кластерах, чтобы избежать перегрузки удаленной базы данных. В следующем примере демонстрируется перераспределение на восемь частей перед записью.

Питон

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Передача запроса в ядро СУБД

Вы можете передать в базу данных весь запрос и возвратить только результат. Параметр table определяет таблицу JDBC для чтения. В предложении FROM запроса SQL можно использовать все допустимые элементы.

Питон

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Контроль количества строк, извлекаемых за один запрос

Драйверы JDBC имеют fetchSize параметр, который управляет количеством строк, полученных за раз из удаленной базы данных.

Настройки Результат
Слишком низкая Высокая задержка из-за большого числа запросов (малое количество строк, возвращаемых на один запрос)
Слишком высокая Ошибка нехватки памяти (слишком много данных, возвращаемых в одном запросе)

Оптимальное значение зависит от рабочей нагрузки. К ним относятся следующие рекомендации:

  • Сколько столбцов возвращает запрос?
  • Какие типы данных возвращаются?
  • Какова длина строк в каждом возвращаемом столбце?

Системы могут иметь очень небольшие настройки по умолчанию и выигрывать от их оптимизации. Например, значение по умолчанию fetchSize Oracle равно 10. Увеличение его до 100 уменьшает количество общих запросов, которые должны выполняться в 10 раз. Результаты JDBC представляют собой сетевой трафик, поэтому следует избегать очень больших чисел, однако оптимальные значения могут составлять тысячи для многих наборов данных.

fetchSize Используйте этот параметр, как показано в следующем примере:

Питон

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()