5 способов вычисления SQL-выражений в Spark

Если вы знаете SQL, но еще не освоились с фреймворком Apache Spark, то вы можете выполнять запросы различными способами. В этой статье вы узнаете, как писать SQL-выражения в PySpark, какие способы выполнения запросов существуют, как конкатенировать строки, фильтровать данные и работать с датами.

Способы выполнения в SQL-запросов в Apache Spark

Выполнять SQL-запросы можно выполнять различным образом, но все они в итоге дают тот же результат. Причем только один из них является полноценным запросом, все остальные является вызов функции, принимающей SQL-выражение. Вот некоторые из них:

  • напрямую через spark.sql("QUERY");
  • выборка столбцов через метод select в связке с функцией expr из pyspark.sql.functions;
  • выборка столбцов через метод selectExpr;
  • добавление результата вычисления SQL-выражения к исходной таблице через метод withColumn;
  • фильтрация данных через методы where или filter в связке с функцией expr из pyspark.sql.functions;.

Допустим, имеется следующий DataFrame:

df = spark.createDataFrame([
    ('py',   'Anton', 'Moscow',  23),
    ('c',    'Anna',  'Omsk',    27),
    ('py',   'Andry', 'Moscow',  24),
    ('cpp',  'Alex',  'Moscow',  32),
    ('cpp',  'Boris', 'Omsk',    55),
    ('py',   'Vera',  'Moscow',  89), ],
    ['lang', 'name',  'city',   'salary'])

Тогда чтобы напрямую выполнить SQL-запрос (1-способ), нужно создать так называемое представление (view). Это делается для того чтобы Spark знал о созданной таблице. Представление создается вызовом метода, в который нужно передать желаемое имя таблицы:

df.createOrReplaceTempView("prog_info")

Теперь можно выполнять запросы, так же, как это делается при работе с базами данных:

spark.sql("select lang from prog_info").show()
"""
+----+
|lang|
+----+
|  py|
|   c|
|  py|
| cpp|
| cpp|
|  py|
+----+
"""

Этот способ наиболее универсален, поскольку не нужно знать о различных функциях, а просто нужно дать необходимый запрос.

Второй способ заключается в использовании метода select в связке с функцией expr. Она уже не требует создания представления. Однако она применяется к конкретному DataFrame, поэтому операторы SELECT FROM должны быть опущены. В этом случае следует говорить о SQL-выражении, ведь не является полным запросом. Например, вот так выглядит тот же самый запрос, что и выше, но только с использованием функции expr:

# На самом деле именно для этого случая, `expr` может быть опушен,
# но далее вы увидите, что его обязательно нужно использовать
import pyspark.sql.functions as F
df.select(F.expr('lang')).show()
"""
+----+
|lang|
+----+
|  py|
|   c|
|  py|
| cpp|
| cpp|
|  py|
+----+
"""

Далее следуют метод selectExpr. Второй метод точно такой же, как и select, но принимает на вход SQL-выражения, поэтому не нужно отдельно вызывать функцию expr.

df.selectExpr('lang').show()
# Или что то же самое:
df.select('lang').show()
+----+
|lang|
+----+
|  py|
|   c|
|  py|
| cpp|
| cpp|
|  py|
+----+

Различие заключается в том, что в select передаются именно столбцы, поэтому такая запись неправильная:

# Ошибка
selectExpr(df.lang).show()
# А вот так можно:
select(df.lang).show()

Самый последний способ использования SQL-выражений — это метод withColumn в связке с функцией expr. В отличие от предыдущих он добавляет к имеющейся таблице новый столбец, полученный на основе SQL-выражения. Его можно рассматривать как оператор SELECT * FROM.

Код курса
SPARK
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.

Конечно, на примере операции выборки столбца всю силу SQL-выражений не показать, поэтому рассмотрим более интересные операции, включая фильтрацию данных, о которой мы еще не поговорили.

Конкатенация строк с использованием ||

Самый простой способ объединения нескольких строк в одну — это использование SQL-выражения со знаком ||. Такая операция называется конкатенация строк. Итак, чтобы конкатенировать строки используйте один из вышеперечисленных способов.

spark.sql('SELECT name || "_" || lang AS name_lang FROM lang_info').show()
df.select(F.expr('name || "_" || lang AS name_lang')).show()
df.selectExpr('name || "_" || lang AS name_lang').show()
"""
+---------+
|name_lang|
+---------+
| Anton_py|
|   Anna_c|
| Andry_py|
| Alex_cpp|
|Boris_cpp|
|  Vera_py|
+---------+
"""
# Возвращает всю таблицу
df.withColumn('name_lang', F.expr('name || "_" || lang')).show()
"""
+----+-----+------+------+---------+
|lang| name|  city|salary|name_lang|
+----+-----+------+------+---------+
|  py|Anton|Moscow|    23| Anton_py|
|   c| Anna|  Omsk|    27|   Anna_c|
|  py|Andry|Moscow|    24| Andry_py|
| cpp| Alex|Moscow|    32| Alex_cpp|
| cpp|Boris|  Omsk|    55|Boris_cpp|
|  py| Vera|Moscow|    89|  Vera_py|
+----+-----+------+------+---------+
"""

Фильтрация данных в PySpark

Для фильтрации существуют методы filter и where (это синонимы, они делают то же самое). Фильтрация данных может быть выражена двумя способами: в виде SQL-выражения или так, как это делается в библиотеке Pandas. Например, найдем те записи, которые содержат одинаковые значения.

# В виде SQL-выражения
df.filter('salary > 30 AND city like "Moscow"').show()
# Как в Pandas:
df.filter((df.salary > 30) & (df.city == 'Moscow')).show()
"""
+----+----+------+------+
|lang|name|  city|salary|
+----+----+------+------+
| cpp|Alex|Moscow|    32|
|  py|Vera|Moscow|    89|
+----+----+------+------+
"""

При использовании Pandas-стиля выражения заключается в скобки; если требуется произвести логические операции между ними, то ставится знак & (AND, побитовое И) или | (OR, побитовое ИЛИ). В нашем примере мы использовали знак И. Также стоит отметить, что его использование ограничено использованием имен с латинскими буквами и цифрами (как например, обратиться к имени столбца, содержащий пробел).

Также можно использовать обычный SQL-запрос в spark.sql.

spark.sql('SELECT * FROM lang_info WHERE salary > 30 AND city like "Moscow"')

Использование CASE WHEN в PySpark

В PySpark эквивалентом оператор CASE WHEN является использование функций when().otherwise(). Он необходим для выбора того или иного результата в зависимости от выполнения условия. Аналогичный оператор можно встретить в Python, который состоит if\elif\eles. Синтаксис у оператора `CASE WHEN в SQL такой:

CASE
    WHEN condition1 THEN result1
    WHEN condition2 THEN result2
    WHEN conditionN THEN resultN
    ELSE result
END;

Допустим из неполных названий языков мы хотим получить полные. В этом случае можно поступить таким образом:

sql = """
CASE
    WHEN (lang LIKE 'c') OR (lang LIKE 'cpp') THEN 'C/C++'
    WHEN (lang LIKE 'py') THEN 'Python'
    ELSE 'unknown'
END"""

df.withColumn('FullLang', F.expr(sql)).show()
"""
+----+-----+------+------+--------+
|lang| name|  city|salary|FullLang|
+----+-----+------+------+--------+
|  py|Anton|Moscow|    23|  Python|
|   c| Anna|  Omsk|    27|   C/C++|
|  py|Andry|Moscow|    24|  Python|
| cpp| Alex|Moscow|    32|   C/C++|
| cpp|Boris|  Omsk|    55|   C/C++|
|  py| Vera|Moscow|    89|  Python|
+----+-----+------+------+--------+
"""

Опять же мы могли бы вызвать select, но получили бы только один столбец, также в этом случае следует добавить оператор AS или метод alias, иначе название столбца будет состоять из всего оператора CASE WHEN. Например, так:

df.select(F.expr(sql).alias('FullLang')).show()
"""
+--------+
|FullLang|
+--------+
|  Python|
|   C/C++|
|  Python|
|   C/C++|
|   C/C++|
|  Python|
+--------+
"""

Либо, как уже говорили, и вовсе сделать с помощью функций when-otherwise. Советуем при этом соблюдать структурные отступы для читабельности кода:

case_when = (
    F.when(F.expr('lang LIKE "c" OR lang LIKE "cpp"'), 'C/C++')
     .when(F.expr('lang LIKE "py"'), 'Python')
     .otherwise('uknown')
) 
df.withColumn('FullLang', case_when)

Работаем с датой и временем

Одними только числовыми и строковыми не отделаешься, иногда приходится работать с датами и временем. Для них тоже имеются различные функции, похожие на те, которые встречаются в SQL. Например, вот так можно найти разницу между датами в днях в Spark:

df2 = spark.createDataFrame([
    ('2020-01-01', '2022-10-25'),
    ('2021-05-12', '2022-05-12'),],
    ['before', 'after'])
df2.withColumn('diff', F.expr('DATEDIFF(after, before)')).show()
"""
+----------+----------+----+
|    before|     after|diff|
+----------+----------+----+
|2020-01-01|2022-10-25|1028|
|2021-05-12|2022-05-12| 365|
+----------+----------+----+
"""

Заметим, что мы могли бы не писать так, как это делается в SQL, а могли бы использовать функции из pyspark.sql.functions. Более того, это хороший способ узнать, есть ли такая-то функция, заглянув в документацию. Однако полностью полагаться на документацию не стоит, так как есть еще незадокументированные функции, которые нет в языке SQL, но могут в некоторых случаях пригодится(например, stack — обратная операция функции pivot, см. [5]).

 

Еще больше подробностей об SQL-операциях в PySpark вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:

Записаться на курс

Смотреть раcписание

Источники
  1. expr
  2. select
  3. selectExpr
  4. withColumn
  5. Как развернуть таблицу

Добавить комментарий

Поиск по сайту