Ускоряем Apache Spark: тонкая настройка

В прошлой статье мы говорили о кэшировании данных в Apache Spark для более быстрой обработки больших данных (Big Data). Помимо кэширования, можно также оптимизировать производительность потоковой обработки данных путем тонкой настройки конфигурации, так называемый fine tuninng. Читайте в этой статье: как посчитать количество исполнителей и расходы на память, подключить сборщик мусора G1GC, установить размеры задач при параллельном вычислении, подключить сериализацию Kyro, активировать динамическое размещение, а также как уменьшить время ожидания в Apache Spark.

Исполнители (Execturors) в Spark

Наиболее часто используемая и неправильно понимаемая часть потоковой обработки в Spark — это исполнители. Их настройка — это метод проб и ошибок, и без тестирования и экспериментов на вашей машине не обойтись.

Количество ядер

На первый взгляд кажется, что чем больше ядер, тем лучше. Но это не так. На каждом ядре исполнитель должен выполнять свои задачи. Думая о их количестве, вы должны учитывать:

  • Чем больше задач выполняется параллельно, тем больше накладных расходов на сборку мусора
  • Некоторые ядра будут заняты процессами ОС (операционная система), а также другими процессами HDFS

Количество исполнителей

Количество исполнителей — это количество Java-процессов, запущенных или отправленных менеджеру ресурсов. Если у вас есть m ядер, доступных в кластере, и вам нужно n ядер на каждого исполнителя, у вас в конечном итоге будут m/n исполнителей. При отправке задачи вы должны учитывать, что исполнитель должен быть назначен мастеру приложения YARN.

Память исполнителя

Память исполнителя — это доступная память в кластере (сумма доступной памяти по узлам с вычетом памяти, необходимой для операционной и файловой систем). Например, имеется 16 ГБ и 5 исполнителей на каждом узле, и допустим ОС и другие сторонние процессы занимают 1 ГБ памяти. Таким образом, вы можете выделить (16-1)/5 = 3 ГБ памяти на исполнителя. Конечно, получить все 2 ГБ на каждого исполнителя не получится, поскольку около 10% общей памяти исполнителя утилизируется YARN.

Сборщик мусора (Garbage Collector)

Сборка мусора в Spark Streaming является важным пунктом при потоковой обработке данных, поскольку она выполняется в самих потоках или микропакетах. В частности, в случае потоковой передачи это может серьезно повлиять на стандартную сборку мусора Java JVM из-за большого количества объектов, обрабатываемых во время выполнения. Это вызывает частые паузы и тем самым увеличивает задержку самого приложения в реальном времени. В качестве альтернативного варианта можно использовать сборщик CMS (concurrent mark sweep) внутри драйверной программы для сокращении количества задержек, поскольку сборка мусора выполняется одновременно с работой приложения. Ограничение количества создаваемых объектов наиболее важно для снижения накладных расходов на сборку мусора и повышения производительности.

Для более высоких размеров кучи (heap) и большей производительности при высокой пропускной способности и низких задержек рекомендуется использовать сборщик мусора G1GC. Однако, когда G1GC пытается собрать мусор для определенных разделов, он не может найти свободные разделы, в которые он может скопировать текущие объекты. Эта часто приводит к заполнению сборщика мусора. Заполненный сборщик мусора в G1GC даже хуже, чем в Parallel GC (параллельный сборщик), поэтому нужно стараться избегать переполнения.

Чтобы избежать перпеолнения в G1GC, обычно используются два подхода:

  • Уменьшение значения InitiatingHeapOccupancyPercent (по умолчанию 45), чтобы G1GC запускал параллельное маркирование в более раннее время
  • Увеличение значения ConcGCThreads, чтобы иметь больше потоков для параллельного маркирования. Этот параметр также может занять некоторые ресурсы вашего рабочего потока, в зависимости от загрузки CPU

Подобно сборщику CMS, G1 запускает параллельную обработку в зависимости от того, какая часть кучи заполнена. Опираясь на вышесказанное, в Apache Spark можно, например, применить следующую конфигурацию:

spark.executor.extraJavaOptions -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12

Размеры задач при параллельных вычислениях

Желательно, чтобы все вычисления обрабатывались параллельно (о параллельных вычислениях в Apache Spark тут). Здесь следует учитывать тот факт, что необходимо иметь оптимальные размеры разделов, чтобы они небыли слишком большими и слишком маленькие. В первом случае не получится получить параллельных вычислений, а во второй приведет к созданию множества мелких задач и потребует дополнительных затрат.

Приглядитесь к следующим настройкам:

  • spark.sql.files.maxPartitionBytes и spark.sql.files.openCostInBytes — это параметры, которые необходимо настроить, чтобы Spark мог определить идеальный размер раздела в кластере. Особенно их следует применять к таким форматам как: Parquet, JSON и ORC
  • Свойство spark.sql.shuffle.partitions определяет размер раздела при каждой операции при перемешивания, в идеале размер должен быть прямо пропорционален размеру файла

Они могут быть установлены с начальными значениями с помощью файла конфигурации и параметров командной строки с префиксом --conf/-c или явно в SparkConf. В этих параметрах указывается размер, но в них нужно передавать размер в байтах, а 1 MB = 1.048.576 Byte. Например, вот так можно указать размер 8 МБ, который приложение Spark просканирует одновременно:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .getOrCreate()

# 1 MB = 1.048.576 B
# 8 MB = 8.388.608 B
spark.conf.set("spark.files.openCostInBytes", "8388608")

Сериализация: Java vs Kyro

Сериализация играет важную роль в оптимизации любой распределенной системы, а также при настройке производительности Apache Spark. Сериализация напрямую замедляет работу системы, если выбранному типу требуется больше времени для сжатия и преобразования объектов. Spark поддерживает два сериализатора: Java и Kyro.

Java Serializer используется по умолчанию в Apache Spark. В основе лежит структура потока вывода объектов Java и реализуется за счет класса java.io.Serializable. Он обеспечивает хорошую гибкость и может без проблем работать с большинством форматов. Но он немного медленный.

Kyro Serializer работает примерно в 8 раз быстрее, чем Java Serializer, но недостатком является то, что он не поддерживает все форматы данных и требует. Попробуйте использовать его. Для того, чтобы перейти на Kyro в Spark достаточно указать следующую конфигурацию:

spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Также стоит помнить, что кэширование может помочь избежать накладных расходов на десериализацию (о кэшировании тут).

Динамическое размещение

Spark может динамически добавлять или удалять исполнителей в зависимости от потребности и доступности ресурсов в кластере для лучшей оптимизации вычислительной мощности. Чтобы включить динамическое распределение ресурсов в Spark используйте следующую конфигурацию:

spark.conf.set("spark.dynamicAllocation.enabled", "true")

Теперь приложение может вернуть ресурсы кластеру, если они больше не используются, и запросить их снова, когда возникнет необходимость. Эта функция особенно полезна, если несколько приложений совместно используют ресурсы в вашем кластере.

Уменьшите время перехода уровня обработки данных

Как правило, при построении распределенных систем наиболее желательно, чтобы потоковая обработка выполнялась как можно ближе к хранилищу данных, откуда данные поступают. Конфигурации местоположения в Spark помогают определить наиболее подходящий из пяти поддерживаемых параметров:

  • PROCESS_LOCAL — данные и потоковая обработка локализованы на одной JVM
  • NODE_LOCAL — данные и потоковая обработка находятся на одном узле, но на разных исполнителях. Этот уровень медленнее, чем предыдущий, потому что он должен перемещать данные между исполнителями
  • RACK_LOCAL — данные и обработка локализованы на разных узлах, но узлы находятся на одной серверной стойке, т.е. данные считываются с жесткого диска на удаленном узле, а затем передаются по сети
  • NO_PREF — нет предпочтения по местности
  • ANY — данные находятся в другом месте и на разных серверах

Важно помнить, что эти уровни работают по приоритету, поэтому, если нет данных, полученных на Spark, то их поиск будет осуществляться на следующем уровне после некоторого ожидания, установленного spark.locality.wait в секундах (по умолчанию 3 сек). Например, код для уменьшения времени ожидания перехода на новый уровень в Apache Spark до 2 секунд:

spark.conf.set("spark.locality.wait", "2")

 

А ещё больше подробностей о тонкой настройки Apache Spark для достижения более высокой обработки больших данных (Big Data) на практических примерах вы узнаете на специализированном курсе «Потоковая обработка в Apache Spark» в лицензированном учебном центре обучения и повышения квалификации разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве.

Записаться на курс

Смотреть раcписание

Источники
  1. docs/latest/sql-performance-tuning.html
  2. docs/latest/job-scheduling.html#dynamic-resource-allocation
  3. stackoverflow.com/questions/36081571/node-local-vs-rack-local-task-read-time

Добавить комментарий

Поиск по сайту