В предыдущей статье мы говорили о фреймах оконных функций (window functions) в PySpark. Сегодня мы затронем такие аналитические функции, как NTH, NTILE, CUME_DIST, PERCENT_RANK.
Исходные данные
В качестве данных возьмем следующие образцы:
data = [
(1, "Alexander", "Admin", 3500),
(2, "Roman", "IT", 4500),
(3, "Tom", "IT", 5500),
(4, "Alex", "HR", 3000),
(5, "Arthur", "Finance", 4250),
(6, "Anna", "Finance", 3520),
(7, "Svetlana", "Admin", 3000),
(8, "Oleg", "IT", 3200),
(9, "Felix", "IT", 5500),
(10, "Olga", "IT", 4970),
(11, "Anna", "Finance", 6320),
(12, "Tom", "HR", 3500),
(13, "Felix", "Finance", 3520),
(14, "John", "Admin", 3500),
(15, "Finn", "IT", 5570),
(16, "Polly", "HR", 3800),
]
schema = ["id", "emp_name", "dept_name", "salary"]
df = spark.createDataFrame(data, schema)
df.createTempView("work")
Код курса
SPOT
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Что вычисляет NTILE в PySpark
Функция ntile группирует строки на заданное количество частей. Применение этой функции аналогично функции bucketBy, т.е. созданию бакетов, о котором мы говорили тут.
Допустим поделим наши строки на три части в соответствии с уровнем зарплаты. Тогда SQL-запрос будет выглядеть так:
SELECT *,
NTILE(3) OVER(ORDER BY salary DESC) as backet_id
FROM work
В PySpark же:
w = Window.orderBy(F.desc("salary"))
df.withColumn("bucket_id", F.ntile(3).over(w)).show()
"""
+---+---------+---------+------+---------+
| id| emp_name|dept_name|salary|bucket_id|
+---+---------+---------+------+---------+
| 11| Anna| Finance| 6320| 1|
| 15| Finn| IT| 5570| 1|
| 3| Tom| IT| 5500| 1|
| 9| Felix| IT| 5500| 1|
| 10| Olga| IT| 4970| 1|
| 2| Roman| IT| 4500| 1|
| 5| Arthur| Finance| 4250| 2|
| 16| Polly| HR| 3800| 2|
| 6| Anna| Finance| 3520| 2|
| 13| Felix| Finance| 3520| 2|
| 1|Alexander| Admin| 3500| 2|
| 12| Tom| HR| 3500| 3|
| 14| John| Admin| 3500| 3|
| 8| Oleg| IT| 3200| 3|
| 4| Alex| HR| 3000| 3|
| 7| Svetlana| Admin| 3000| 3|
+---+---------+---------+------+---------+
"""
Конечно, 16 строк поделить ровно на три части не получится, поэтому в первом бакете находится 6 строк.
Теперь мы можем обращаться к любому из бакетов. Например, так:
w = Window.orderBy(F.desc("salary"))
case_when = (
F.when(F.expr("bucket_id = 1"), "richest")
.when(F.expr("bucket_id = 2"), "middle")
.when(F.expr("bucket_id = 3"), "poorest")
)
df.withColumn("bucket_id", F.ntile(3).over(w)) \
.withColumn("Status", case_when).show()
"""
+---+---------+---------+------+---------+-------+
| id| emp_name|dept_name|salary|bucket_id| Status|
+---+---------+---------+------+---------+-------+
| 11| Anna| Finance| 6320| 1|richest|
| 15| Finn| IT| 5570| 1|richest|
| 3| Tom| IT| 5500| 1|richest|
| 9| Felix| IT| 5500| 1|richest|
| 10| Olga| IT| 4970| 1|richest|
| 2| Roman| IT| 4500| 1|richest|
| 5| Arthur| Finance| 4250| 2| middle|
| 16| Polly| HR| 3800| 2| middle|
| 6| Anna| Finance| 3520| 2| middle|
| 13| Felix| Finance| 3520| 2| middle|
| 1|Alexander| Admin| 3500| 2| middle|
| 12| Tom| HR| 3500| 3|poorest|
| 14| John| Admin| 3500| 3|poorest|
| 8| Oleg| IT| 3200| 3|poorest|
| 4| Alex| HR| 3000| 3|poorest|
| 7| Svetlana| Admin| 3000| 3|poorest|
+---+---------+---------+------+---------+-------+
"""
Также можно делить на бакеты можно и для каждой партиции. Нужно для этого только указать partitionBy. Но в ней нельзя выбрать границы rows или range.
Что вычисляет CUME_DIST в PySpark
Функция cume_dist вычисляет кумулятивное распределение внутри группы значений.
Считается распределение очень просто. Например, в нашем датасете всего 16 записей. Тогда для первой строки cume_dist будет равен 1 / 16, для второй 2 / 16 и т.д. Однако одинаковые значения будут рассмотрены как один фрейм. В PySpark это выглядит так:
w = Window.orderBy("salary")
df.withColumn("cume_dist", F.cume_dist().over(w)).show()
"""
+---+---------+---------+------+---------+
| id| emp_name|dept_name|salary|cume_dist|
+---+---------+---------+------+---------+
| 4| Alex| HR| 3000| 0.125|
| 7| Svetlana| Admin| 3000| 0.125|
| 8| Oleg| IT| 3200| 0.1875|
| 1|Alexander| Admin| 3500| 0.375|
| 12| Tom| HR| 3500| 0.375|
| 14| John| Admin| 3500| 0.375|
| 6| Anna| Finance| 3520| 0.5|
| 13| Felix| Finance| 3520| 0.5|
| 16| Polly| HR| 3800| 0.5625|
| 5| Arthur| Finance| 4250| 0.625|
| 2| Roman| IT| 4500| 0.6875|
| 10| Olga| IT| 4970| 0.75|
| 3| Tom| IT| 5500| 0.875|
| 9| Felix| IT| 5500| 0.875|
| 15| Finn| IT| 5570| 0.9375|
| 11| Anna| Finance| 6320| 1.0|
+---+---------+---------+------+---------
"""
Как видим, первые две строки имеют одинаковую зарплату, поэтому для обоих будет значением 2 / 16 = 0.125, то же самое для Тома, Джона и Александра: 6 / 16 = 0.375.
Что вычисляет PERCENT_RANK в PySpark
Функция percent_rank вычисляет относительный ранг. Она очень похожа на предыдущую функцию cume_dist. Формула percent_rank такова:
percent_rank(i) = (i - 1) / (total_number_rows - 1), где, i - номер строки фрейма
Давайте посчитаем percent_rank в PySpark, также переведя его в проценты:
w = Window.orderBy("salary")
df.withColumn("perc_rank", F.percent_rank().over(w)) \
.withColumn("percentage", F.round(F.percent_rank().over(w)* 100)) \
.show()
"""
+---+---------+---------+------+-------------------+----------+
| id| emp_name|dept_name|salary| perc_rank|percentage|
+---+---------+---------+------+-------------------+----------+
| 4| Alex| HR| 3000| 0.0| 0.0|
| 7| Svetlana| Admin| 3000| 0.0| 0.0|
| 8| Oleg| IT| 3200|0.13333333333333333| 13.0|
| 1|Alexander| Admin| 3500| 0.2| 20.0|
| 12| Tom| HR| 3500| 0.2| 20.0|
| 14| John| Admin| 3500| 0.2| 20.0|
| 6| Anna| Finance| 3520| 0.4| 40.0|
| 13| Felix| Finance| 3520| 0.4| 40.0|
| 16| Polly| HR| 3800| 0.5333333333333333| 53.0|
| 5| Arthur| Finance| 4250| 0.6| 60.0|
| 2| Roman| IT| 4500| 0.6666666666666666| 67.0|
| 10| Olga| IT| 4970| 0.7333333333333333| 73.0|
| 3| Tom| IT| 5500| 0.8| 80.0|
| 9| Felix| IT| 5500| 0.8| 80.0|
| 15| Finn| IT| 5570| 0.9333333333333333| 93.0|
| 11| Anna| Finance| 6320| 1.0| 100.0|
+---+---------+---------+------+-------------------+----------+
"""
Итак, для первого фрейма мы получаем: (1 — 1) / (16 — 1) = 0, для второго: (3 — 1) / 15 = 0.13, для третьего: (4 — 1) / 15 = 0.2 и т.д. Здесь можно увидеть разницу между подсчетами фреймов. Функция percent_rank считает от первой строки фрейма, а функция cume_dist — от последней.
На основании этой таблицы м можем узнать, насколько в процентах кто-то зарабатывает больше или меньше. Например, мы видим, что Анна из финансового отдела зарабатывает на 40% больше, чем 6 ее коллег.
Код курса
SPOT
Ближайшая дата курса
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Больше о аналитических функциях вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и IT-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:
- Анализ данных с Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
- Потоковая обработка в Apache Spark
- Основы Apache Spark для разработчиков



