Набор RDD (Resilient Distributed Dataset) – это неизменяемая коллекция объектов данных. Каждый такой набор делится на определенное количество частей, которые обрабатываются различными узлами в кластере. Распределенные наборы данных (RDD) можно создавать двумя способами: загружая внешние наборы данных (из существующего файла) или распределяя большие множества данных внутри программы-драйвера. Существует масса методов для работы с распределенными наборами данных (RDD). Среди них известны такие методы, как фильтрация, удаление дубликатов, случайная выборка элементов, применение функций к каждому элементу в RDD и т.д.
Однако все операции над распределенными наборами данных в Spark, которым принадлежат эти методы подразделяются на 2 вида:
- действия – это такой тип операций с RDD, который возвращает конкретное значение. Действия применяются в том случае, когда необходимо вывести конкретное значение в консоль.
- преобразования – это операции над коллекциями данных RDD, результатом которых служат новые RDD. Вычисление преобразованных RDD откладывается до того момента, когда к ним будут применены действия. Преобразования в основном ведутся поэлементно, то есть ведутся вычисления над каждым элементом в датасете по отдельности.
Особенности работы с наборами RDD в Spark: несколько практических примеров
Для создания объекта RDD из внешнего источника используется метод textFile()
:
my_text_RDD = sc.textFile('rdd.csv')
Для создания набора RDD вручную необходимо вызвать метод parallelize()
:
my_inner_RDD = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
Оба этих метода возвращают распределенную коллекцию данных (RDD) для дальнейшего использования.
Для применения действий к RDD служат методы collect()
, take(n)
и count()
. Метод collect()
применяется, когда необходимо получить элементы датасета в виде массива или списка:
my_inner_RDD.collect() ###[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Метод take(n)
возвращает в виде массива или списка первые n элементов:
my_inner_RDD.take(3) ###[1, 2, 3]
Метод count()
нужен для подсчета количества элементов в датасете:
my_inner_RDD.count() ###10
Важно быть осторожным, так как в момент вызова нового действия, происходит вычисление всего набора RDD с самого начала. Это может занимать огромное количество времени и ресурсов.
Основными преобразованиями считаются методы map()
, filter()
, distinct()
. Метод map()
принимает в качестве параметра функцию и применяет ее к каждому элементу RDD, а затем получает новый набор. Например, увеличим каждый элемент в нашем датасете на 2:
increased=my_inner_RDD.map(lambda x:x+2) ###[3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
Иногда нам нужно получить набор, содержащий определенные элементы, которые соответствуют определенному условию. В этом нам поможет метод filter()
. Давайте получим новый набор со всеми элементами из старого, кроме 1 и 2.
new_RDD = my_inner_RDD.filter(lambda x:x!=1 and x!=2) ##[3, 4, 5, 6, 7, 8, 9, 10]
Порой мы получаем на обработку огромную коллекцию с данными. Работать с такими данными бывает очень проблематично, поскольку это требует большого количества времени и ресурсов. Причиной такого размера могут быть дубликаты, которые «затмили» данные. Чтобы их удалить, можно использовать очень простой метод distinct()
:
dublicates = sc.parallelize([1,1,3,4,4,6,7,8,8,10]) ###[1,1,3,4,4,6,7,8,8,10] clear = dublicates.distinct()###[1, 3, 4, 6, 7, 8, 10]
Мы видим, что наш набор данных уменьшился и работать с ним будет гораздо проще и быстрее.
Фреймворк Spark также предоставляет использование числовых операций с данными RDD, которые находятся в модуле pyspark.sql.functions
. Чтобы использовать методы числовых операций, необходимо импортировать этот модуль следующим образом:
from pyspark.sql.functions import*
Для того, чтобы создать набор данных RDD, используется метод parallelize()
:
my_RDD = sc.parallelize([1,3,7,9,5,4,6,8,999])
Для того, чтобы подсчитать количество элементов в наборе, используется метод count()
:
my_RDD = sc.parallelize([1,3,7,9,5,4,6,8,999]) my_RDD.count()###9
Из примера видно, что метод возвращает число-счетчик, которое соответствует количеству элементов в наборе данных, начиная с 1. Для расчета суммы элементов используется метод sum()
:
my_RDD.sum() ###1042
Для того, чтобы вычислить среднее значение, можно использовать классическое деление суммы на количество элементов:
my_RDD.sum() / my_RDD.count() ###115.7777777
Однако благодаря тому, что модуль pyspark.sql
имеет в своем арсенале метод mean()
, задача существенно упрощается. Метод mean()
использует «под капотом» счетчик количества элементов и их сумму для вычисления среднего значения элементов:
my_RDD.mean() ###115.777777777765
Стоит отметить, что при использовании встроенного метода mean()
точность результата выше, чем при использовании «ручных» вычислений. Это говорит о том, что метод mean()
работает гораздо эффективнее, чем «ручные вычисления».
В наборе данных RDD, который состоит из числовых данных существует также возможность нахождения минимального и максимального значений с помощью методов min()
и max()
соответственно:
my_RDD.min() ###1 my_RDD.max() ###999
Бывают случаи, когда необходимо вычислить аномальные значения (outlier values) в наборе RDD во избежание некорректной работы алгоритмов машинного обучения. Для этого прежде всего необходимо вычислить дисперсию значений (variance) и их стандартное отклонение (standard deviation). Для вычисления дисперсии используется метод variance()
:
my_RDD.variance() ### 97515.72839506173
Метод stdev()
применяется для вычисления стандартного отклонения:
my_RDD.stdev() ### 312.27508449292225
Таким образом, комбинация всех вышерассмотренных операций делает Apache Spark достаточно полезным средством в работе с распределенными наборами данных. Все это делает фреймворк Apache Spark весьма полезным средством для Data Scientist’а и разработчика Big Data приложений.
Код курса
CORS
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Больше подробностей про применение Apache Spark в проектах анализа больших данных, разработки Big Data приложений и прочих прикладных областях Data Science вы узнаете на практических курсах по Spark в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
- Графовые алгоритмы в Apache Spark
- Машинное обучение в Apache Spark
- Потоковая обработка в Apache Spark
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Разработка и внедерение ML-решений
- Графовые алгоритмы. Бизнес-приложения
Источники
- https://spark.apache.org/documentation.html
- К.Харау, Э.Ковински, П.Венделл, М.Захария. Изучаем Spark: молниеносный анализ данных