В прошлый раз мы говорили о использовании агрегирующих функциях с использованием окон (window function) в PySpark. Сегодня поговорим об одной из ранжирующих функций ROW NUMBER, а также посмотрим, как ее можно использовать.
Данные по Титанику
Возьмем для примера датасет с затонувшим Титаником. Его можно взять тут. У него много столбцов, поэтому вы покажем только его схему.
df = spark.read.csv("titanic_dataset.csv", header=True, inferScheme=True)
"""
root
|-- PassengerId: integer (nullable = true)
|-- Survived: integer (nullable = true)
|-- Pclass: integer (nullable = true)
|-- Name: string (nullable = true)
|-- Sex: string (nullable = true)
|-- Age: double (nullable = true)
|-- SibSp: integer (nullable = true)
|-- Parch: integer (nullable = true)
|-- Ticket: string (nullable = true)
|-- Fare: double (nullable = true)
|-- Cabin: string (nullable = true)
|-- Embarked: string (nullable = true)
"""
Как использовать ROW NUMBER в PySpark
Функция ROW NUMBER присваивает каждой строке номер в порядке следования. Так, с ее помощью можно создать уникальные идентификаторы после сортировки ORDER BY, для каждой группы PARTITION BY или и то , и другое.
В Apache Spark обязательно нужно указать порядок ORDER BY, без него вылетит ошибка (хотя некоторые СУБД могут сами разрешить порядок).
Итак, допустим дадим номера в порядке следования, упорядочив данные по плате за проезд Fare. Тогда запрос SQL будет выглядеть так:
SELECT t.Name, t.Sex, t.Fare,
ROW_NUMBER() OVER(ORDER BY t.Fare) AS rn
FROM titanic t
"""
+--------------------+------+---+
| Name| Fare| rn|
+--------------------+------+---+
| Leonard, Mr. Lionel| 0.0| 1|
|Harrison, Mr. Wil...| 0.0| 2|
|Tornquist, Mr. Wi...| 0.0| 3|
|"Parkes, Mr. Fran...| 0.0| 4|
|Johnson, Mr. Will...| 0.0| 5|
|Cunningham, Mr. A...| 0.0| 6|
|Campbell, Mr. Wil...| 0.0| 7|
|"Frost, Mr. Antho...| 0.0| 8|
| Johnson, Mr. Alfred| 0.0| 9|
|Parr, Mr. William...| 0.0| 10|
|Watson, Mr. Ennis...| 0.0| 11|
|Knight, Mr. Robert J| 0.0| 12|
|Andrews, Mr. Thom...| 0.0| 13|
| Fry, Mr. Richard| 0.0| 14|
|Reuchlin, Jonkhee...| 0.0| 15|
| Betros, Mr. Tannous|4.0125| 16|
|Carlsson, Mr. Fra...| 5.0| 17|
|Nysveen, Mr. Joha...|6.2375| 18|
|Lemberopolous, Mr...|6.4375| 19|
|Holm, Mr. John Fr...| 6.45| 20|
+--------------------+------+---+
"""
Новый столбец rn и является тем самым столбцом с индивидуальными номерами.
В PySpark этот же запрос будет выглядеть так:
import pyspark.sql.functions as F
from pyspark.sql import Window
w = Window.orderBy("Fare")
df.select("Name", "Fare", F.row_number().over(w).alias("rn"))
Мы можем также задействовать группы PARTITION BY. Например, сгруппируем пассажиров Титаника по их классу Pclass:
w = Window.partitionBy("Pclass").orderBy("Fare")
df.select(
"Name", "Pclass", "Sex", "Fare",
F.row_number().over(w).alias("rn")
)
"""
+--------------------+------+------+-------+---+
| Name|Pclass| Sex| Fare| rn|
+--------------------+------+------+-------+---+
|Harrison, Mr. Wil...| 1| male| 0.0| 1|
|Parr, Mr. William...| 1| male| 0.0| 2|
|Andrews, Mr. Thom...| 1| male| 0.0| 3|
| Fry, Mr. Richard| 1| male| 0.0| 4|
|Reuchlin, Jonkhee...| 1| male| 0.0| 5|
|Carlsson, Mr. Fra...| 1| male| 5.0| 6|
|Colley, Mr. Edwar...| 1| male|25.5875| 7|
| Baumann, Mr. John D| 1| male| 25.925| 8|
|Leader, Dr. Alice...| 1|female|25.9292| 9|
|Swift, Mrs. Frede...| 1|female|25.9292| 10|
+--------------------+------+------+-------+---+
"""
Можем убедиться, что у другого класса своя нумерация:
df.select(
"Name", "Pclass", "Sex", "Fare",
F.row_number().over(w).alias("rn")
).where("Pclass == 2")
"""
+--------------------+------+----+----+---+
| Name|Pclass| Sex|Fare| rn|
+--------------------+------+----+----+---+
|"Parkes, Mr. Fran...| 2|male| 0.0| 1|
|Cunningham, Mr. A...| 2|male| 0.0| 2|
|Campbell, Mr. Wil...| 2|male| 0.0| 3|
|"Frost, Mr. Antho...| 2|male| 0.0| 4|
|Watson, Mr. Ennis...| 2|male| 0.0| 5|
+--------------------+------+----+----+---+
"""
Можно поступить другим путем и взять несколько строк из каждой группы, как это показано далее.
Use case: взять по несколько строк из каждой группы
Что если нужно взять заданное количество строк из каждой группы? Тогда вам может понадобится функция ROW NUMBER.
В этом случае мы можем взять в виде подзапроса запрос с функцией ROW NUMBER и отфильтровать его. Например, вот так выглядит SQL-запрос, который позволит взять не более две строки из каждой группы:
SELECT * FROM (
SELECT t.Name, t.Pclass, t.Fare,
ROW_NUMBER() OVER(PARTITION BY Pclass ORDER BY t.Fare) AS rn
FROM titanic t
) x
WHERE x.rn < 3;
"""
+--------------------+------+----+---+
| Name|Pclass|Fare| rn|
+--------------------+------+----+---+
| Leonard, Mr. Lionel| 3| 0.0| 1|
|Tornquist, Mr. Wi...| 3| 0.0| 2|
|Harrison, Mr. Wil...| 1| 0.0| 1|
|Parr, Mr. William...| 1| 0.0| 2|
|"Parkes, Mr. Fran...| 2| 0.0| 1|
|Cunningham, Mr. A...| 2| 0.0| 2|
+--------------------+------+----+---+
"""
Тот же самый запрос в PySpark:
x = df.select(
"Name", "Pclass", "Sex", "Fare",
F.row_number().over(w).alias("rn")
)
x.where("rn < 3")
Код курса
GRAS
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.
А в следующей статье поговорим о функциях RANK. Еще больше подробностей об оконных функциях вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и IT-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:
- Анализ данных с Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
- Потоковая обработка в Apache Spark
- Основы Apache Spark для разработчиков



