Как работать с RDD в Apache Spark: что такое действия и преобразования

Spark, фреймворк, Data Science, RDD

В прошлый раз мы рассмотрели, из чего состоит стек фреймворка Spark. В этой статье поговорим про основные операции, на которых основан анализ распределенных данных в Apache Spark. Также рассмотрим, что такое Resilient Distributed Dataset (RDD), зачем нужны эти распределенные наборы больших данных (Big Data) в Spark и как можно их создать.

Какие операции с RDD существуют в Spark: основные виды

Существует масса методов для работы с распределенными наборами данных (RDD, Resilient Distributed Dataset): фильтрация, удаление дубликатов, случайная выборка элементов, применение функций к каждому элементу и пр. Все эти операции над распределенными наборами данных в Spark можно отнести к одному из следующих двух видов:

  • действия;
  • преобразования.

Как эти операции осуществляются на практике, мы рассмотрим далее.

Что представляют собой распределенные наборы данных: основная структура

RDD – это неизменяемая коллекция объектов данных. Каждый такой набор делится на определенное количество частей, которые обрабатываются различными узлами в кластере. Распределенные наборы данных можно создавать двумя способами:

  • загружая внешние наборы данных (из существующего файла);
  • распределяя большие множества данных внутри программы-драйвера.

В Apache Spark для создания объекта RDD из внешнего источника используется метод textFile():

my_text_RDD = sc.textFile('FIFA.csv')

Для создания набора RDD вручную необходимо вызвать метод parallelize():

my_inner_RDD = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

Оба этих метода возвращают нам распределенную коллекцию данных (RDD) для дальнейшего использования.

Действия

Действия – это такой тип операций с RDD, который возвращает конкретное значение. Действия применяются в том случае, когда необходимо вывести конкретное значение в консоль. Самыми известными действиями служат методы collect(), take(n) и count(). Метод collect() применяется, когда необходимо получить элементы датасета в виде массива или списка:

my_inner_RDD.collect()   ###[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Метод take(n) возвращает в виде массива или списка первые n элементов:

my_inner_RDD.take(3)  ###[1, 2, 3]

Метод count() нужен для подсчета количества элементов в датасете:

my_inner_RDD.count()  ###10

Важно отметить, что в момент вызова нового действия происходит вычисление всего набора RDD с самого начала. Это может занимать огромное количество времени и ресурсов, поэтому методами действий следует пользоваться очень осторожно.

Преобразования

Преобразования – это операции над коллекциями данных RDD, результатом которых служат новые RDD. Вычисление преобразованных RDD откладывается до того момента, когда к ним будут применены действия. Преобразования в основном выполняются поэлементно, то есть ведутся вычисления над каждым элементом в датасете по отдельности. Основными преобразованиями считаются методы map(), filter(), distinct().

Метод map() принимает в качестве параметра функцию и применяет ее к каждому элементу RDD, а затем получает новый набор. Например, давайте увеличим каждый элемент в нашем датасете на 2:

increased=my_inner_RDD.map(lambda x:x+2) ###[3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

Иногда нам нужно получить набор, содержащий определенные элементы, которые соответствуют определенному условию. В этом нам поможет метод filter(). Давайте получим новый набор со всеми элементами из старого, кроме 1 и 2.

new_RDD = my_inner_RDD.filter(lambda x:x!=1 and x!=2) ##[3, 4, 5, 6, 7, 8, 9, 10]

Порой мы получаем на обработку огромную коллекцию с данными. Работать с такими данными бывает очень проблематично, поскольку это требует большого количества времени и ресурсов. Причиной такого размера могут быть дубликаты, которые затмили наши данные. Чтобы их удалить, можно использовать очень простой метод distinct():

dublicates = sc.parallelize([1,1,3,4,4,6,7,8,8,10]) ###[1,1,3,4,4,6,7,8,8,10]

clear = dublicates.distinct()###[1, 3, 4, 6, 7, 8, 10]

Таким образом, набор данных уменьшился и работать с ним будет гораздо проще и быстрее.

Преобразования могут выполняться также с двумя наборами данных одновременно, например, в случае объединения или пересечения датасетов, Декартова произведения и вычитания. Давайте посмотрим, как работают данные методы:

first_RDD = sc.parallelize([1,3,7,9,5,4,6,8])

second_RDD = sc.parallelize([3,9,10,5,7,4])

united_rdd = first_RDD.union(second_RDD)###[1, 3, 7, 9, 5, 4, 6, 8, 3, 9, 10, 5, 7, 4]

sub_rdd = first_RDD.subtract(second_RDD)###[6, 8, 1]

intersect_RDD = first_RDD.intersection(second_RDD)###[4, 3, 7, 9, 5]

cartesian_RDD = first_RDD.cartesian(second_RDD)###[(1, 3), (1, 9), (1, 10)....]

Бывают случаи, когда необходимо комбинировать преобразования и действия. Например, следующий код показывает, как вычислить квадраты чисел коллекции RDD и вывести их на экран:

typic_rdd = sc.parallelize([1,2,3,4,5,6,7,8])

squared_rdd = typic_rdd.map(lambda x: x*x)

for i in squared_rdd.collect():

    print(i) ### 1 4 9 16 25 36 49 64

Подводя итог всем рассмотренным методам работы с RDD в Apache Spark, отметим, что их разнообразие делает этот фреймворк весьма полезным средством для Data Scientist’а и разработчика Big Data приложений. В следующей статье мы поговорим про другой часто используемый на практике компонент Apache Spark – Spark SQL.

Больше подробностей про применение Apache Spark в проектах анализа больших данных, разработки Big Data приложений и прочих прикладных областях Data Science вы узнаете на практических курсах по Spark в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве.

Записаться на курс

Смотреть раcписание

Добавить комментарий

Поиск по сайту