H J M P R S W Y

window functions

Оконные функции (window functions) в Apache Spark работают на группах строк (это может быть фрейм, партиция, бакет) и возвращает одно значение, полученное в результате вычисления. Группа строк называется окном (window), отсюда и название функций. Spark SQL поддерживает три вида оконных функций:

  • ранжирующие (ranking): dense_rank, ntil, percent_rank, rank, row_number;
  • аналитические (analytic): cume_dist, lag, lead, nth_value (появился в Spark 3.1);
  • агрегирующие (aggregate): min, max, count, sum, std и т.д.

Оконные функции очень полезны для вычисления статистических значений или получения строк относительно некоторой заданной.

Оконные функции в классическом SQL

В классическом SQL оконная функция определяется следующим образом:

function OVER([PARTITION BY col1] [ORDER BY col2] [RANGE|ROWS])

Итак, function является сама оконная функция. PARTITION BY мы указываем нужно ли производить группировку, что аналогично GROUP BY, если партицирование не указано, то рассматриваться будет вся таблица. Также можно сортировать полученные записи через ORDER BY. А также выбрать границы окна.

Окном является группа строк, над которой происходит вычисление функции. Размер окна отсчитывается от текущей строки. По умолчанию границы определяется так:

RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW

Что говорит нам о том, что окно отсчитывается от текущей строки до самой первой строки партиции. Таким образом, можно построить нарастающий итог.

Также можно указать окном всю партицию:

RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING

Можете считать UNBOUNDED очень большим числом. Поэтому вместо него можно подставлять свои числа. Так можно считать окном две строки перед и две строк после относительно текущей:

RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING

Помимо границ фрейма RANGE существует ROWS. Отличаются они тем, как рассматривают дубликаты в столбце, преданному ORDER BY. При использовании ROWS записи с одинаковым значением в ORDER BY рассматриваются как отдельные строки. При использовании RANGE — как одна строка (см. рис ниже).

Схема, показывающая разницу между ROWS BETWEEN и RANGE BETWEEN
В чем разница между ROWS BETWEEN и RANGE BETWEEN

Класс Window

Чтобы применить оконные функции, нужно инициализировать экземпляр WindowSpec. Делается это через методы класса Window:

  1. orderBy(*cols)
  2. partitionBy(*cols)
  3. rangeBetween(start, end)
  4. rowsBetween(start, end)

Экземпляр этого класса передается методу over из оконных функций. Например, вот так можно взять последнее значение из окна, которое определяется всей партицией:

import pyspark.sql.functions as F
from pyspark.sql import Window

w = Window.partitionBy("dept_name") \
          .orderBy(F.desc("salary")) \
          .rangeBetween(Window.unboundedPreceding,
                        Window.unboundedFollowing)

df.withColumn("poorset_id", F.last("id").over(w)).show()

Для текущего строки используется Window.currentRow. Если хотите вместо unboundedPreceding использовать свои границы, то нужно использовать отрицательное число; например, окно из двух строк перед и двух строк после относительно текущей:

w = Window.partitionBy("dept_name") \
          .orderBy(F.desc("salary")) \
          .rowsBetween(-2, 2)

Ниже статьи с конкретными примерами использования оконных функций:

Поиск по сайту