Может случиться так, что вам нужно объединить два DataFrame в PySpark. Как это можно сделать? В этой статье мы расскажем, как объединить две или более таблиц с помощью методов union и unionByName.
Вступление
Допустим имеется некоторая таблица ниже. Она состоит из 5 атрибутов.
df = spark.createDataFrame([ ('py', 'Anton', 'Moscow', 23, 19), ('c', 'Valentina', 'Omsk', 27, 25), ('py', 'Andry', 'Moscow', 24, 22), ('cpp', 'Alex', 'Moscow', 32, 25), ('cpp', 'Boris', 'Omsk', 55, 37), ('py', 'Vera', 'Moscow', 89, 41), ], ['lang', 'name', 'city', 'salary', 'age'] )
Немного позже у нас появилась вторая таблица (например, не весь датасет был собран). Эта таблица также имеет 5 точно таких же столбцов: имена атрибутов и их типы совпадают. Допустим, мы также инициализировали, код с PySpark:
df2 = spark.createDataFrame([ ('cpp', 'Frank', 'Moscow', 23, 19), ('c', 'Valentina', 'Omsk', 27, 25), ('cpp', 'Boris', 'Omsk', 55, 37), ('py', 'Stones', 'Moscow', 24, 22), ], ['lang', 'name', 'city', 'salary', 'age'] )
Поскольку мы не хотим работать с каждой таблицей по отдельности, нужно объединить две таблицы PySpark. Для объединения DataFrame’ов есть методы union
и unionAll
.
Код курса
GRAS
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Объединить два и более столбцов с помощью union
Здесь никакой магии нет. Нужно просто вызвать метод union
. Однако результатом объединения является новый DataFrame, поэтому создается новая или переприсваивается старая переменная.
df = df.union(df2) df.show() """ +----+---------+------+------+---+ |lang| name| city|salary|age| +----+---------+------+------+---+ | py| Anton|Moscow| 23| 19| | c|Valentina| Omsk| 27| 25| | py| Andry|Moscow| 24| 22| | cpp| Alex|Moscow| 32| 25| | cpp| Boris| Omsk| 55| 37| | py| Vera|Moscow| 89| 41| | cpp| Frank|Moscow| 23| 19| | c|Alexander| Omsk| 27| 25| | py| Stones|Moscow| 24| 22| | cpp| Frank|Moscow| 23| 19| | c|Valentina| Omsk| 27| 25| | cpp| Boris| Omsk| 55| 37| | py| Stones|Moscow| 24| 22| +----+---------+------+------+---+ """
Вы возможно встречали еще один метод — unionAll
. Так вот он делает то же самое. Этот метод использовался до PySpark 2.0.0. Сейчас он просто вызывает union
.
Если вы хотите объединить несколько таблиц, то вызывайте union
друг за другом (a.k.a функциональное программирование):
df = df.union(df2).union(df3)
Объединить без дубликатов
Может такое случиться, что одна таблица содержит записи другой. После слияния таких таблиц появятся дублирующиеся значения. К сожалению, в PySpark на данный момент нет возможности предотвратить это при использовании union
.
Поэтому мы можем предложить использовать метод distinct
сразу после метода union
:
df = df.union(df2).distinct()
Интересно, что мы могли бы использовать SQL-стиль, о котором говорили тут. Оператор UNION
по умолчанию сам убирает дубликаты, т.е. не нужно использовать UNION DISTINCT
:
df.createTempView('df') df2.createTempView('df2') spark.sql("select * from df union select * from df2").show() """ +----+---------+------+------+---+ |lang| name| city|salary|age| +----+---------+------+------+---+ | cpp| Boris| Omsk| 55| 37| | c|Valentina| Omsk| 27| 25| | py| Andry|Moscow| 24| 22| | cpp| Alex|Moscow| 32| 25| | py| Anton|Moscow| 23| 19| | cpp| Frank|Moscow| 23| 19| | py| Vera|Moscow| 89| 41| | py| Stones|Moscow| 24| 22| +----+---------+------+------+---+ """
Как видим, Frank и Stones были добавлены, а Valentina и Boris — нет.
Объединить столбцы таблицы, в которых они не стоят одинаково
Случаи выши работают только в том случае, если столбцы каждой из таблиц расположены в одинаковом порядке.
При другом порядке следования у нас получится не совсем то, что хотим:
df2 = spark.createDataFrame([ ('Frank', 'cpp', 23, 'Moscow', 19), ('Valentina', 'c', 27, 'Omsk', 25), ('Boris', 'cpp', 55, 'Omsk', 37), ('Stones', 'py', 24, 'Moscow', 22), ], ['name', 'lang', 'age', 'city', 'salary'] ) df.unionByName(df2).show() """ +---------+---------+------+------+---+ | lang| name| city|salary|age| +---------+---------+------+------+---+ | py| Anton|Moscow| 23| 19| | c|Valentina| Omsk| 27| 25| | py| Andry|Moscow| 24| 22| | cpp| Alex|Moscow| 32| 25| | cpp| Boris| Omsk| 55| 37| | py| Vera|Moscow| 89| 41| | Frank| cpp| 23|Moscow| 19| |Valentina| c| 27| Omsk| 25| | Boris| cpp| 55| Omsk| 37| | Stones| py| 24|Moscow| 22| +---------+---------+------+------+---+ """
Для этого случае используйте метод unionByName
. Он сам расположит столбцы одной таблицы в том же порядке, что и другая.
df.unionByName(df2)
Если вы хотите разобраться и с внутренней кухней PySpark, как устроены методы объединения таблиц, сколько памяти съедают, сколько времени длятся, то приходите на наши образовательные курс в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark