Чтение и запись данных Apache HBase с помощью Apache Spark
Обычно для запроса Apache HBase применяется низкоуровневый API (сканирует, получает и помещает) или синтаксис SQL, использующий Apache Phoenix. Apache также предоставляет соединитель Apache Spark HBase. Он является удобной и эффективной альтернативой запросу и изменению данных, хранящихся в HBase.
Необходимые компоненты
Два отдельных кластера HDInsight развернуты в одной виртуальной сети: HBase и Spark (версия не ниже 2.1) (HDInsight 3.6). Дополнительные сведения см. в статье Создание кластеров под управлением Linux в HDInsight с помощью портала Azure.
Схема универсального кода ресурса (URI) для основного хранилища кластеров. Для службы хранилища BLOB-объектов этой схемой будет wasb://, для Azure Data Lake Storage 2-го поколения —
abfs://
, для Azure Data Lake Storage 1-го поколения — adl://. Если для службы хранилища BLOB-объектов включено безопасное перемещение, URI будет таким:wasbs://
. См. также сведения о безопасной передаче.
Общий процесс
Ниже приведен общий процесс, чтобы позволить кластеру Spark запрашивать кластер HBase.
- Подготовка демонстрационных данных в HBase.
- Извлеките файл hbase-site.xml из папки конфигурации кластера HBase (/etc/hbase/conf) и создайте копию файла hbase-site.xml в папке конфигурации Spark 2 (/etc/spark2/conf). (Необязательно: используйте для автоматизации этого процесса скрипт от команды HDInsight.)
- Запуск
spark-shell
со ссылкой на соединитель Spark HBase по его координатам Maven в параметреpackages
. - Определение каталога, который сопоставляет схему из Spark с HBase.
- Взаимодействие с данными HBase с помощью API RDD или таблицы данных.
Подготовка демонстрационных данных в Apache HBase
На этом этапе вы создаете и заполняете таблицу в Apache HBase, которую затем можно запросить с помощью Spark.
С помощью команды
ssh
подключитесь к кластеру HBase. Измените команду, заменивHBASECLUSTER
имя кластера HBase, а затем введите команду:ssh [email protected]
С помощью команды
hbase shell
запустите интерактивную оболочку HBase. В строку SSH-подключения введите следующую команду:hbase shell
С помощью команды
create
создайте таблицу HBase с двумя семействами столбцов. Введите следующую команду:create 'Contacts', 'Personal', 'Office'
С помощью команды
put
вставьте значения в указанный столбец строки в определенной таблице. Введите следующую команду:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
С помощью команды
exit
остановите работу интерактивной оболочки HBase. Введите следующую команду:exit
Запуск сценариев для настройки подключения между кластерами
Чтобы настроить связь между кластерами, выполните действия, чтобы запустить два скрипта в кластерах. Эти скрипты автоматизируют процесс копирования файлов, описанных в разделе "Настройка обмена данными вручную".
- Скрипт, запускаемый из кластера HBase, отправит файл
hbase-site.xml
и сведения о сопоставлении IP-адресов HBase в хранилище по умолчанию, подключенное к кластеру Spark. - Скрипт, выполняемый из кластера Spark, настраивает два задания cron для периодического выполнения двух вспомогательных скриптов:
- задание cron HBase — скачивание новых файлов
hbase-site.xml
и сопоставления IP-адресов HBase из учетной записи хранения Spark по умолчанию на локальный узел; - задание cron Spark — проверка того, имело ли место масштабирование Spark и является ли кластер безопасным. Если это так, отредактируйте файл
/etc/hosts
, включив сохраненное локально IP-сопоставление HBase.
- задание cron HBase — скачивание новых файлов
ПРИМЕЧАНИЕ. Прежде чем продолжить, убедитесь, что вы добавили учетную запись хранения кластера Spark в кластер HBase в качестве дополнительной учетной записи хранения. Убедитесь, что скрипты указаны в порядке.
С помощью действия скрипта на кластере HBase примените изменения, приняв во внимание указанные ниже соображения.
Свойство Значение URI bash-скрипта https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
Типы узлов Область/регион Параметры -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
Сохранено yes SECONDARYS_STORAGE_URL
— URL-адрес хранилища по умолчанию на стороне Spark. Пример параметра:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
С помощью действия скрипта на кластере Spark примените изменения, приняв во внимание указанные ниже соображения.
Свойство Значение URI bash-скрипта https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
Типы узлов Головной, рабочий или ZooKeeper Параметры -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
Сохранено yes - Можно указать, как часто этот кластер должен автоматически проверять наличие обновлений. Вариант по умолчанию: -s “*/1 * * * *” -h 0 (в этом примере задание cron Spark выполняется каждую минуту, а задание cron HBase не выполняется).
- Так как cron HBase не настроен по умолчанию, необходимо повторно запустить этот скрипт при выполнении масштабирования в кластере HBase. Если кластер HBase часто масштабируется, вы можете настроить автоматический запуск задания cron HBase. Например:
-s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc"
настраивает скрипт для выполнения проверок каждые 30 минут. Это приведет к периодическому запуску задания cron HBase по расписанию для автоматической загрузки новых сведений HBase в учетную запись хранения на локальный узел.
Примечание.
Эти скрипты работают только в кластерах HDI 5.0 и HDI 5.1.
Настройка связи вручную (необязательно — на тот случай, если скрипт на предыдущем шаге завершается ошибкой)
ПРИМЕЧАНИЕ. Эти действия следует выполнять каждый раз, когда на одном из кластеров происходит масштабирование.
Скопируйте файл hbase-site.xml из локального хранилища в корень хранилища по умолчанию кластера Spark. Измените команду, чтобы отразить конфигурацию. Затем в открытом подключении SSH к кластеру HBase введите следующую команду:
Значение синтаксиса Новое значение Схема URI Измените в соответствии со своим хранилищем. Синтаксис предназначен для хранилища BLOB-объектов с включенной безопасной передачей. SPARK_STORAGE_CONTAINER
Замените соответствующее значение именем контейнера хранилища по умолчанию, используемым для вашего кластера Spark. SPARK_STORAGE_ACCOUNT
Замените соответствующее значение именем учетной записи хранения по умолчанию, используемой для вашего кластера Spark. hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
Затем закройте подключение SSH к кластеру HBase.
exit
Подключитесь к головному узлу кластера Spark с помощью SSH. Измените команду, заменив
SPARKCLUSTER
имя кластера Spark, а затем введите команду:ssh [email protected]
Введите команду, чтобы скопировать
hbase-site.xml
из хранилища кластера Spark по умолчанию в папку конфигурации Spark 2 в локальном хранилище кластера:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
Запуск оболочки Shell со ссылкой на соединитель Spark HBase
После выполнения предыдущего шага вы сможете запустить оболочку Spark, указав соответствующую версию соединителя Spark HBase.
Например, в следующей таблице указаны две версии и соответствующие команды, которые используют специалисты HDInsight. Вы можете использовать для своих кластеров те же версии, если ваши версии HBase и Spark совпадают с указанными в таблице.
В подключении SSH к кластеру Spark введите следующую команду, чтобы запустить оболочку Spark:
Версия Spark Версия HDI HBase Версия SHC Команда 2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
Не закрывайте этот экземпляр оболочки Spark и перейдите в раздел Определение каталога и отправка запроса. Дальше в разделе описано, что делать, если вы не нашли JAR-файлы для своих версий в репозитории SHC Core.
Эти артефакты больше не публикуются в указанном выше репозитории для последующих сочетаний версий Spark и HBase. Вы можете создать необходимые вам JAR-файлы непосредственно из ветви GitHub spark-hbase-connector. Например, если вы работаете с Spark 2.4 и HBase 2.1, выполните следующие действия:
Клонируйте репозиторий:
git clone https://github.com/hortonworks-spark/shc
Перейдите к ветви branch-2.4.
git checkout branch-2.4
Выполните сборку из ветви (будет создан JAR-файл):
mvn clean package -DskipTests
Выполните следующую команду (обязательно измените имя JAR-файла на имя созданного вами файла):
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
Не закрывайте этот экземпляр оболочки Spark и переходите к следующему разделу.
Определение каталога и отправка запроса
На этом этапе вы определяете объект каталога, который сопоставляет схему из Spark с Apache HBase.
В открытой оболочке Spark выполните следующие инструкции
import
:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._
Введите следующую команду, чтобы задать каталог для таблицы Contacts, созданной в HBase:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMargin
Код.
- определяет схему каталога для таблицы HBase под названием
Contacts
; - определяет rowkey как
key
и сопоставляет имена столбцов, используемые в Spark, с семейством столбцов, именем столбца и типом столбца, используемыми в HBase; - подробно определяет rowkey как именованный столбец (
rowkey
), который содержит определенное семейство столбцовcf
изrowkey
.
- определяет схему каталога для таблицы HBase под названием
Введите команду, чтобы определить метод, который предоставляет кадр данных вокруг
Contacts
таблицы в HBase:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }
Создайте экземпляр таблицы данных:
val df = withCatalog(catalog)
Выполните запрос таблицы данных:
df.show()
Вы должны увидеть две строки данных:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+
Зарегистрируйте временную таблицу, чтобы запрашивать таблицу HBase с помощью Spark SQL:
df.createTempView("contacts")
Выполните SQL-запрос к таблице
contacts
:spark.sqlContext.sql("select personalName, officeAddress from contacts").show
Вы должны увидеть примерно такой результат:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
Вставка новых данных
Чтобы вставить новую запись о контакте, определите класс
ContactRecord
:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )
Создайте экземпляр
ContactRecord
и поместите его в массив:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContact
Сохраните массив новых данных в HBase:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
Изучите результаты:
df.show()
Вы должны увидеть примерно такой результат:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+
Закройте оболочку Spark с помощью следующей команды:
:q