В предыдущей статье мы говорили о ранжирующих функциях из семейство оконных (window function) в PySpark. В этой статье пойдет речь об аналитических функциях LEAD и LAG, которые помогают сдвигать строки.
Данные с затонувшего Титаника
Возьмем для примера датасет с затонувшим Титаником. Его можно взять тут. У него много столбцов, поэтому вы покажем только его схему.
df = spark.read.csv("titanic_dataset.csv", header=True, inferScheme=True) """ root |-- PassengerId: integer (nullable = true) |-- Survived: integer (nullable = true) |-- Pclass: integer (nullable = true) |-- Name: string (nullable = true) |-- Sex: string (nullable = true) |-- Age: double (nullable = true) |-- SibSp: integer (nullable = true) |-- Parch: integer (nullable = true) |-- Ticket: string (nullable = true) |-- Fare: double (nullable = true) |-- Cabin: string (nullable = true) |-- Embarked: string (nullable = true) """
Что делает функция LAG в Apache PySpark
Функция LAG
берет значения заданного столбца из предыдущих строк. Можно также указать смещение, от которого нужно начать считать. Такая функция позволяет сравнивать значения одного и того же столбца: например, если есть столбец со временными замерами, тогда мы можем узнать сколько, было затрачено времени между замерами.
Для примера возьмем предыдущие значения оплаты за проезд Fare
, SQL-запрос для его получения выглядит следующим образом:
SELECT t.Name, t.Fare, LAG(t.Fare) OVER(ORDER BY t.Age) as prev_fare FROM titanic t;
Этот же запрос в PySpark:
import pyspark.sql.functions as F from pyspark.sql import Window w = Window.orderBy("Age") df.select( "Name", "Fare", F.lag("Fare").over(w).alias("prev_fare") ) """ +--------------------+------+---------+ | Name| Fare|prev_fare| +--------------------+------+---------+ | Moran, Mr. James|8.4583| null| |Williams, Mr. Cha...| 13.0| 8.4583| |Masselmani, Mrs. ...| 7.225| 13.0| |Emir, Mr. Farred ...| 7.225| 7.225| |"O'Dwyer, Miss. E...|7.8792| 7.225| +--------------------+------+---------+ """
Как можно заметить, самое первое значение полученного столбца является null
. Действительно, у первой строки нет предыдущего значения, поэтому ему по умолчанию выдается null
. Вместо null
можно выдавать свое значение, которое передается третьим параметром функции LAG
.
Указываем группы с помощью PARTITION BY
Мы можем также указать партицию (группу), по которой нужно выполнять оконную функцию. например, вычислим предыдущие значения для каждого класса пассажира Pclass
. Кроме того, давайте упорядочим результат в порядке убывания возраста пассажиры, т.е. самые старшие впереди.
SQL-запрос выглядит так:
SELECT t.Name, t.Age, t.Pclass, t.Fare, LAG(t.Fare) OVER(PARTITION BY t.Pclass ORDER BY t.Age) as prev_fare FROM titanic t;
В PySpark API вот так:
w = Window.partitionBy("Pclass").orderBy(F.desc("Age")) df.select( "Name", "Age", "Pclass", "Fare", F.lag("Fare").over(w).alias("prev_fare") ) """ +--------------------+----+------+-------+---------+ | Name| Age|Pclass| Fare|prev_fare| +--------------------+----+------+-------+---------+ |Barkworth, Mr. Al...|80.0| 1| 30.0| null| |Goldschmidt, Mr. ...|71.0| 1|34.6542| 30.0| |Artagaveytia, Mr....|71.0| 1|49.5042| 34.6542| |Crosby, Capt. Edw...|70.0| 1| 71.0| 49.5042| |Ostby, Mr. Engelh...|65.0| 1|61.9792| 71.0| |Millet, Mr. Franc...|65.0| 1| 26.55| 61.9792| | Fortune, Mr. Mark|64.0| 1| 263.0| 26.55| |Nicholson, Mr. Ar...|64.0| 1| 26.0| 263.0| +--------------------+----+------+-------+---------+ """
Чтобы удостовериться, что посчиталось предыдущие значения по каждому классу, используем тот трюк с ROW NUMBER
, который мы демонстрировали в одной из прошлой статьи.
df.select( "Name", "Age", "Pclass", "Fare", F.lag("Fare").over(w).alias("prev_fare"), F.row_number().over(w).alias("rn") ).where("rn < 4") """ +--------------------+----+------+-------+---------+---+ | Name| Age|Pclass| Fare|prev_fare| rn| +--------------------+----+------+-------+---------+---+ |Barkworth, Mr. Al...|80.0| 1| 30.0| null| 1| |Goldschmidt, Mr. ...|71.0| 1|34.6542| 30.0| 2| |Artagaveytia, Mr....|71.0| 1|49.5042| 34.6542| 3| | Svensson, Mr. Johan|74.0| 3| 7.775| null| 1| |Connors, Mr. Patrick|70.5| 3| 7.75| 7.775| 2| | Duane, Mr. Frank|65.0| 3| 7.75| 7.75| 3| |Mitchell, Mr. Hen...|70.0| 2| 10.5| null| 1| |Wheadon, Mr. Edwa...|66.0| 2| 10.5| 10.5| 2| | Harris, Mr. George|62.0| 2| 10.5| 10.5| 3| +--------------------+----+------+-------+---------+---+ """
Каждая группа с предыдущими значениями столбца Fare
начинается с null
. Значит запрос правильный.
Определяем свое смещение и значение по умолчанию
Во всех предыдущих случаях столбец смещался только на одну строку. Также первому значению строки выдавалось null
. Это поведение можно изменить, передав свое смещение и свое заполняемое значение через параметры функции.
Например, выберем смещение, равное 2, а первые две строки заполним нулями:
df.select( "Name", "Age", "Pclass", "Fare", F.lag("Fare", 2, 0).over(w).alias("prev_fare"), F.row_number().over(w).alias("rn") ).where("rn < 4").show(9) """ +--------------------+----+------+-------+---------+---+ | Name| Age|Pclass| Fare|prev_fare| rn| +--------------------+----+------+-------+---------+---+ |Barkworth, Mr. Al...|80.0| 1| 30.0| 0.0| 1| |Goldschmidt, Mr. ...|71.0| 1|34.6542| 0.0| 2| |Artagaveytia, Mr....|71.0| 1|49.5042| 30.0| 3| | Svensson, Mr. Johan|74.0| 3| 7.775| 0.0| 1| |Connors, Mr. Patrick|70.5| 3| 7.75| 0.0| 2| | Duane, Mr. Frank|65.0| 3| 7.75| 7.775| 3| |Mitchell, Mr. Hen...|70.0| 2| 10.5| 0.0| 1| |Wheadon, Mr. Edwa...|66.0| 2| 10.5| 0.0| 2| | Harris, Mr. George|62.0| 2| 10.5| 10.5| 3| +--------------------+----+------+-------+---------+---+ """
Функция LEAD выбирает следующие строки в PySpark
Функция LEAD
— противоположная LEAD
, т.е. отбирает следующие строки заданного столбца. Следовательно, незаполненными будут строки не первые, а последние. Функция также может принимать смещение и значение по умолчанию для незаполненных строк.
Продемонстрируем работу функции LEAD
и сравним ее с LAG
:
w = Window.orderBy("Age") df.select( "Name", "Fare", F.lag("Fare").over(w).alias("prev_fare"), F.lead("Fare").over(w).alias("next_fare") ) """ +--------------------+------+---------+---------+ | Name| Fare|prev_fare|next_fare| +--------------------+------+---------+---------+ | Moran, Mr. James|8.4583| null| 13.0| |Williams, Mr. Cha...| 13.0| 8.4583| 7.225| |Masselmani, Mrs. ...| 7.225| 13.0| 7.225| |Emir, Mr. Farred ...| 7.225| 7.225| 7.8792| |"O'Dwyer, Miss. E...|7.8792| 7.225| 7.8958| +--------------------+------+---------+---------+ """
Как видим, первые строки заполнены, потому что у первой строки есть следующее значение.
Убедимся, что последнее значение результат функции LEAD
является null
с помощью функции tail
, которая возвращает список из последних строк:
df.select( "Name", "Fare", F.lead("Fare").over(w).alias("next_fare") ).tail(3) """ [Row(Name='Artagaveytia, Mr. Ramon', Fare=49.5042, next_fare=7.775, last_val=49.5042), Row(Name='Svensson, Mr. Johan', Fare=7.775, next_fare=30.0, last_val=7.775), Row(Name='Barkworth, Mr. Algernon Henry Wilson', Fare=30.0, next_fare=None, last_val=30.0)] """
Код курса
SPOT
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Еще больше подробностей об оконных функциях в PySpark вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и IT-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:
- Анализ данных с Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
- Потоковая обработка в Apache Spark
- Основы Apache Spark для разработчиков