Если вы знаете SQL, но еще не освоились с фреймворком Apache Spark, то вы можете выполнять запросы различными способами. В этой статье вы узнаете, как писать SQL-выражения в PySpark, какие способы выполнения запросов существуют, как конкатенировать строки, фильтровать данные и работать с датами.
Способы выполнения в SQL-запросов в Apache Spark
Выполнять SQL-запросы можно выполнять различным образом, но все они в итоге дают тот же результат. Причем только один из них является полноценным запросом, все остальные является вызов функции, принимающей SQL-выражение. Вот некоторые из них:
- напрямую через
spark.sql("QUERY")
; - выборка столбцов через метод
select
в связке с функциейexpr
изpyspark.sql.functions
; - выборка столбцов через метод
selectExpr
; - добавление результата вычисления SQL-выражения к исходной таблице через метод
withColumn
; - фильтрация данных через методы
where
илиfilter
в связке с функциейexpr
изpyspark.sql.functions
;.
Допустим, имеется следующий DataFrame:
df = spark.createDataFrame([ ('py', 'Anton', 'Moscow', 23), ('c', 'Anna', 'Omsk', 27), ('py', 'Andry', 'Moscow', 24), ('cpp', 'Alex', 'Moscow', 32), ('cpp', 'Boris', 'Omsk', 55), ('py', 'Vera', 'Moscow', 89), ], ['lang', 'name', 'city', 'salary'])
Тогда чтобы напрямую выполнить SQL-запрос (1-способ), нужно создать так называемое представление (view). Это делается для того чтобы Spark знал о созданной таблице. Представление создается вызовом метода, в который нужно передать желаемое имя таблицы:
df.createOrReplaceTempView("prog_info")
Теперь можно выполнять запросы, так же, как это делается при работе с базами данных:
spark.sql("select lang from prog_info").show() """ +----+ |lang| +----+ | py| | c| | py| | cpp| | cpp| | py| +----+ """
Этот способ наиболее универсален, поскольку не нужно знать о различных функциях, а просто нужно дать необходимый запрос.
Второй способ заключается в использовании метода select
в связке с функцией expr
. Она уже не требует создания представления. Однако она применяется к конкретному DataFrame, поэтому операторы SELECT FROM
должны быть опущены. В этом случае следует говорить о SQL-выражении, ведь не является полным запросом. Например, вот так выглядит тот же самый запрос, что и выше, но только с использованием функции expr
:
# На самом деле именно для этого случая, `expr` может быть опушен, # но далее вы увидите, что его обязательно нужно использовать import pyspark.sql.functions as F df.select(F.expr('lang')).show() """ +----+ |lang| +----+ | py| | c| | py| | cpp| | cpp| | py| +----+ """
Далее следуют метод selectExpr
. Второй метод точно такой же, как и select
, но принимает на вход SQL-выражения, поэтому не нужно отдельно вызывать функцию expr
.
df.selectExpr('lang').show() # Или что то же самое: df.select('lang').show() +----+ |lang| +----+ | py| | c| | py| | cpp| | cpp| | py| +----+
Различие заключается в том, что в select
передаются именно столбцы, поэтому такая запись неправильная:
# Ошибка selectExpr(df.lang).show() # А вот так можно: select(df.lang).show()
Самый последний способ использования SQL-выражений — это метод withColumn
в связке с функцией expr
. В отличие от предыдущих он добавляет к имеющейся таблице новый столбец, полученный на основе SQL-выражения. Его можно рассматривать как оператор SELECT * FROM
.
Код курса
SPARK
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Конечно, на примере операции выборки столбца всю силу SQL-выражений не показать, поэтому рассмотрим более интересные операции, включая фильтрацию данных, о которой мы еще не поговорили.
Конкатенация строк с использованием ||
Самый простой способ объединения нескольких строк в одну — это использование SQL-выражения со знаком ||
. Такая операция называется конкатенация строк. Итак, чтобы конкатенировать строки используйте один из вышеперечисленных способов.
spark.sql('SELECT name || "_" || lang AS name_lang FROM lang_info').show() df.select(F.expr('name || "_" || lang AS name_lang')).show() df.selectExpr('name || "_" || lang AS name_lang').show() """ +---------+ |name_lang| +---------+ | Anton_py| | Anna_c| | Andry_py| | Alex_cpp| |Boris_cpp| | Vera_py| +---------+ """ # Возвращает всю таблицу df.withColumn('name_lang', F.expr('name || "_" || lang')).show() """ +----+-----+------+------+---------+ |lang| name| city|salary|name_lang| +----+-----+------+------+---------+ | py|Anton|Moscow| 23| Anton_py| | c| Anna| Omsk| 27| Anna_c| | py|Andry|Moscow| 24| Andry_py| | cpp| Alex|Moscow| 32| Alex_cpp| | cpp|Boris| Omsk| 55|Boris_cpp| | py| Vera|Moscow| 89| Vera_py| +----+-----+------+------+---------+ """
Фильтрация данных в PySpark
Для фильтрации существуют методы filter
и where
(это синонимы, они делают то же самое). Фильтрация данных может быть выражена двумя способами: в виде SQL-выражения или так, как это делается в библиотеке Pandas. Например, найдем те записи, которые содержат одинаковые значения.
# В виде SQL-выражения df.filter('salary > 30 AND city like "Moscow"').show() # Как в Pandas: df.filter((df.salary > 30) & (df.city == 'Moscow')).show() """ +----+----+------+------+ |lang|name| city|salary| +----+----+------+------+ | cpp|Alex|Moscow| 32| | py|Vera|Moscow| 89| +----+----+------+------+ """
При использовании Pandas-стиля выражения заключается в скобки; если требуется произвести логические операции между ними, то ставится знак &
(AND, побитовое И) или |
(OR, побитовое ИЛИ). В нашем примере мы использовали знак И. Также стоит отметить, что его использование ограничено использованием имен с латинскими буквами и цифрами (как например, обратиться к имени столбца, содержащий пробел).
Также можно использовать обычный SQL-запрос в spark.sql
.
spark.sql('SELECT * FROM lang_info WHERE salary > 30 AND city like "Moscow"')
Использование CASE WHEN в PySpark
В PySpark эквивалентом оператор CASE WHEN
является использование функций when().otherwise()
. Он необходим для выбора того или иного результата в зависимости от выполнения условия. Аналогичный оператор можно встретить в Python, который состоит if\elif\eles
. Синтаксис у оператора `CASE WHEN
в SQL такой:
CASE WHEN condition1 THEN result1 WHEN condition2 THEN result2 WHEN conditionN THEN resultN ELSE result END;
Допустим из неполных названий языков мы хотим получить полные. В этом случае можно поступить таким образом:
sql = """ CASE WHEN (lang LIKE 'c') OR (lang LIKE 'cpp') THEN 'C/C++' WHEN (lang LIKE 'py') THEN 'Python' ELSE 'unknown' END""" df.withColumn('FullLang', F.expr(sql)).show() """ +----+-----+------+------+--------+ |lang| name| city|salary|FullLang| +----+-----+------+------+--------+ | py|Anton|Moscow| 23| Python| | c| Anna| Omsk| 27| C/C++| | py|Andry|Moscow| 24| Python| | cpp| Alex|Moscow| 32| C/C++| | cpp|Boris| Omsk| 55| C/C++| | py| Vera|Moscow| 89| Python| +----+-----+------+------+--------+ """
Опять же мы могли бы вызвать select
, но получили бы только один столбец, также в этом случае следует добавить оператор AS
или метод alias
, иначе название столбца будет состоять из всего оператора CASE WHEN
. Например, так:
df.select(F.expr(sql).alias('FullLang')).show() """ +--------+ |FullLang| +--------+ | Python| | C/C++| | Python| | C/C++| | C/C++| | Python| +--------+ """
Либо, как уже говорили, и вовсе сделать с помощью функций when-otherwise
. Советуем при этом соблюдать структурные отступы для читабельности кода:
case_when = ( F.when(F.expr('lang LIKE "c" OR lang LIKE "cpp"'), 'C/C++') .when(F.expr('lang LIKE "py"'), 'Python') .otherwise('uknown') ) df.withColumn('FullLang', case_when)
Работаем с датой и временем
Одними только числовыми и строковыми не отделаешься, иногда приходится работать с датами и временем. Для них тоже имеются различные функции, похожие на те, которые встречаются в SQL. Например, вот так можно найти разницу между датами в днях в Spark:
df2 = spark.createDataFrame([ ('2020-01-01', '2022-10-25'), ('2021-05-12', '2022-05-12'),], ['before', 'after']) df2.withColumn('diff', F.expr('DATEDIFF(after, before)')).show() """ +----------+----------+----+ | before| after|diff| +----------+----------+----+ |2020-01-01|2022-10-25|1028| |2021-05-12|2022-05-12| 365| +----------+----------+----+ """
Заметим, что мы могли бы не писать так, как это делается в SQL, а могли бы использовать функции из pyspark.sql.functions
. Более того, это хороший способ узнать, есть ли такая-то функция, заглянув в документацию. Однако полностью полагаться на документацию не стоит, так как есть еще незадокументированные функции, которые нет в языке SQL, но могут в некоторых случаях пригодится(например, stack
— обратная операция функции pivot
, см. [5]).
Еще больше подробностей об SQL-операциях в PySpark вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве: