Распределенные матрицы в Spark MLlib

В прошлой статье мы говорили о локальных векторах и матрицах. Сегодня рассмотрим распределенные матрицы Spark MLlib. В этой статье вы узнаете, как создаются строковая матрица (RowMatrix), матрица с нумерованными строками (IndexedRowMatrix), координатная матрица (CoordinateMatrix) и блочная матрица (BlockMatrix) с примерами кода на Scala и Python.

4 вида распределенных матриц в Spark MLlib

Распределенные матрицы Spark MLlib имеют индексы строк и столбцов типа long, а значения типа double, которые хранятся распределено в одном или нескольких RDD. Очень важно выбрать правильный формат для хранения больших и распределенных матриц. Кроме того, преобразование распределенных матриц в другой формат может потребовать дополнительных мощностей. В Spark MLlib имеется 4 типа распределенных матриц: RowMatrix, IndexedRowMatrix, CoordinateMatrix и BlockMatrix. Их мы и рассмотрим.

Строковая матрица (RowMatrix)

Первый базовый тип называется RowMatrix. Это строко-ориентированная распределенная матрица без значимых индексов строк. Такая матрица может выступать, например, в виде набора признаков для алгоритмов машинного обучения (machine learning). Каждая строка внутри RDD представляется локальным вектором. Предполагается, что количество столбцов для RowMatrix невелико, так что один локальный вектор может быть разумно передан драйверу, а также может быть сохранен или обработан с использованием одного узла.

RowMatrix можно создать из экземпляра RDD[Vector]. После чего может быть вычислена сводную статистика столбца и его разложение. QR-разложение — это форма A=QR, где Q — это унитарная матрица, а R — верхнетреугольная матрица

Код на Scala для создания строковой матрицы Spark MLlib выглядит следующим образом:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows: RDD[Vector] = ... // RDD их локальных векторов
val mat: RowMatrix = new RowMatrix(rows)

// Размеры матрицы
val m = mat.numRows()
val n = mat.numCols()

// QR-разложение
val qrResult = mat.tallSkinnyQR(true)

На Python строковая матрица Spark MLlib создается аналогичным способом:

from pyspark.mllib.linalg.distributed import RowMatrix

# RDD:
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])

# Cтроковая матрица из RDD с векторами
mat = RowMatrix(rows)

# Размеры
m = mat.numRows()  # 4
n = mat.numCols()  # 3

# Обратная операция: получение строк в виде RDD
rowsRDD = mat.rows

Матрица с нумерованными строками (IndexedMatrix)

Вторая реализация распределенной матрицы — это IndexedMatrix. Матрица IndexedRowMatrix похожа на RowMatrix, но строки нумеруются и имеют тип long, а сами значение — это локальные векторы. Индексы можно использовать для идентификации строк и выполнения объединений.

IndexedRowMatrix может быть создан из экземпляра RDD[IndexedRow], где IndexedRow представляется в виде (Index, Vector). IndexedRowMatrix можно преобразовать в RowMatrix, отбросив индексы строк.

Пример кода на Scala для создания матрицы с нумерованными строками Spark MLlib:

import org.apache.spark.mllib.linalg.distributed.
    {IndexedRow, IndexedRowMatrix, RowMatrix}

val rows: RDD[IndexedRow] = ... // RDD нумерованных строк
// Создание IndexedRowMatrix из RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)

// Размеры 
val m = mat.numRows()
val n = mat.numCols()

// Преобразование в RowMatrix путем отбрасывания индексов
val rowMat: RowMatrix = mat.toRowMatrix()

На Python матрица IndexedRowMatrix создается следующим образом:

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# Создавние RDD нумерованных строк
# может быть реализована через класс IndexedRow:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
                              IndexedRow(1, [4, 5, 6]),
                              IndexedRow(2, [7, 8, 9]),
                              IndexedRow(3, [10, 11, 12])])
# или через кортеж (long, vector)
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
                              (2, [7, 8, 9]), (3, [10, 11, 12])])

# Создание IndexedRowMatrix из RDD с IndexedRows.
mat = IndexedRowMatrix(indexedRows)

# Размер.
m = mat.numRows()  # 4
n = mat.numCols()  # 3

# Получение строк в виде RDD из IndexedRow.
rowsRDD = mat.rows

# Преобразование в RowMatrix.
rowMat = mat.toRowMatrix()

Координатная матрица (CoordinateMatrix)

Третий тип распределенных матриц — это CoordinateMatrix. Такая матрица хранится в формате coordinate list внутри RDD, т.е. данные имеют вид (row, col, value). Координатную матрицу следует использовать только тогда, когда оба измерения матрицы очень большие, а матрица очень разреженная, т.е. много нулей.

CoordinateMatrix можно создать из экземпляра RDD[MatrixEntry], где MatrixEntry является оболочкой (row:Long, col:Long, value:Double). CoordinateMatrix можно преобразовать в IndexedRowMatrix с разреженными строками, вызвав метод toIndexedRowMatrix. Другие вычисления для CoordinateMatrix в настоящее время не поддерживаются.

На Scala матрица CoordinateMatrix создается так:

import org.apache.spark.mllib.linalg.distributed.
{CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // RDD с матрицами
val mat: CoordinateMatrix = new CoordinateMatrix(entries)

// Размеры.
val m = mat.numRows()
val n = mat.numCols()

// Преобразование в IndexedRowMatrix, у которой строки -
// разреженные векторы.
val indexedRowMatrix = mat.toIndexedRowMatrix()

Пример кода на Python для создания координатной матрицы Spark MLlib:

from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

# Создание RDD с матрицами MatrixEntry
# может быть реализовано явно черех MatrixEntry: 
entries = sc.parallelize([MatrixEntry(0, 0, 1.2), 
                          MatrixEntry(1, 0, 2.1),
                          MatrixEntry(2, 1, 3.7)])

# или через кортеж (long, long, float):
entries = sc.parallelize([(0, 0, 1.2),
                          (1, 0, 2.1),
                          (2, 1, 3.7)])

# Создание CoordinateMatrix из RDD с MatrixEntries.
mat = CoordinateMatrix(entries)

# Размеры.
m = mat.numRows()  # 3
n = mat.numCols()  # 2

# Получение MatrixEntries.
entriesRDD = mat.entries

# Преобразование в RowMatrix.
rowMat = mat.toRowMatrix()

# Преобразование в IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()

# Преобразование в BlockMatrix.
blockMat = mat.toBlockMatrix()

Блочная матрица (BlockMatrix) в Spark MLlib

Последний четвертый тип распределенных матриц — это BlockMatrix. Здесь уже данные представляются в виде ((Int, Int), Matrix), где (Int, Int) — индекс блока, а Matrix — это подматрица заданного индекса и размером rowsPerBlock x colsPerBlock. Блочная матрица поддерживает такие операции, как add (прибавить) и multiply (умножить) с другими блочными матрицами. Еще есть операция validate, которая проверяет правильно ли настроена матрица.

BlockMatrix может быть создана из матрицы с нумерованными строками (IndexedRowMatrix) или координатной матрицы (CoordinateMatrix) путем вызова метода toBlockMatrix, который создает блок размером 1024×1024 по умолчанию. Эти значения можно изменить, передав в их в качестве аргументов метода.

Пример на Scala для создания блочной матрицы Saprk MLlib:

import org.apache.spark.mllib.linalg.distributed.
    {BlockMatrix, CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // RDD c (i, j, v) матрицами
// Создание CoordinateMatrix из RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// Преобразование CoordinateMatrix в BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()

// Проверяем, что блочная матрица установлена правильно.
// Метод выкинет исключение, если валидация не прошла.
// Ничего не произойдет, если валдация прошла.
matA.validate()

// Вычисление A^T A.
val ata = matA.transpose.multiply(matA)

На Python матрица BlockMatrix создается так:

from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix

# Создание RDD с блоками подматриц.
blocks = sc.parallelize([
    ((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
    ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))
])
mat = BlockMatrix(blocks, 3, 2)

# Размеры.
m = mat.numRows()  # 6
n = mat.numCols()  # 2

# Получение блоков с подматрицами
blocksRDD = mat.blocks

# Преобразование в LocalMatrix.
localMat = mat.toLocalMatrix()

# Преобразование в IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()

# Преобразование в CoordinateMatrix.
coordinateMat = mat.toCoordinateMatrix()

 

Еще больше подробностей о векторах, матрицах и других структурах данных с примерами из Data Science вы узнаете на специализированном курсе «Машинное обучение в Apache Spark» в лицензированном учебном центре обучения и повышения квалификации разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве.

Записаться на курс

Смотреть раcписание

Добавить комментарий

Поиск по сайту