Кластер Spark

Spark — кластерная вычислительная платформа с открытым исходным кодом для распределенной пакетной и потоковой обработки данных. В рамках сервиса пользователи могут конфигурировать и создавать кластеры Spark для последующего запуска на них соответствующих задач препроцессинга данных. Организация работы с кластером Spark описана в разделах ниже. Пример, иллюстрирующий препроцессинг данных на кластере Spark, размещен на GitHub.

Создание Jupyter Server с доступом к кластеру Spark

Для создания Jupyter Server с доступом к кластеру Spark:

  1. Перейдите на вкладку Создать окружение → Стандартное окружение в главном меню сервиса Environments.

  2. На панели Стандартное окружение нажмите кнопку Создать Jupyter Server. Откроется диалоговое окно с параметрами нового сервера.

    ../_images/s__jupyter__create-new.png
  3. В поле Название Jupyter Server введите имя нового сервера. Имя следует вводить с учетом регистра. Разрешается применять следующие символы: строчные латинские буквы от a до z, цифры от 0 до 9, символ тире (–).

  4. Выберите Тип Jupyter Server. Обратите внимание на то, что выбор Spark CPU создаст Jupyter Server с доступом к кластеру Spark.

  5. В открывшемся диалоговом окне задайте необходимые параметры. Описание параметров приведено в таблице ниже.

    Этап конфигурирования

    Параметр

    Описание

    Название Jupyter Server

    Название Jupyter Server

    В поле Название введите имя нового подключения. Имя нового подключения следует вводить с учетом регистра. Разрешается применять следующие символы: строчные латинские буквы от a до z, цифры от 0 до 9, символ тире (–).

    Тип Jupyter Server

    Тип Jupyter Server

    Для создания Jupyter Server с доступом к кластеру Spark требуется указать параметр Spark (CPU).

    Ресурсы

    Ресурсы Driver

    Аппаратные ресурсы, аллоцированные для размещения Spark Driver. Выберите необходимую конфигурацию из списка доступных конфигураций.

    Ресурсы Executor

    Аппаратные ресурсы, аллоцированные для размещения воркеров Spark, на которых запускаются процессы Spark Executor. Выберите необходимую конфигурацию, которая будет соответствовать одному экземпляру Executor.

    Количество Executor

    Количество воркеров Spark, которые будут созданы в рамках кластера. В данный момент автомасштабирование кластера не поддерживается. Ресурсы будут выделены статично на все время существования кластера.

    Версия Spark

    Версия Spark на создаваемом кластере.

    Образ Jupyter Server

    Образ, в рамках которого запустится Jupyter Server. Выберите значение по умолчанию. Список библиотек, установленных в образе см. в разделе Библиотеки в образах.

    Образ Spark

    Образ для создания кластера Spark.

  6. Для создания кластера Spark с выбранными параметрами нажмите Создать.

  7. Кластер Spark будет создан через 1–2 минуты. После завершения создания кластера станет возможным подключение (возможность использовать интерфейс Jupyter Notebook или JupyterLab). Подключитесь к Jupyter Server, нажав соответствующую кнопку.

  8. В открывшемся окне Jupyter Server нажмите кнопку New. Выберите из выпадающего меню тип ноутбука Spark –> Python (Kubernetes Mode) AI Cloud. Созданный ноутбук будет подключен к кластеру Spark.

Схема создания кластера Spark в заданной конфигурации представлена ниже.

../_images/schm__create-spark-in-given-config.svg

Пояснение к схеме: пользователь создает Jupyter Server конфигурации Spark (CPU) с заданными параметрами. После нажатия кнопки Создать сервис разворачивает кластер Spark соответствующей конфигурации в Workspace пользователя.

SparkSession и SparkContext

SparkSession, начиная с версии Spark 2.0, является основной точкой входа в программирование Spark с использованием Dataframe и Dataset. В ранних версиях Spark в качестве точки входа в программирование наборов данных RDD и соединения с кластером использовался SparkContext.

Точка входа SparkContext в Spark версий 2.0 и выше имеется для обеспечения обратной совместимости.

Точка входа SparkSession представляет собой комбинированный класс для всех различных контекстов, которые использовались отдельно до версии Spark 2.0, включая SparkContext. Создание экземпляра SparkSession с использованием SparkSession.builder необходимо для начала работы с RDD, DataFrame и Dataset.

Spark Session включает в себя все API, доступные в контекстах:

  • Spark Context

  • SQL Context

Загрузка данных из хранилища S3

Данные, препроцессинг которых нужно осуществить в среде Spark, могут располагаться в хранилище S3. Доступ к хранилищу S3 осуществляется по протоколу S3A. Для получения доступа требуется задать конфигурацию для объекта класса SparkContext в атрибуте _jsc при помощи метода hadoopConfiguration().set(). Адрес endpoint, access key и secret key задаются следующим образом:

sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "https://your_endpoint_name")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "your_access_key")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "your_secret_key")

После указания данных параметров можно обращаться к данным в объектном хранилище S3 по ссылке, включающей в себя имя бакета S3. Например:

path = 's3a://your_bucket_name/path/dataset.csv'

После этого вы можете загрузить данные и создать датасет Spark следующим образом:

df = spark.read.format('csv').load(path)

Установка дополнительных библиотек при работе с кластером Spark

Подробнее про установку дополнительных библиотек можно узнать в разделе Установка дополнительных библиотек в Jupyter Server.

Запуск препроцессинга данных на кластере Spark

Работа с датафреймами Spark ведется по принципу «ленивых вычислений». Вычисления производятся тогда, когда пользователь запрашивает их результат. Собрать можно с помощью функции collect(), просмотреть состояние датасета — с помощью функции show().

Например, для выполнения SQL-запроса к содержимому датафрейма требуется создать временное представление, затем запустить задание Spark на препроцессинг, и собрать данные:

df.createOrReplaceTempView('temp_view')

result = spark.sql('''
    /* SQL-statement (FROM temp_view) */
''').collect()

Тарификация кластера Spark

Jupyter со Spark с CPU (окружение)
Вычислительные ресурсы
Jupyter со Spark с CPU (окружение) состоит из двух частей
  • Ресурсы Driver (cpu-ai-small (4 vCPU, 16 GB));

  • ресурсы Executor (cpu-ai-middle (8 vCPU, 32 GB)), количество Executor — 2.

Время выполнения операций

11 минут 25 секунд (округляется до целого числа минут в большую сторону)

Расчет стоимости (в рублях с учетом НДС):

\((0,14+0,29 \times 2) \times 12=8,64\)

Где:

  • 0,14 — стоимость CPU-минуты с учетом НДС (по тарифу AI-MLSPENVCPUENLSN-TS1MS0).

  • 0,24 — стоимость CPU-минуты с учетом НДС (по тарифу AI-MLSPENVCPUENLMN-TS1MS0).

  • 2 — количество Executor.

  • 12 — количество минут, которые проработала задача.

Масштабирование ресурсов

В данный момент для кластеров Spark не реализовано динамическое масштабирование ресурсов. Это означает, что количество Executor-ов останется неизменным и соответствующим заданному при создании кластера значению на протяжении всего времени существования кластера.

В будущих релизах будет добавлено автоматическое масштабирование кластера — количество Executor-ов будет изменяться до заданного максимального значения в зависимости от нагрузки на кластер.