Объединение таблиц с union и unionByName

Может случиться так, что вам нужно объединить два DataFrame в PySpark. Как это можно сделать? В этой статье мы расскажем, как объединить две или более таблиц с помощью методов union и unionByName.

Вступление

Допустим имеется некоторая таблица ниже. Она состоит из 5 атрибутов.

df = spark.createDataFrame([
    ('py', 'Anton', 'Moscow', 23, 19),
    ('c', 'Valentina', 'Omsk', 27, 25),
    ('py', 'Andry', 'Moscow', 24, 22),
    ('cpp', 'Alex', 'Moscow', 32, 25),
    ('cpp', 'Boris', 'Omsk', 55, 37),
    ('py', 'Vera', 'Moscow', 89, 41),
],
    ['lang', 'name', 'city', 'salary', 'age']
)

Немного позже у нас появилась вторая таблица (например, не весь датасет был собран). Эта таблица также имеет 5 точно таких же столбцов: имена атрибутов и их типы совпадают. Допустим, мы также инициализировали, код с PySpark:

df2 = spark.createDataFrame([
    ('cpp', 'Frank', 'Moscow', 23, 19),
    ('c', 'Valentina', 'Omsk', 27, 25),
    ('cpp', 'Boris', 'Omsk', 55, 37),
    ('py', 'Stones', 'Moscow', 24, 22),
],
    ['lang', 'name', 'city', 'salary', 'age']
)

Поскольку мы не хотим работать с каждой таблицей по отдельности, нужно объединить две таблицы PySpark. Для объединения DataFrame’ов есть методы union и unionAll.

Графовые алгоритмы в Apache Spark

Код курса
GRAS
Ближайшая дата курса
20 июня, 2022
Длительность обучения
16 ак.часов
Стоимость обучения
40 000 руб.

Объединить два и более столбцов с помощью union

Здесь никакой магии нет. Нужно просто вызвать метод union. Однако результатом объединения является новый DataFrame, поэтому создается новая или переприсваивается старая переменная.

df = df.union(df2)
df.show()
"""
+----+---------+------+------+---+
|lang|     name|  city|salary|age|
+----+---------+------+------+---+
|  py|    Anton|Moscow|    23| 19|
|   c|Valentina|  Omsk|    27| 25|
|  py|    Andry|Moscow|    24| 22|
| cpp|     Alex|Moscow|    32| 25|
| cpp|    Boris|  Omsk|    55| 37|
|  py|     Vera|Moscow|    89| 41|
| cpp|    Frank|Moscow|    23| 19|
|   c|Alexander|  Omsk|    27| 25|
|  py|   Stones|Moscow|    24| 22|
| cpp|    Frank|Moscow|    23| 19|
|   c|Valentina|  Omsk|    27| 25|
| cpp|    Boris|  Omsk|    55| 37|
|  py|   Stones|Moscow|    24| 22|
+----+---------+------+------+---+
"""

Вы возможно встречали еще один метод — unionAll. Так вот он делает то же самое. Этот метод использовался до PySpark 2.0.0. Сейчас он просто вызывает union.

Если вы хотите объединить несколько таблиц, то вызывайте union друг за другом (a.k.a функциональное программирование):

df = df.union(df2).union(df3)

Объединить без дубликатов

Может такое случиться, что одна таблица содержит записи другой. После слияния таких таблиц появятся дублирующиеся значения. К сожалению, в PySpark на данный момент нет возможности предотвратить это при использовании union.

Поэтому мы можем предложить использовать метод distinct сразу после метода union:

df = df.union(df2).distinct()

Интересно, что мы могли бы использовать SQL-стиль, о котором говорили тут. Оператор UNION по умолчанию сам убирает дубликаты, т.е. не нужно использовать UNION DISTINCT:

df.createTempView('df')
df2.createTempView('df2')

spark.sql("select * from df union select * from df2").show()
"""
+----+---------+------+------+---+
|lang|     name|  city|salary|age|
+----+---------+------+------+---+
| cpp|    Boris|  Omsk|    55| 37|
|   c|Valentina|  Omsk|    27| 25|
|  py|    Andry|Moscow|    24| 22|
| cpp|     Alex|Moscow|    32| 25|
|  py|    Anton|Moscow|    23| 19|
| cpp|    Frank|Moscow|    23| 19|
|  py|     Vera|Moscow|    89| 41|
|  py|   Stones|Moscow|    24| 22|
+----+---------+------+------+---+
"""

Как видим, Frank и Stones были добавлены, а Valentina и Boris — нет.

Объединить столбцы таблицы, в которых они не стоят одинаково

Случаи выши работают только в том случае, если столбцы каждой из таблиц расположены в одинаковом порядке.

При другом порядке следования у нас получится не совсем то, что хотим:

df2 = spark.createDataFrame([
    ('Frank', 'cpp', 23, 'Moscow', 19),
    ('Valentina', 'c', 27, 'Omsk', 25),
    ('Boris', 'cpp', 55, 'Omsk', 37),
    ('Stones', 'py', 24, 'Moscow', 22),
],
    ['name', 'lang', 'age', 'city', 'salary']
)

df.unionByName(df2).show()
"""
+---------+---------+------+------+---+
|     lang|     name|  city|salary|age|
+---------+---------+------+------+---+
|       py|    Anton|Moscow|    23| 19|
|        c|Valentina|  Omsk|    27| 25|
|       py|    Andry|Moscow|    24| 22|
|      cpp|     Alex|Moscow|    32| 25|
|      cpp|    Boris|  Omsk|    55| 37|
|       py|     Vera|Moscow|    89| 41|
|    Frank|      cpp|    23|Moscow| 19|
|Valentina|        c|    27|  Omsk| 25|
|    Boris|      cpp|    55|  Omsk| 37|
|   Stones|       py|    24|Moscow| 22|
+---------+---------+------+------+---+
"""

Для этого случае используйте метод unionByName. Он сам расположит столбцы одной таблицы в том же порядке, что и другая.

df.unionByName(df2)

 

Если вы хотите разобраться и с внутренней кухней PySpark, как устроены методы объединения таблиц, сколько памяти съедают, сколько времени длятся, то приходите на наши образовательные курс в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:

Источники
  1. документация union
  2. документация unionByName

Добавить комментарий

Поиск по сайту