4 совета по оптимизации Apache Spark

В прошлый раз мы рассмотрели 6 способов повышения производительности Apache Spark: кэширование, трансляция, бакетирование, минимизация перетасовки (shuffle), применение оконных функций и контрольных точек. Дадим еще 4 совета по оптимизации планов Spark.

Совет 1. Не используйте UDF (user defined functions)

На первый взгляд UDF (user defined functions), или функции определенные пользователем, очень полезны, так как помогают решать проблемы в чисто программистском ключе. Однако они идут с высокой ценой в PySpark. Такие функции обрабатывают одну строку за раз, следовательно, это приводит к лишней сериализации. Это значит, что данные будут перемещаться между исполнителем JVM и интерпретатором Python. Более того, после вызова функции Spark забудет, как данные были распределены до вызова. По этой причине производительность UDF в Python намного ниже, чем в Java и Scala.

Поэтому попытки избегать применения UDF — хорошая практика в PySpark (см. код ниже). Вместо этого используйте встроенные функции Spark SQL. Если же по какой-то причине вы вынуждены использовать UDF, то лучше воспользуйтесь Pandas UDF [1], функции которой построены на основе Apache Arrow. Также можно использовать UDF, которые реализованы через Java/Scala, но вызываемые в PySpark [2].

# Подсчет значений z-score

## Неоправданное использование UDF
z_score_udf = F.udf(lambda x, m, s: (x — m) / s, DoubleType())
  df = df.withColumn('z_score', z_score_udf('completed_job',
  'mean_completed_job', 'std_completed_job'))

## Но лучше сделать так:
df = df.withColumn('z_score',
  F.round(((F.col('completed_job') — F.col('mean_completed_job')) /
  F.col('std_completed_job')) ,2))

Совет 2. Избегайте скошенных данных

Код курса
SPARK
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.

Время выполнения всего этапа напрямую зависит от самой долгой задачи. Вы могли встречаться с таким случаем, когда одна задача занимала минуты, а остальные, скажем 199, задачи занимали миллисекунды. Это результат неравномерного распределения данных по партициям. Такая проблема называется скосом данных. Эта проблема может возникнуть во время промежуточного этапа приложения Spark. Более того, если данные скошены слишком сильно, то это приведет к переносу данных с оперативной памяти на диск. Чтобы найти распределение данных в партициях используется функция glom. Также неравномерное распределение может быть обнаружено с помощью времени выполнения задачи и информации о объеме обработанных данных в этой задаче на странице исполнителя Spark UI.

partition_number = df.rdd.getNumPartitions()
data_distribution = df.rdd.glom().map(len).collect()

Spark 3.0 имеет оптимизатор запрос под названием Adaptive query execution, который автоматически пытается сбалансировать скошенные данные в партициях.

Совет 3. Используйте подходящие файловые форматы — Parquet

Apache Parquet — это колоночный формат, который разработан для выбора только запрашиваемых столбцов и игнорирования всех остальных. Он обеспечивает быстрое чтение данных. Parquet организует данные по столбцам, группируя связанные значения рядом друг с другом для оптимизации запросов, для минимизации использование операций ввода-вывода и для содействия сжатию. Более того, этот формат реализует вертикальный и горизонтальный срез (column pruning and predicate pushdown), который отбирает только необходимые записи, что предотвращает ненужную загрузку памяти и уменьшает нагрузку на сеть. Конечно, это решение не совсем связано со Spark, однако это хорошая практика.

Как оптимизируется запрос в Apache Parquet
Выбор нужных записей в Apache Parquet

Кроме того, чтение партиций напрямую также очень эффективное решение, если вы используете СУБД Cassandra. Однако здесь имеются подводные камни. Предположим, что таблица Cassandra разбита на столбцы, и вам интересно чтение последних 15 дней. В этом случае обычное чтение день за днем и использование оператора == с последующим объединением намного быстрее, чем просто фильтр > date_current-15:

dfs = []
for i in range(15):
    day_i = day_from + timedelta(days=i)
    df = self.sc_session \
      .read \
      .format('org.apache.spark.sql.cassandra') \
      .options(table=self.table, keyspace=self.keyspace) \
      .load()
    # Вместо > day_i:
    df = df.filter(F.col('PARTITION_KEY_COLUMN') == day_i)
    dfs.append(df)
# unin не требует перетасовки:
df_complete = reduce(DataFrame.union, dfs)

Совет 4. Используйте toPandas с pyArrow

Apache Arrow — это независимая от языка платформа для данных, хранящихся в оперативной памяти. В ней определены хранение непрерывных и иерархических данных в виде колоночной структуры. Иными словами, Apache Arrow — это мост между независимой от языка платформы, которая осуществляет чтение фреймов данных Spark’а, с последующим чтением фрейма в Apache Cassandra без каких-либо операций сериализации и десериализации.

Apache PyArrow — это имплементация Arrow на языке Python. Она обеспечивает взаимодействие Arrow и Python API, включая использование библиотек Pandas и NumPy. В Spark данные обрабатываются со скоростью, определенной в JVM (Java Virtual Machine). Как уже говорилось, сериализация, возникающая из-за переноса данных с интерпретатора Python на JVM, — не лучшая идея. В этом случае использование PyArrow не дает этому произойти, например, при перетаскивании данных фрейма данных из Pandas в PySpark и обратно.

PyArrow устанавливается через менеджеры пакетов pip или conda. Затем нужно лишь активировать в настройках. Все остальное остается прежним, без лишних строк кода.

!pip install pyarrow
spark.conf.set(“spark.sql.execution.arrow.enabled”, “true”)

Ещё больше подробностей о тонкостях работы Apache Spark и способах его оптимизации вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:

Код курса
SPOT
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.

Записаться на курс

Смотреть раcписание

Источники
  1. pandas_udf
  2. Функции UDF в Java/Scala

Добавить комментарий

Поиск по сайту