Управление массивами в Apache Spark

Массивы являются базовыми структурами данных, поэтому ожидаемо, что в Apache Spark есть поддержка базовых операция для работы с ними. В этой статье мы рассмотрим функции из Spark SQL: создание массивов из столбцов DataFrame, проверка на входимость элемента в массив, сортировка и размерность массивов.

Создание массивов из DataFrame

Чтобы выделить значения столбцов из DataFrame, используйте функцию array. Просто передайте в неё имена этих столбцов.

import pyspark.sql.functions as F 

df = spark.createDataFrame([
    (1, 'Anton', 23),
    (2, 'Anna', 27),
    (3, 'Andry', 24),
    (4, 'Alex', 32),
    (5, 'Boris', 55),
    (6, 'Vera', 64), ],
    ['id', 'name', 'age'])

df.select(F.array('age', 'name'))
# Либо так:
# df.select(F.array(df.age, df.name))

Результат вызова show:

+----------------+
|array(age, name)|
+----------------+
|     [23, Anton]|
|      [27, Anna]|
|     [24, Andry]|
|      [32, Alex]|
|     [55, Boris]|
|      [64, Vera]|
+----------------+

Название результирующего столбца можно изменить с помощью alias:

df.select(F.array('age', 'name').alias('res'))

# Результат show:
+-----------+
|        res|
+-----------+
|[23, Anton]|
| [27, Anna]|
|[24, Andry]|
| [32, Alex]|
|[55, Boris]|
| [64, Vera]|
+-----------+

Нечто похожее делает VectorAssembler из модуля ML Spark. Различие заключается в том, что он приводит в массив только числовые значения и возвращает исходный DataFrame с присоединенным результирующим столбцом. Функция array, с другой стороны работает с любыми типами (например, со строками) и возвращает отдельный столбец (тип Column). О использовании VectorAssembler можете узнать тут.

Потоковая обработка в Apache Spark

Код курса
SPOT
Ближайшая дата курса
16 мая, 2024
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.

Входит ли элемент в массив

VectorAssembler после преобразования выдает исходные столбцы + столбец с массивом. Мы можем присоединить столбец к DataFrame с помощью метода withColumn; пример в Apache Spark:

df.withColumn('array', F.array('age', 'name'))

# Результат show:
+---+-----+---+-----------+
| id| name|age|      array|
+---+-----+---+-----------+
|  1|Anton| 23|[23, Anton]|
|  2| Anna| 27| [27, Anna]|
|  3|Andry| 24|[24, Andry]|
|  4| Alex| 32| [32, Alex]|
|  5|Boris| 55|[55, Boris]|
|  6| Vera| 64| [64, Vera]|
+---+-----+---+-----------+

Для фильтрации массивов, а именно для нахождения тех строковых записей, которые содержат нужный элемент, используется функция array_contains. Здесь важно заметить, что проверяются именно строковые значения. Функция принимает два аргумента: название столбца (или сам столбец) и целевое значение. Возвращается столбец с флагами true или false.

df1 = df.select(F.array('age', 'name').alias('res'))
df1.select(F.array_contains('res', 'Anton'))

# Результат show:
+--------------------------+
|array_contains(res, Anton)|
+--------------------------+
|                      true|
|                     false|
|                     false|
|                     false|
|                     false|
|                     false|
+--------------------------+

Разворачивание, сортировка и размерность массива в Apache Spark

В Apache Spark есть функция explode, которая наоборот разворачивает столбец с массивом таким образом, что каждый элемент становится отдельной записью.

a = df.withColumn('array', F.array('age', 'name'))
a.select(F.explode('array')).show()

+-----+
| col |
+-----+
|   23|
|Anton|
|   27|
| Anna|
|   24|
|Andry|
|   32|
| Alex|
|   55|
|Boris|
|   64|
| Vera|
+-----+

Core Spark - основы для разработчиков

Код курса
CORS
Ближайшая дата курса
13 мая, 2024
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.

Ещё массив можно сортировать с помощью функции sort_array. Функция сортирует не отдельно массивы, а сам столбец с массивами. Следовательно, при сортировке будут учитываться все массив в целом. Например, для нашего DataFrame сортировка будет осуществляться как по алфавиту (name), так и по числу (age) одновременно:

a = df.withColumn('array', F.array('name', 'age'))
a.select(F.sort_array('array')).show()

# по алфавиту и числу:
+-----------------------+
|sort_array(array, true)|
+-----------------------+
|            [23, Anton]|
|             [27, Anna]|
|            [24, Andry]|
|             [32, Alex]|
|            [55, Boris]|
|             [64, Vera]|
+-----------------------+

Последняя функция, о которой мы упомянем, — это функция size, вычисляющая размерность массивов. В нашем примере каждый массив имеет по два элемента, поэтому он не будет иллюстративен, но тем менее в любом другом DataFrame может быть все иначе.

a = df.withColumn('array', F.array('name', 'age'))
a.select(F.size('array')).show()

+-----------+
|size(array)|
+-----------+
|          2|
|          2|
|          2|
|          2|
|          2|
|          2|
+-----------+

 

О работе с массивами, в т.ч. массивами больших данных (Big Data) вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:

Источники
  1. https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html

Добавить комментарий

Поиск по сайту