Фильтрация данных в Apache Spark: лучшие практики

В прошлой статье мы говорили о вычислении SQL-выражений в Apache Spark, а также немного затронули тему фильтрации данных. В этот раз углубимся в эту тему, поскольку без фильтрации не обходится никакой анализ данных. Сегодня расскажем, как фильтровать числовые, строковые значения, а также массивы в PySpark.

Что такое фильтрация данных, и как она используется в Spark

Фильтрация данных — это выборка тех записей таблицы (DataFrame), которые удовлетворяют заданному условию. В PySpark для этого используется методы filter или where (они делают то же самое). Кроме того, имеется возможность напрямую писать SQL-запросы через spark.sql (см. тут).

df = spark.createDataFrame([
    (['py', 'c', 'cpp'],  'Anton', 'Moscow',  23,      19),
    (['c', 'java', 'hs'], 'Anna',  'Omsk',    27,      25),
    (['py', 'cl',],       'Andry', 'Moscow',  24,      22),
    (['cpp',],            'Alex',  'Moscow',  32,      25),
    (['cpp', 'py'],       'Boris', 'Omsk',    55,      37),
    (['py', 'cl'],        'Vera',  'Moscow',  89,      41), ],
    ['lang',              'name',  'city',   'salary', 'age'])

df.show()
"""
+-------------+-----+------+------+
|         lang| name|  city|salary|
+-------------+-----+------+------+
| [py, c, cpp]|Anton|Moscow|    23|
|[c, java, hs]| Anna|  Omsk|    27|
|     [py, cl]|Andry|Moscow|    24|
|        [cpp]| Alex|Moscow|    32|
|    [cpp, py]|Boris|  Omsk|    55|
|     [py, cl]| Vera|Moscow|    89|
+-------------+-----+------+------+
"""

Далее мы будем вводить выражения для фильтрации в двух редакциях: через точечную нотацию и через SQL-выражения. Чаще программисты используют точечную нотацию, однако у него есть недостаток: он не сработает, если в имени столбца есть что-то отличное от цифр и латинских букв. Например, может прошествовать пробел. В этом случае можно либо переименовать столбцы, либо использовать SQL-выражения. На всякий случай упомянем, как переименовать столбец в PySpark (если вы решили использовать точечную нотацию):

# Есть ещё масса других способов, но это самый простой
df2 = df.withColumnRenamed('old_name', 'new')

Фильтрация чисел

Машинное обучение в Apache Spark

Код курса
MLSP
Ближайшая дата курса
21 сентября, 2022
Длительность обучения
16 ак.часов
Стоимость обучения
40 000 руб.

Каким образом можно фильтровать числа? Число может быть равно какому-то значению, быть больше или меньше, входить в интервал. Здесь все предельно просто:

# SQL-выражение
df.filter('age > 25')
# Точечная нотация
df.filter(df.age > 25)
"""
+---------+-----+------+------+---+
|     lang| name|  city|salary|age|
+---------+-----+------+------+---+
|[cpp, py]|Boris|  Omsk|    55| 37|
| [py, cl]| Vera|Moscow|    89| 41|
+---------+-----+------+------+---+
"""

Для того чтобы получить записи, входящие в заданный интервал, то нужно всего лишь добавить логическое И (AND), ИЛИ (OR), как это делается в SQL. А вот для точеной нотации придется использовать побитовые логические операции: & (И), | (ИЛИ), а подвыражения вставлять в скобки. Код на Python:

df.filter('age > 25 AND salary >= 60')
df.filter((df.age > 25) & (df.salary >= 60))
"""
+--------+----+------+------+---+
|    lang|name|  city|salary|age|
+--------+----+------+------+---+
|[py, cl]|Vera|Moscow|    89| 41|
+--------+----+------+------+---+
"""

Добавим также, в Python “равно” и “не равно” выражаются в виде == и != соответственно. В SQL часто применяются паскалевская нотация: = (равно) и <> (не равно). Так вот в SQL-выражениях можно использовать и в стиле Python, и в стиле Pascal:

# С точкой только == или !=
df.filter(
    ((df.age == 25) & (df.salary != 55)) | 
    ((df.age == 37) & (df.salary != 40))
)
df.filter('(age == 25 AND salary != 55) OR (age = 37 AND salary <> 40)').show()
"""
+-------------+-----+------+------+---+
|         lang| name|  city|salary|age|
+-------------+-----+------+------+---+
|[c, java, hs]| Anna|  Omsk|    27| 25|
|        [cpp]| Alex|Moscow|    32| 25|
|    [cpp, py]|Boris|  Omsk|    55| 37|
+-------------+-----+------+------+---+
"""

Еще можно использовать отрицание. В точечной нотации оно будет обозначаться через ~. В SQL-выражениях — через NOT. Хотя использовать его с числовыми значениями вызовет путаницу, но иметь в виду стоит.

df.filter('NOT age = 25 AND salary > 55')
df.filter(~(df.age = 25) & (df.salary > 55))

Фильтрация строковых значений

Со строковыми значениями можно использовать ту же операцию сравнения (==, != или =, <> в SQL-выражениях):

df.filter((df.city == 'Moscow') & (df.name != 'Alex'))
df.filter('city == "Moscow" AND name != "Alex"')
"""
+------------+-----+------+------+---+
|        lang| name|  city|salary|age|
+------------+-----+------+------+---+
|[py, c, cpp]|Anton|Moscow|    23| 19|
|    [py, cl]|Andry|Moscow|    24| 22|
|    [py, cl]| Vera|Moscow|    89| 41|
+------------+-----+------+------+---+
"""

Однако нужно понимать, что значения должны обязательно попадать в условия. В Spark можно найти записи, которые начинаются, заканчиваются заданными символами или их содержит. В точечной нотации для этого используются методы startswith, endswith и contains.

df.filter((df.name.startswith('An')) & (df.city.contains('sc')))
"""
+------------+-----+------+------+---+
|        lang| name|  city|salary|age|
+------------+-----+------+------+---+
|[py, c, cpp]|Anton|Moscow|    23| 19|
|    [py, cl]|Andry|Moscow|    24| 22|
+------------+-----+------+------+---+
"""

А вот в SQL-выражениях придется использовать LIKE и RLIKE (Regex LIKE). При использовании LIKE знак % обозначает сколько угодно символов, знак _ — один любой символ. С их помощью можно найти записи, которые начинаются каким-то символом, им заканчивается и/или содержит символы. В точечной нотации также есть методы like и rlike. Код выше может быть переписан так:

df.filter((df.name.like('An%')) & (df.city.like('%sc%')))
df.filter('name LIKE "An%" AND city LIKE "%sc%"')

С другой стороны, RLIKE или метод rlike позволяет использовать регулярные выражения. У нас даже имеется отдельная статья по регулярным выражениям. Отметим только, что нужно в начале выражения указывать знак ^, обозначающее начало текста (например, RLIKE('^*')).

Входит ли значение в заданный список

Возможно вам нужно проверить входит ли значения в заданный список. Например, нужно найти только всех Анн и Борисов. Тогда вам понадобится оператор IN или метод isin:

df.filter(df.name.isin(['Anna', 'Boris']))
df.filter('name in ("Anna", "Boris")')
"""
+-------------+-----+----+------+---+
|         lang| name|city|salary|age|
+-------------+-----+----+------+---+
|[c, java, hs]| Anna|Omsk|    27| 25|
|    [cpp, py]|Boris|Omsk|    55| 37|
+-------------+-----+----+------+---+
"""

Есть также операторы IS NULL, IS NOT NULL и методы isNull и isNotNull, предназначенные для проверки на отсутствующие значения.

Фильтруем массивы

Графовые алгоритмы в Apache Spark

Код курса
GRAS
Ближайшая дата курса
20 июня, 2022
Длительность обучения
16 ак.часов
Стоимость обучения
40 000 руб.

Для проверки того, входит ли элемент в массив, используется функция ARRAY_CONTAINS. Отдельного метода под нее нет. Поэтому она может содержаться в SQL-выражении, либо в виде импортированной функции из pyspark.sql.functions.

df.filter('ARRAY_CONTAINS(lang, "py")').show()
"""
+------------+-----+------+------+---+
|        lang| name|  city|salary|age|
+------------+-----+------+------+---+
|[py, c, cpp]|Anton|Moscow|    23| 19|
|    [py, cl]|Andry|Moscow|    24| 22|
|   [cpp, py]|Boris|  Omsk|    55| 37|
|    [py, cl]| Vera|Moscow|    89| 41|
+------------+-----+------+------+---+
"""

Выбор значения в зависимости от условия (аналог CASE WHEN)

В прошлой статье мы использовали запрос с оператором CASE WHEN, который на основе заданного условия, выдает тот или иной результат. Данный оператор вносит модифицирующее действие: создается новый столбец с результатами. Итак, в PySpark для этого используется цепочка методов when, а ELSE реализуется через otherwise. Данная конструкция менее читаемая, чем полноценный запрос с CASE WHEN. Вот так он выглядит:

case_when = (
    F.when(F.expr('ARRAY_CONTAINS(lang, "c") OR ARRAY_CONTAINS(lang, "cpp")'), 'C/C++')
     .when(F.expr('ARRAY_CONTAINS(lang, "py")'), 'Python')
     .otherwise('uknown')
) 
df.withColumn('FullLang', case_when)
"""
+----+-----+------+------+--------+
|lang| name|  city|salary|FullLang|
+----+-----+------+------+--------+
|  py|Anton|Moscow|    23|  Python|
|   c| Anna|  Omsk|    27|   C/C++|
|  py|Andry|Moscow|    24|  Python|
| cpp| Alex|Moscow|    32|   C/C++|
| cpp|Boris|  Omsk|    55|   C/C++|
|  py| Vera|Moscow|    89|  Python|
+----+-----+------+------+--------+
"""

 

О том, как использовать Apache Spark для фильтрации и анализа данных вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:

Записаться на курс

Смотреть раcписание

Добавить комментарий

Поиск по сайту