В прошлый раз мы рассмотрели, из чего состоит стек фреймворка Spark. В этой статье поговорим про основные операции, на которых основан анализ распределенных данных в Apache Spark. Также рассмотрим, что такое Resilient Distributed Dataset (RDD), зачем нужны эти распределенные наборы больших данных (Big Data) в Spark и как можно их создать.
Какие операции с RDD существуют в Spark: основные виды
Существует масса методов для работы с распределенными наборами данных (RDD, Resilient Distributed Dataset): фильтрация, удаление дубликатов, случайная выборка элементов, применение функций к каждому элементу и пр. Все эти операции над распределенными наборами данных в Spark можно отнести к одному из следующих двух видов:
- действия;
- преобразования.
Как эти операции осуществляются на практике, мы рассмотрим далее.
Что представляют собой распределенные наборы данных: основная структура
RDD – это неизменяемая коллекция объектов данных. Каждый такой набор делится на определенное количество частей, которые обрабатываются различными узлами в кластере. Распределенные наборы данных можно создавать двумя способами:
- загружая внешние наборы данных (из существующего файла);
- распределяя большие множества данных внутри программы-драйвера.
В Apache Spark для создания объекта RDD из внешнего источника используется метод textFile()
:
my_text_RDD = sc.textFile('FIFA.csv')
Для создания набора RDD вручную необходимо вызвать метод parallelize()
:
my_inner_RDD = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
Оба этих метода возвращают нам распределенную коллекцию данных (RDD) для дальнейшего использования.
Действия
Действия – это такой тип операций с RDD, который возвращает конкретное значение. Действия применяются в том случае, когда необходимо вывести конкретное значение в консоль. Самыми известными действиями служат методы collect()
, take(n)
и count()
. Метод collect()
применяется, когда необходимо получить элементы датасета в виде массива или списка:
my_inner_RDD.collect() ###[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Метод take(n)
возвращает в виде массива или списка первые n элементов:
my_inner_RDD.take(3) ###[1, 2, 3]
Метод count()
нужен для подсчета количества элементов в датасете:
my_inner_RDD.count() ###10
Важно отметить, что в момент вызова нового действия происходит вычисление всего набора RDD с самого начала. Это может занимать огромное количество времени и ресурсов, поэтому методами действий следует пользоваться очень осторожно.
Преобразования
Преобразования – это операции над коллекциями данных RDD, результатом которых служат новые RDD. Вычисление преобразованных RDD откладывается до того момента, когда к ним будут применены действия. Преобразования в основном выполняются поэлементно, то есть ведутся вычисления над каждым элементом в датасете по отдельности. Основными преобразованиями считаются методы map()
, filter()
, distinct()
.
Метод map()
принимает в качестве параметра функцию и применяет ее к каждому элементу RDD, а затем получает новый набор. Например, давайте увеличим каждый элемент в нашем датасете на 2:
increased=my_inner_RDD.map(lambda x:x+2) ###[3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
Иногда нам нужно получить набор, содержащий определенные элементы, которые соответствуют определенному условию. В этом нам поможет метод filter()
. Давайте получим новый набор со всеми элементами из старого, кроме 1 и 2.
new_RDD = my_inner_RDD.filter(lambda x:x!=1 and x!=2) ##[3, 4, 5, 6, 7, 8, 9, 10]
Порой мы получаем на обработку огромную коллекцию с данными. Работать с такими данными бывает очень проблематично, поскольку это требует большого количества времени и ресурсов. Причиной такого размера могут быть дубликаты, которые затмили наши данные. Чтобы их удалить, можно использовать очень простой метод distinct()
:
dublicates = sc.parallelize([1,1,3,4,4,6,7,8,8,10]) ###[1,1,3,4,4,6,7,8,8,10] clear = dublicates.distinct()###[1, 3, 4, 6, 7, 8, 10]
Таким образом, набор данных уменьшился и работать с ним будет гораздо проще и быстрее.
Преобразования могут выполняться также с двумя наборами данных одновременно, например, в случае объединения или пересечения датасетов, Декартова произведения и вычитания. Давайте посмотрим, как работают данные методы:
first_RDD = sc.parallelize([1,3,7,9,5,4,6,8]) second_RDD = sc.parallelize([3,9,10,5,7,4]) united_rdd = first_RDD.union(second_RDD)###[1, 3, 7, 9, 5, 4, 6, 8, 3, 9, 10, 5, 7, 4] sub_rdd = first_RDD.subtract(second_RDD)###[6, 8, 1] intersect_RDD = first_RDD.intersection(second_RDD)###[4, 3, 7, 9, 5] cartesian_RDD = first_RDD.cartesian(second_RDD)###[(1, 3), (1, 9), (1, 10)....]
Бывают случаи, когда необходимо комбинировать преобразования и действия. Например, следующий код показывает, как вычислить квадраты чисел коллекции RDD и вывести их на экран:
typic_rdd = sc.parallelize([1,2,3,4,5,6,7,8]) squared_rdd = typic_rdd.map(lambda x: x*x) for i in squared_rdd.collect(): print(i) ### 1 4 9 16 25 36 49 64
Подводя итог всем рассмотренным методам работы с RDD в Apache Spark, отметим, что их разнообразие делает этот фреймворк весьма полезным средством для Data Scientist’а и разработчика Big Data приложений. В следующей статье мы поговорим про другой часто используемый на практике компонент Apache Spark – Spark SQL.
Больше подробностей про применение Apache Spark в проектах анализа больших данных, разработки Big Data приложений и прочих прикладных областях Data Science вы узнаете на практических курсах по Spark в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве.