В предыдущей статье мы говорили о фреймах оконных функций (window functions) в PySpark. Сегодня мы затронем такие аналитические функции, как NTH
, NTILE
, CUME_DIST
, PERCENT_RANK
.
Исходные данные
В качестве данных возьмем следующие образцы:
data = [ (1, "Alexander", "Admin", 3500), (2, "Roman", "IT", 4500), (3, "Tom", "IT", 5500), (4, "Alex", "HR", 3000), (5, "Arthur", "Finance", 4250), (6, "Anna", "Finance", 3520), (7, "Svetlana", "Admin", 3000), (8, "Oleg", "IT", 3200), (9, "Felix", "IT", 5500), (10, "Olga", "IT", 4970), (11, "Anna", "Finance", 6320), (12, "Tom", "HR", 3500), (13, "Felix", "Finance", 3520), (14, "John", "Admin", 3500), (15, "Finn", "IT", 5570), (16, "Polly", "HR", 3800), ] schema = ["id", "emp_name", "dept_name", "salary"] df = spark.createDataFrame(data, schema) df.createTempView("work")Код курса
SPOT
Ближайшая дата курса
по запросуПродолжительность
ак.часов
Стоимость обучения
0 руб.
Что вычисляет NTILE в PySpark
Функция ntile
группирует строки на заданное количество частей. Применение этой функции аналогично функции bucketBy
, т.е. созданию бакетов, о котором мы говорили тут.
Допустим поделим наши строки на три части в соответствии с уровнем зарплаты. Тогда SQL-запрос будет выглядеть так:
SELECT *, NTILE(3) OVER(ORDER BY salary DESC) as backet_id FROM work
В PySpark же:
w = Window.orderBy(F.desc("salary")) df.withColumn("bucket_id", F.ntile(3).over(w)).show() """ +---+---------+---------+------+---------+ | id| emp_name|dept_name|salary|bucket_id| +---+---------+---------+------+---------+ | 11| Anna| Finance| 6320| 1| | 15| Finn| IT| 5570| 1| | 3| Tom| IT| 5500| 1| | 9| Felix| IT| 5500| 1| | 10| Olga| IT| 4970| 1| | 2| Roman| IT| 4500| 1| | 5| Arthur| Finance| 4250| 2| | 16| Polly| HR| 3800| 2| | 6| Anna| Finance| 3520| 2| | 13| Felix| Finance| 3520| 2| | 1|Alexander| Admin| 3500| 2| | 12| Tom| HR| 3500| 3| | 14| John| Admin| 3500| 3| | 8| Oleg| IT| 3200| 3| | 4| Alex| HR| 3000| 3| | 7| Svetlana| Admin| 3000| 3| +---+---------+---------+------+---------+ """
Конечно, 16 строк поделить ровно на три части не получится, поэтому в первом бакете находится 6 строк.
Теперь мы можем обращаться к любому из бакетов. Например, так:
w = Window.orderBy(F.desc("salary")) case_when = ( F.when(F.expr("bucket_id = 1"), "richest") .when(F.expr("bucket_id = 2"), "middle") .when(F.expr("bucket_id = 3"), "poorest") ) df.withColumn("bucket_id", F.ntile(3).over(w)) \ .withColumn("Status", case_when).show() """ +---+---------+---------+------+---------+-------+ | id| emp_name|dept_name|salary|bucket_id| Status| +---+---------+---------+------+---------+-------+ | 11| Anna| Finance| 6320| 1|richest| | 15| Finn| IT| 5570| 1|richest| | 3| Tom| IT| 5500| 1|richest| | 9| Felix| IT| 5500| 1|richest| | 10| Olga| IT| 4970| 1|richest| | 2| Roman| IT| 4500| 1|richest| | 5| Arthur| Finance| 4250| 2| middle| | 16| Polly| HR| 3800| 2| middle| | 6| Anna| Finance| 3520| 2| middle| | 13| Felix| Finance| 3520| 2| middle| | 1|Alexander| Admin| 3500| 2| middle| | 12| Tom| HR| 3500| 3|poorest| | 14| John| Admin| 3500| 3|poorest| | 8| Oleg| IT| 3200| 3|poorest| | 4| Alex| HR| 3000| 3|poorest| | 7| Svetlana| Admin| 3000| 3|poorest| +---+---------+---------+------+---------+-------+ """
Также можно делить на бакеты можно и для каждой партиции. Нужно для этого только указать partitionBy
. Но в ней нельзя выбрать границы rows
или range
.
Что вычисляет CUME_DIST в PySpark
Функция cume_dist
вычисляет кумулятивное распределение внутри группы значений.
Считается распределение очень просто. Например, в нашем датасете всего 16 записей. Тогда для первой строки cume_dist
будет равен 1 / 16, для второй 2 / 16 и т.д. Однако одинаковые значения будут рассмотрены как один фрейм. В PySpark это выглядит так:
w = Window.orderBy("salary") df.withColumn("cume_dist", F.cume_dist().over(w)).show() """ +---+---------+---------+------+---------+ | id| emp_name|dept_name|salary|cume_dist| +---+---------+---------+------+---------+ | 4| Alex| HR| 3000| 0.125| | 7| Svetlana| Admin| 3000| 0.125| | 8| Oleg| IT| 3200| 0.1875| | 1|Alexander| Admin| 3500| 0.375| | 12| Tom| HR| 3500| 0.375| | 14| John| Admin| 3500| 0.375| | 6| Anna| Finance| 3520| 0.5| | 13| Felix| Finance| 3520| 0.5| | 16| Polly| HR| 3800| 0.5625| | 5| Arthur| Finance| 4250| 0.625| | 2| Roman| IT| 4500| 0.6875| | 10| Olga| IT| 4970| 0.75| | 3| Tom| IT| 5500| 0.875| | 9| Felix| IT| 5500| 0.875| | 15| Finn| IT| 5570| 0.9375| | 11| Anna| Finance| 6320| 1.0| +---+---------+---------+------+--------- """
Как видим, первые две строки имеют одинаковую зарплату, поэтому для обоих будет значением 2 / 16 = 0.125, то же самое для Тома, Джона и Александра: 6 / 16 = 0.375.
Что вычисляет PERCENT_RANK в PySpark
Функция percent_rank
вычисляет относительный ранг. Она очень похожа на предыдущую функцию cume_dist
. Формула percent_rank
такова:
percent_rank(i) = (i - 1) / (total_number_rows - 1), где, i - номер строки фрейма
Давайте посчитаем percent_rank
в PySpark, также переведя его в проценты:
w = Window.orderBy("salary") df.withColumn("perc_rank", F.percent_rank().over(w)) \ .withColumn("percentage", F.round(F.percent_rank().over(w)* 100)) \ .show() """ +---+---------+---------+------+-------------------+----------+ | id| emp_name|dept_name|salary| perc_rank|percentage| +---+---------+---------+------+-------------------+----------+ | 4| Alex| HR| 3000| 0.0| 0.0| | 7| Svetlana| Admin| 3000| 0.0| 0.0| | 8| Oleg| IT| 3200|0.13333333333333333| 13.0| | 1|Alexander| Admin| 3500| 0.2| 20.0| | 12| Tom| HR| 3500| 0.2| 20.0| | 14| John| Admin| 3500| 0.2| 20.0| | 6| Anna| Finance| 3520| 0.4| 40.0| | 13| Felix| Finance| 3520| 0.4| 40.0| | 16| Polly| HR| 3800| 0.5333333333333333| 53.0| | 5| Arthur| Finance| 4250| 0.6| 60.0| | 2| Roman| IT| 4500| 0.6666666666666666| 67.0| | 10| Olga| IT| 4970| 0.7333333333333333| 73.0| | 3| Tom| IT| 5500| 0.8| 80.0| | 9| Felix| IT| 5500| 0.8| 80.0| | 15| Finn| IT| 5570| 0.9333333333333333| 93.0| | 11| Anna| Finance| 6320| 1.0| 100.0| +---+---------+---------+------+-------------------+----------+ """
Итак, для первого фрейма мы получаем: (1 — 1) / (16 — 1) = 0, для второго: (3 — 1) / 15 = 0.13, для третьего: (4 — 1) / 15 = 0.2 и т.д. Здесь можно увидеть разницу между подсчетами фреймов. Функция percent_rank
считает от первой строки фрейма, а функция cume_dist
— от последней.
На основании этой таблицы м можем узнать, насколько в процентах кто-то зарабатывает больше или меньше. Например, мы видим, что Анна из финансового отдела зарабатывает на 40% больше, чем 6 ее коллег.
Код курса
SPOT
Ближайшая дата курса
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Больше о аналитических функциях вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и IT-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:
- Анализ данных с Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
- Потоковая обработка в Apache Spark
- Основы Apache Spark для разработчиков