H J M P R S W Y

JOIN

JOIN — это операция по соединению двух таблиц по заданному полю (ключу) в Spark SQL. Является аналогом merge в Python-библиотеки Pandas. Spark поддерживает все виды JOIN: INNER, LEFT, RIGHT и FULL.

Синтаксис JOIN в Apache Spark

Для соединение двух таблиц вызывается метод join, в который нужно передать присоединяемый датафрейм, ключ и способ соединения. Ключом является условие, через которое связываются две таблицы.

df_joined = (
    df
    .join(
        other_df,
        F.col('emp_id') == F('id')
    ),
    'left'
)

Виды соединения

Соединить две таблицы можно четырьмя способами:

  • LEFT JOIN — это когда к таблице слева присоединяется значения из таблицы справа. Если условие соединения не было выполнено, то присоединенные значения заполняются null’ами.
  • RIGHT JOIN — это когда к таблице справа присоединяется значения из таблицы слева. Если условие соединения не было выполнено, то присоединенные значения заполняются null’ами.
  • INNER JOIN — это когда соединяются, те строки, условие которое выполняются, а остальные выбрасываются.
  • FULL JOIN — это комбинация левого и правого соединения. Если условие соединения не выполняется, то заполняются null’ами не ключевые столбцы.

Количество записей при LEFT JOIN должны быть равно количеству записей из левой таблицы. Если это количество увеличилось, значит в правой таблице есть дубликаты. То же самое работает и с другими JOIN’ами.

Имена ключей

Spark не такой умный как некоторые СУБД. Ключи, по которым происходит соединения должны иметь разные имена. Например, такой код не валиден:

df_joined = (
    df
    .join(
        other_df,
        F.col('id') == F('id'),
        'left'
    )
)

При этом даже alias не помогут в данной ситуации:

df_joined = (
    df.alias('df')
    .join(
        other_df.alias('other'),
        F.col('df.id') == F('other.id')
        'left'
    ),
)

Соединения у вас выполнится, но при обращение к id появится ошибка, поскольку у вас будет два таких столбца.

Поэтому лучше всего с самого начала придерживаться разных имен, а после join в Spark выбрасывать ненужный столбец. Например, так:

df = df .withColumnRenamed('id', 'emp_id')
df_joined = (
    df.alias('df')
    .join(
        other_df.alias('other'),
        F.col('df.emp_id') == F('other.id'),
        'left'
    )
)

Также при использовании JOIN в Spark не стоит забывать о перетасовке (shuffle), который замедляет ход операции. Ниже ссылки на статьи, где вы можете узнать, как это избежать.

  1. Соединяй и властвуй: основы JOIN
  2. 6 способов повышения производительности в Apache Spark
  3. 4 совета по оптимизации Apache Spark

Поиск по сайту