В предыдущей статье мы говорили о ранжирующих функциях из семейство оконных (window function) в PySpark. В этой статье пойдет речь об аналитических функциях LEAD и LAG, которые помогают сдвигать строки.
Данные с затонувшего Титаника
Возьмем для примера датасет с затонувшим Титаником. Его можно взять тут. У него много столбцов, поэтому вы покажем только его схему.
df = spark.read.csv("titanic_dataset.csv", header=True, inferScheme=True)
"""
root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
"""
Что делает функция LAG в Apache PySpark
Функция LAG берет значения заданного столбца из предыдущих строк. Можно также указать смещение, от которого нужно начать считать. Такая функция позволяет сравнивать значения одного и того же столбца: например, если есть столбец со временными замерами, тогда мы можем узнать сколько, было затрачено времени между замерами.
Для примера возьмем предыдущие значения оплаты за проезд Fare, SQL-запрос для его получения выглядит следующим образом:
SELECT t.Name, t.Fare,
    LAG(t.Fare) OVER(ORDER BY t.Age) as prev_fare
FROM titanic t;
Этот же запрос в PySpark:
import pyspark.sql.functions as F
from pyspark.sql import Window
w = Window.orderBy("Age")
df.select(
    "Name", "Fare",
    F.lag("Fare").over(w).alias("prev_fare")
)
"""
+--------------------+------+---------+
|                Name|  Fare|prev_fare|
+--------------------+------+---------+
|    Moran, Mr. James|8.4583|     null|
|Williams, Mr. Cha...|  13.0|   8.4583|
|Masselmani, Mrs. ...| 7.225|     13.0|
|Emir, Mr. Farred ...| 7.225|    7.225|
|"O'Dwyer, Miss. E...|7.8792|    7.225|
+--------------------+------+---------+
"""
Как можно заметить, самое первое значение полученного столбца является null. Действительно, у первой строки нет предыдущего значения, поэтому ему по умолчанию выдается null. Вместо null можно выдавать свое значение, которое передается третьим параметром функции LAG.
Указываем группы с помощью PARTITION BY
Мы можем также указать партицию (группу), по которой нужно выполнять оконную функцию. например, вычислим предыдущие значения для каждого класса пассажира Pclass. Кроме того, давайте упорядочим результат в порядке убывания возраста пассажиры, т.е. самые старшие впереди.
SQL-запрос выглядит так:
SELECT t.Name, t.Age, t.Pclass, t.Fare,
    LAG(t.Fare)
    OVER(PARTITION BY t.Pclass ORDER BY t.Age) as prev_fare
FROM titanic t;
В PySpark API вот так:
w = Window.partitionBy("Pclass").orderBy(F.desc("Age"))
df.select(
    "Name", "Age", "Pclass", "Fare",
    F.lag("Fare").over(w).alias("prev_fare")
)
"""
+--------------------+----+------+-------+---------+
|                Name| Age|Pclass|   Fare|prev_fare|
+--------------------+----+------+-------+---------+
|Barkworth, Mr. Al...|80.0|     1|   30.0|     null|
|Goldschmidt, Mr. ...|71.0|     1|34.6542|     30.0|
|Artagaveytia, Mr....|71.0|     1|49.5042|  34.6542|
|Crosby, Capt. Edw...|70.0|     1|   71.0|  49.5042|
|Ostby, Mr. Engelh...|65.0|     1|61.9792|     71.0|
|Millet, Mr. Franc...|65.0|     1|  26.55|  61.9792|
|   Fortune, Mr. Mark|64.0|     1|  263.0|    26.55|
|Nicholson, Mr. Ar...|64.0|     1|   26.0|    263.0|
+--------------------+----+------+-------+---------+
"""
Чтобы удостовериться, что посчиталось предыдущие значения по каждому классу, используем тот трюк с ROW NUMBER, который мы демонстрировали в одной из прошлой статьи.
df.select(
    "Name", "Age", "Pclass", "Fare",
    F.lag("Fare").over(w).alias("prev_fare"),
    F.row_number().over(w).alias("rn")
).where("rn < 4")
"""
+--------------------+----+------+-------+---------+---+
|                Name| Age|Pclass|   Fare|prev_fare| rn|
+--------------------+----+------+-------+---------+---+
|Barkworth, Mr. Al...|80.0|     1|   30.0|     null|  1|
|Goldschmidt, Mr. ...|71.0|     1|34.6542|     30.0|  2|
|Artagaveytia, Mr....|71.0|     1|49.5042|  34.6542|  3|
| Svensson, Mr. Johan|74.0|     3|  7.775|     null|  1|
|Connors, Mr. Patrick|70.5|     3|   7.75|    7.775|  2|
|    Duane, Mr. Frank|65.0|     3|   7.75|     7.75|  3|
|Mitchell, Mr. Hen...|70.0|     2|   10.5|     null|  1|
|Wheadon, Mr. Edwa...|66.0|     2|   10.5|     10.5|  2|
|  Harris, Mr. George|62.0|     2|   10.5|     10.5|  3|
+--------------------+----+------+-------+---------+---+
"""
Каждая группа с предыдущими значениями столбца Fare начинается с null. Значит запрос правильный.
Определяем свое смещение и значение по умолчанию
Во всех предыдущих случаях столбец смещался только на одну строку. Также первому значению строки выдавалось null. Это поведение можно изменить, передав свое смещение и свое заполняемое значение через параметры функции.
Например, выберем смещение, равное 2, а первые две строки заполним нулями:
df.select(
    "Name", "Age", "Pclass", "Fare",
    F.lag("Fare", 2, 0).over(w).alias("prev_fare"),
    F.row_number().over(w).alias("rn")
).where("rn < 4").show(9)
"""
+--------------------+----+------+-------+---------+---+
|                Name| Age|Pclass|   Fare|prev_fare| rn|
+--------------------+----+------+-------+---------+---+
|Barkworth, Mr. Al...|80.0|     1|   30.0|      0.0|  1|
|Goldschmidt, Mr. ...|71.0|     1|34.6542|      0.0|  2|
|Artagaveytia, Mr....|71.0|     1|49.5042|     30.0|  3|
| Svensson, Mr. Johan|74.0|     3|  7.775|      0.0|  1|
|Connors, Mr. Patrick|70.5|     3|   7.75|      0.0|  2|
|    Duane, Mr. Frank|65.0|     3|   7.75|    7.775|  3|
|Mitchell, Mr. Hen...|70.0|     2|   10.5|      0.0|  1|
|Wheadon, Mr. Edwa...|66.0|     2|   10.5|      0.0|  2|
|  Harris, Mr. George|62.0|     2|   10.5|     10.5|  3|
+--------------------+----+------+-------+---------+---+
"""
Функция LEAD выбирает следующие строки в PySpark
Функция LEAD — противоположная LEAD, т.е. отбирает следующие строки заданного столбца. Следовательно, незаполненными будут строки не первые, а последние. Функция также может принимать смещение и значение по умолчанию для незаполненных строк.
Продемонстрируем работу функции LEAD и сравним ее с LAG:
w = Window.orderBy("Age")
df.select(
    "Name", "Fare",
    F.lag("Fare").over(w).alias("prev_fare"),
    F.lead("Fare").over(w).alias("next_fare")
)
"""
+--------------------+------+---------+---------+
|                Name|  Fare|prev_fare|next_fare|
+--------------------+------+---------+---------+
|    Moran, Mr. James|8.4583|     null|     13.0|
|Williams, Mr. Cha...|  13.0|   8.4583|    7.225|
|Masselmani, Mrs. ...| 7.225|     13.0|    7.225|
|Emir, Mr. Farred ...| 7.225|    7.225|   7.8792|
|"O'Dwyer, Miss. E...|7.8792|    7.225|   7.8958|
+--------------------+------+---------+---------+
"""
Как видим, первые строки заполнены, потому что у первой строки есть следующее значение.
Убедимся, что последнее значение результат функции LEAD является null с помощью функции tail, которая возвращает список из последних строк:
df.select(
    "Name", "Fare",
    F.lead("Fare").over(w).alias("next_fare")
).tail(3)
"""
[Row(Name='Artagaveytia, Mr. Ramon', Fare=49.5042, next_fare=7.775, last_val=49.5042),
 Row(Name='Svensson, Mr. Johan', Fare=7.775, next_fare=30.0, last_val=7.775),
 Row(Name='Barkworth, Mr. Algernon Henry Wilson', Fare=30.0, next_fare=None, last_val=30.0)]
"""
Код курса
SPOT
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Еще больше подробностей об оконных функциях в PySpark вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и IT-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:
- Анализ данных с Apache Spark
 - Машинное обучение в Apache Spark
 - Графовые алгоритмы в Apache Spark
 - Потоковая обработка в Apache Spark
 - Основы Apache Spark для разработчиков
 



