Spark, как инструмент анализа данных, отлично подходит при увеличении масштаба задач и при увеличении размера самих данных Пока вы используете датафреймы и библиотеки Spark вы можете работать с большими массивами данных (Big Data) в рамках потоковой обработки. Однако существуют сценарии, когда библиотеки могут быть недоступны, тогда для достижения параллельных вычисления в Spark требуются другие подходы. В этой статье обсудим три различных способа достижения распараллеливания в PySpark: нативный метод, пул потоков (Thread Pools) и использование Pandas UDFs.
Распараллеливание и распределенность
Прежде чем начать, важно различать распараллеливание и распределенность в Spark. Когда задача распараллеливается, параллельные задачи могут выполняться на рабочих узлах или драйверной программе. То, как задача распределяется между этими различными узлами в кластере, зависит от типов структур данных и библиотек, которые вы используете. Кроме того, в Spark может быть параллелизм без распределенности, а это значит, что рабочий узел может выполнять всю работу.
При распределенности в Spark обрабатываемые данные распределяются между разными узлами в кластере, и задачи выполняются одновременно. В идеале хотелось бы, чтобы задачи одновременно распараллеливались и распределялись.
Ниже мы рассмотрим примеры параллельных вычислений в Spark на датасете с домами на продажу в Бостоне. Для создания DataFrame в Spark по ссылке на датасет используется следующий код:
from pyspark import SparkFiles from pyspark.ml.feature import VectorAssembler url = "https://raw.githubusercontent.com/DataLatata/python-school/master/data/boston.csv" spark.sparkContext.addFile(url) file = "file://"+SparkFiles.get("boston.csv") data = spark.read.csv(file, header=True, inferSchema=True)
1. Нативный Spark
Если вы используете датафреймы и библиотеки Spark, то задача автоматически распараллелится. Допустим, требуется предсказать значение цены на дом с помощью линейной регрессии. Тогда прежде всего нужно векторизовать признаки (говорили об это тут), а затем применить класс LinearRegression.
Итак, код для векторизации и обучения модели линейной регрессии в Spark выглядит так:
from pyspark.ml.regression import LinearRegression # Векторизация features = [ 'CRIM', 'RAD',, 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'ZN' 'TAX', 'B', 'LSTAT', 'PTRATIO', ] assembler = VectorAssembler(inputCols=features, outputCol="features") boston_df = assembler.transform(data).select('features', 'target') # Линейная регрессия lr = LinearRegression(maxIter=10, regParam=0.1, elasticNetParam=0.5, labelCol="target") model = lr.fit(boston_train) boston_pred = model.transform(boston_test) ## R-квадрат для оценки модели r = boston_pred.stat.corr("prediction", "target") print("R-sqaured: " + str(r**2))
Данные операции в среде Databricks выполнились очень быстро. На рисунке ниже можете увидеть, как задача распараллелилась по разным рабочим узлам в кластере.
2. Пулы потоков
Ещё один из способов добиться параллелизма в Spark для обработки больших данных (Big data) является использование стандартной библиотеки многопроцессорной обработки в Python (multiprocessing). Библиотека применяется для создания параллельных потоков программы на Python.
Как правило, в этом подходе применяется функция map
в пуле потоков. Функция map
принимает лямбда-выражение и массив значений в качестве входных данных и вызывает лямбда-выражение для каждого из значений в массиве. Если возможно, лучше всего использовать датафреймы при работе с пулами потоков, потому что тогда операции будут распределены между рабочими узлами в кластере. Spark MLib с использованием пулов потоков показана в приведенном ниже примере, который распаралеливает задачи по рабочим узлам для обучения модели случайных лесов (Random forest).
from multiprocessing.pool import ThreadPool from pyspark.ml.regression import RandomForestRegressor def mllib_random_forest(train, test, trees, target_col='target'): rf = RandomForestRegressor(numTrees=trees, labelCol=target_col) model = rf.fit(train) pred = model.transform(test) r = pred.stat.corr("prediction", target_col) return [trees, r**2] parameters = [10, 20, 50] train, test = df.randomSplit([0.8, 0.2], 0) # разрешить до 5 одновременных потоков pool = ThreadPool(5) pool.map( lambda trees: mllib_random_forest(train, test, trees), parameters )
Результаты R^2 при разных значениях гиперпараметра случайных лесов:
[[10, 0.8427721591161678], [20, 0.8299459711730467], [50, 0.8060209329899399]]
3. Spark и Pandas UDFs
Одним из новых нововведений, обеспечивающих параллельную потоковую обработку, являются пользовательские функции Pandas (user defined function, UDF). С помощью пользовательских функций вы можете разделить датафрейм на меньшие наборы данных, которые распределяются и преобразуются в объекты Pandas. Над этими объектами применяется функция, а затем результаты объединяются обратно в один большой Spark-датафрейм. По сути, пользовательские функции Pandas позволяют работать с различными библиотеками Python (например, scikit-learn), получая при этом преимущества параллельных вычислений.
Продемонстрируем Pandas UDF на примере. Допустим, требуется обучить модель случайных лесов из библиотеки Scikit-learn. В первую очередь, создадим таблицу из исходного датафрейма, а затем добавим в нее столбцы training и trees:
- Столбец training определяет к обучающей или тестовой выборки относится строка
- Столбец trees определяет гиперпараметр случайных весов. Мы инициализировали для каждой строки значения 11, 20, 50, идущие друг за другом по очереди
Вот так выглядит код для создания таблицы Spark через запрос SQL:
from pyspark.sql.types import * # представим DataFrame в виде таблица data.createOrReplaceTempView("boston") # добавим метки обучение/тест, а также # по очереди определим гиперпарметры (11, 20, 50) full_df = spark.sql(""" select * from ( select *, case when rand() < 0.8 then 1 else 0 end as training from boston ) b cross join ( select 11 as trees union all select 20 as trees union all select 50 as trees) """)
Далее, применим пользовательскую функцию над сгруппированными данными. Сгруппируем данные по значениям trees, в результате получится три группы. Для каждой группы обучим модель случайных весов при заданном значении гиперпараметра и вернем новый DataFrame со значениями R^2 для каждого значения гиперпараметра. Пример кода обучения модели машинного обучения с распараллеливанием по группам Spark выглядит так:
from pyspark.sql.functions import pandas_udf, PandasUDFType from sklearn.ensemble import RandomForestRegressor as RFR from scipy.stats.stats import pearsonr import pandas as pd schema = StructType([StructField('trees', LongType(), True), StructField('r_squared', DoubleType(), True)]) @pandas_udf(schema, PandasUDFType.GROUPED_MAP) def train_RF(df): trees = df['trees'].unique()[0] # получим обучающию и тестовую выборки train = df[df['training'] == 1] test = df[df['training'] == 0] # данные и метки y_train = train['target'] X_train = train.drop(['target'], axis=1) y_test = test['target'] X_test = test.drop(['target'], axis=1) # обучение регрессора rf = RFR(n_estimators=trees) model = rf.fit(X_train, y_train) # предсказания y_pred = model.predict(X_test) r = pearsonr(y_pred, y_test) r_squared = r[0]**2 return pd.DataFrame({'trees': trees, 'r_squared': r_squared}, index=[0]) results = full_df.groupby('trees').apply(train_RF)
Обратите внимание, что пользовательская функция принимает GROUPED_MAP
, поскольку принимает сгруппированные данные. А также определяем схему, то есть возвращаемые типы данных. Внутри функции мы работаем только с библиотеками Python: Pandas, Scikit-learn, Scipy — а не с функциями библиотеки MLlib.
Полученные результаты получились немного больше, чем в MLlib:
+-----+------------------+ |trees| r_squared| +-----+------------------+ | 20|0.9125784797184912| | 50|0.9202834179211972| | 11| 0.888820056398275| +-----+------------------+
Об обработке больших данных (Big Data) и параллельном обучении моделей вы узнаете на специализированном курсе «Потоковая обработка в Apache Spark» в лицензированном учебном центре обучения и повышения квалификации разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве.