Если вы знаете SQL, но еще не освоились с фреймворком Apache Spark, то вы можете выполнять запросы различными способами. В этой статье вы узнаете, как писать SQL-выражения в PySpark, какие способы выполнения запросов существуют, как конкатенировать строки, фильтровать данные и работать с датами.
Способы выполнения в SQL-запросов в Apache Spark
Выполнять SQL-запросы можно выполнять различным образом, но все они в итоге дают тот же результат. Причем только один из них является полноценным запросом, все остальные является вызов функции, принимающей SQL-выражение. Вот некоторые из них:
- напрямую через
spark.sql("QUERY"); - выборка столбцов через метод
selectв связке с функциейexprизpyspark.sql.functions; - выборка столбцов через метод
selectExpr; - добавление результата вычисления SQL-выражения к исходной таблице через метод
withColumn; - фильтрация данных через методы
whereилиfilterв связке с функциейexprизpyspark.sql.functions;.
Допустим, имеется следующий DataFrame:
df = spark.createDataFrame([
('py', 'Anton', 'Moscow', 23),
('c', 'Anna', 'Omsk', 27),
('py', 'Andry', 'Moscow', 24),
('cpp', 'Alex', 'Moscow', 32),
('cpp', 'Boris', 'Omsk', 55),
('py', 'Vera', 'Moscow', 89), ],
['lang', 'name', 'city', 'salary'])
Тогда чтобы напрямую выполнить SQL-запрос (1-способ), нужно создать так называемое представление (view). Это делается для того чтобы Spark знал о созданной таблице. Представление создается вызовом метода, в который нужно передать желаемое имя таблицы:
df.createOrReplaceTempView("prog_info")
Теперь можно выполнять запросы, так же, как это делается при работе с базами данных:
spark.sql("select lang from prog_info").show()
"""
+----+
|lang|
+----+
| py|
| c|
| py|
| cpp|
| cpp|
| py|
+----+
"""
Этот способ наиболее универсален, поскольку не нужно знать о различных функциях, а просто нужно дать необходимый запрос.
Второй способ заключается в использовании метода select в связке с функцией expr. Она уже не требует создания представления. Однако она применяется к конкретному DataFrame, поэтому операторы SELECT FROM должны быть опущены. В этом случае следует говорить о SQL-выражении, ведь не является полным запросом. Например, вот так выглядит тот же самый запрос, что и выше, но только с использованием функции expr:
# На самом деле именно для этого случая, `expr` может быть опушен,
# но далее вы увидите, что его обязательно нужно использовать
import pyspark.sql.functions as F
df.select(F.expr('lang')).show()
"""
+----+
|lang|
+----+
| py|
| c|
| py|
| cpp|
| cpp|
| py|
+----+
"""
Далее следуют метод selectExpr. Второй метод точно такой же, как и select, но принимает на вход SQL-выражения, поэтому не нужно отдельно вызывать функцию expr.
df.selectExpr('lang').show()
# Или что то же самое:
df.select('lang').show()
+----+
|lang|
+----+
| py|
| c|
| py|
| cpp|
| cpp|
| py|
+----+
Различие заключается в том, что в select передаются именно столбцы, поэтому такая запись неправильная:
# Ошибка selectExpr(df.lang).show() # А вот так можно: select(df.lang).show()
Самый последний способ использования SQL-выражений — это метод withColumn в связке с функцией expr. В отличие от предыдущих он добавляет к имеющейся таблице новый столбец, полученный на основе SQL-выражения. Его можно рассматривать как оператор SELECT * FROM.
Анализ данных с помощью современного Apache Spark
Код курса
SPARK
Ближайшая дата курса
1 декабря, 2025
Продолжительность
32 ак.часов
Стоимость обучения
96 000 руб.
Конечно, на примере операции выборки столбца всю силу SQL-выражений не показать, поэтому рассмотрим более интересные операции, включая фильтрацию данных, о которой мы еще не поговорили.
Конкатенация строк с использованием ||
Самый простой способ объединения нескольких строк в одну — это использование SQL-выражения со знаком ||. Такая операция называется конкатенация строк. Итак, чтобы конкатенировать строки используйте один из вышеперечисленных способов.
spark.sql('SELECT name || "_" || lang AS name_lang FROM lang_info').show()
df.select(F.expr('name || "_" || lang AS name_lang')).show()
df.selectExpr('name || "_" || lang AS name_lang').show()
"""
+---------+
|name_lang|
+---------+
| Anton_py|
| Anna_c|
| Andry_py|
| Alex_cpp|
|Boris_cpp|
| Vera_py|
+---------+
"""
# Возвращает всю таблицу
df.withColumn('name_lang', F.expr('name || "_" || lang')).show()
"""
+----+-----+------+------+---------+
|lang| name| city|salary|name_lang|
+----+-----+------+------+---------+
| py|Anton|Moscow| 23| Anton_py|
| c| Anna| Omsk| 27| Anna_c|
| py|Andry|Moscow| 24| Andry_py|
| cpp| Alex|Moscow| 32| Alex_cpp|
| cpp|Boris| Omsk| 55|Boris_cpp|
| py| Vera|Moscow| 89| Vera_py|
+----+-----+------+------+---------+
"""
Фильтрация данных в PySpark
Для фильтрации существуют методы filter и where (это синонимы, они делают то же самое). Фильтрация данных может быть выражена двумя способами: в виде SQL-выражения или так, как это делается в библиотеке Pandas. Например, найдем те записи, которые содержат одинаковые значения.
# В виде SQL-выражения
df.filter('salary > 30 AND city like "Moscow"').show()
# Как в Pandas:
df.filter((df.salary > 30) & (df.city == 'Moscow')).show()
"""
+----+----+------+------+
|lang|name| city|salary|
+----+----+------+------+
| cpp|Alex|Moscow| 32|
| py|Vera|Moscow| 89|
+----+----+------+------+
"""
При использовании Pandas-стиля выражения заключается в скобки; если требуется произвести логические операции между ними, то ставится знак & (AND, побитовое И) или | (OR, побитовое ИЛИ). В нашем примере мы использовали знак И. Также стоит отметить, что его использование ограничено использованием имен с латинскими буквами и цифрами (как например, обратиться к имени столбца, содержащий пробел).
Также можно использовать обычный SQL-запрос в spark.sql.
spark.sql('SELECT * FROM lang_info WHERE salary > 30 AND city like "Moscow"')
Использование CASE WHEN в PySpark
В PySpark эквивалентом оператор CASE WHEN является использование функций when().otherwise(). Он необходим для выбора того или иного результата в зависимости от выполнения условия. Аналогичный оператор можно встретить в Python, который состоит if\elif\eles. Синтаксис у оператора `CASE WHEN в SQL такой:
CASE
WHEN condition1 THEN result1
WHEN condition2 THEN result2
WHEN conditionN THEN resultN
ELSE result
END;
Допустим из неполных названий языков мы хотим получить полные. В этом случае можно поступить таким образом:
sql = """
CASE
WHEN (lang LIKE 'c') OR (lang LIKE 'cpp') THEN 'C/C++'
WHEN (lang LIKE 'py') THEN 'Python'
ELSE 'unknown'
END"""
df.withColumn('FullLang', F.expr(sql)).show()
"""
+----+-----+------+------+--------+
|lang| name| city|salary|FullLang|
+----+-----+------+------+--------+
| py|Anton|Moscow| 23| Python|
| c| Anna| Omsk| 27| C/C++|
| py|Andry|Moscow| 24| Python|
| cpp| Alex|Moscow| 32| C/C++|
| cpp|Boris| Omsk| 55| C/C++|
| py| Vera|Moscow| 89| Python|
+----+-----+------+------+--------+
"""
Опять же мы могли бы вызвать select, но получили бы только один столбец, также в этом случае следует добавить оператор AS или метод alias, иначе название столбца будет состоять из всего оператора CASE WHEN. Например, так:
df.select(F.expr(sql).alias('FullLang')).show()
"""
+--------+
|FullLang|
+--------+
| Python|
| C/C++|
| Python|
| C/C++|
| C/C++|
| Python|
+--------+
"""
Либо, как уже говорили, и вовсе сделать с помощью функций when-otherwise. Советуем при этом соблюдать структурные отступы для читабельности кода:
case_when = (
F.when(F.expr('lang LIKE "c" OR lang LIKE "cpp"'), 'C/C++')
.when(F.expr('lang LIKE "py"'), 'Python')
.otherwise('uknown')
)
df.withColumn('FullLang', case_when)
Работаем с датой и временем
Одними только числовыми и строковыми не отделаешься, иногда приходится работать с датами и временем. Для них тоже имеются различные функции, похожие на те, которые встречаются в SQL. Например, вот так можно найти разницу между датами в днях в Spark:
df2 = spark.createDataFrame([
('2020-01-01', '2022-10-25'),
('2021-05-12', '2022-05-12'),],
['before', 'after'])
df2.withColumn('diff', F.expr('DATEDIFF(after, before)')).show()
"""
+----------+----------+----+
| before| after|diff|
+----------+----------+----+
|2020-01-01|2022-10-25|1028|
|2021-05-12|2022-05-12| 365|
+----------+----------+----+
"""
Заметим, что мы могли бы не писать так, как это делается в SQL, а могли бы использовать функции из pyspark.sql.functions. Более того, это хороший способ узнать, есть ли такая-то функция, заглянув в документацию. Однако полностью полагаться на документацию не стоит, так как есть еще незадокументированные функции, которые нет в языке SQL, но могут в некоторых случаях пригодится(например, stack — обратная операция функции pivot, см. [5]).
Еще больше подробностей об SQL-операциях в PySpark вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:



