Функции PySpark SQL для работы с JSON

В предыдущий раз мы говорили о чтение и записи JSON файлов в PySpark. Сегодня затронем функции для работы с JSON. Читайте в этой статье: как строку JSON преобразовать в корректную запись, как получить схему для JSON, как преобразовать строки JSON в столбцы.

Читаем строковый формат JSON с помощью from_json

Допустим имеется, строковый вариант записи JSON. Например, мы можем сымитировать его с помощью Python-модуля json (об этом модуле тут):

import json
l = [
    {"name": "Mila", "age": 77},
    {"name": "Gowl", "age": 33},
    {"name": "Bona", "age": 15}
]
d = json.dumps(l)
print(type(d), d)
"""
str
'[{"name": "Mila", "age": 77}, {"name": "Gowl", "age": 33}, {"name": "Bona", "age": 15}]'
"""

Так, мы преобразовали из объектов Python строку в соответствии с нотацией JSON. Создадим DataFrame из данной строки:

df = spark.createDataFrame([(0, d)], ["id", "json"])
df.show(truncate=False)
df.printSchema()
"""
+---+---------------------------------------------------------------------------------------+
|id |json                                                                                   |
+---+---------------------------------------------------------------------------------------+
|0  |[{"name": "Mila", "age": 77}, {"name": "Gowl", "age": 33}, {"name": "Bona", "age": 15}]|
+---+---------------------------------------------------------------------------------------+

root
 |-- id: long (nullable = true)
 |-- json: string (nullable = true)

"""

Как видим, это обычная строка, анализ который подразумевает использование парсинга. Это сделать можно с помощью функции from_json. Данная функция на основании заданной схемы парсит строку. В нашем примере записи с двумя атрибутами (name и age) заключены в массив. Тогда для преобразования нам потребуется создать схему:

from pyspark.sql.types import StructField, IntegerType, StringType

schema = ArrayType(
    StructType([
        StructField("name", StringType()),
        StructField("age", IntegerType())
    ])
)

Тогда для парсинга строки требуется написать:

df = df.select(F.from_json("json", schema).alias("parsed"))
df.show()
df.printSchema()
"""
+------------------------------------+
|from_json(json)                     |
+------------------------------------+
|[[Mila, 77], [Gowl, 33], [Bona, 15]]|
+------------------------------------+

root
 |-- from_json(json): array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- age: integer (nullable = true)
"""

Теперь можно работать с уже массивом. Например, выбрать имена:

df.select("parsed.name").show()
"""
+------------------+
|              name|
+------------------+
|[Mila, Gowl, Bona]|
+------------------+
"""

Правда, стоит заметить, что и с массивами работать все также не удобно.

Как узнать схему JSON?

Если вы не уверены, какой вид схемы нужно использовать, то воспользуйтесь функцией schema_of_json. Для этого стоит также применить функцию lit, в которую нужно передать строку с JSON:

d = '[{"name": "Alex", "age": 25}]'
df.select(F.schema_of_json(F.lit(d)).alias("scheme")) \
  .show(truncate=False)
"""
+-------------------------------------+
|scheme                               |
+-------------------------------------+
|array<struct<age:bigint,name:string>>|
+-------------------------------------+
"""

Из набора строк в полноценную таблицу с помощью json_tuple

Из предыдущего примера видно, что мы вынуждены были преобразовывать в массив. Если же у вас изначально есть массив из JSON-записей, то можно их преобразовать в полноценную таблицу с помощью json_tuple.

dl = [
    (0, '{"name": "Alex", "age": 47}'),
    (1, '{"name": "Anna", "age": 24}')
]
df = spark.createDataFrame(dl, ["id", "json"])
df.select(F.json_tuple(df.json, "name", "age")).show()
"""
+----+---+
|  c0| c1|
+----+---+
|Alex| 47|
|Anna| 24|
+----+---+
"""

Правда, почему-то Spark столбцы называет своими именами, а не берет из того, что он распарсил. Вы всегда можете вручную переименовать столбцы PySaprk вручную, как это описано тут.

Вытаскиваем отдельные атрибуты

Возвращаясь к предыдущему примеру:

dl = [
    (0, '{"name": "Alex", "age": 47}'),
    (1, '{"name": "Anna", "age": 24}')
]
df = spark.createDataFrame(dl, ["id", "json"])

— мы можем отдельно вытащить отдельные атрибуты. Для этого используется функцию get_json_object. Вторым параметром функция принимает путь до атрибута, поэтому вам понадобится специальный символ $, который означает корневой узел (есть еще текущий узел @ и групповой символ *).

Например, чтобы извлечь возраст людей, написать нужно следующее:

df.select(F.get_json_object("json", "$.age").alias("age")) \
  .show()
"""
+---+
|age|
+---+
| 47|
| 24|
+---+
"""

Можно не ограничиваться одним столбцом:

age = F.get_json_object("json", "$.age").alias("age")
name = F.get_json_object("json", "$.name").alias("name")
df.select(age, name).show()
"""
+---+----+
|age|name|
+---+----+
| 47|Alex|
| 24|Anna|
+---+----+
"""

Более того, глубина атрибута может быть больше 2. Все это подразумевает знания парсинга JSON. Мы лишь привели самой простой пример, как получить атрибуты, над которыми уже можно проводить дальнейший анализ.

Функции from_json, to_json и schema_of_json принимают еще один параметр со словарем в виде значения, в который можно передать дополнительные опции. О них можете узнать из документации.

Метод toJSON для преобразования таблицы в JSON

У объекта DataFrame есть метод toJSON, который преобразует таблицу в записи JSON. Допустим, имеется таблица:

df.show()
"""
+---+----+
|age|name|
+---+----+
| 47|Alex|
| 24|Anna|
+---+----+
"""

Тогда функция toJSON преобразует ее в RDD, состоящую из строк:

rdd = df.toJSON()
rdd.take(2)
"""
['{"age":"47","name":"Alex"}', '{"age":"24","name":"Anna"}']
"""

 

Больше подробностей о JSON в PySpark вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:

Записаться на курс

Смотреть раcписание

Источники
  1. from_json
  2. schema_of_json
  3. json_tuple
  4. get_json_object
  5. to_json

Добавить комментарий

Поиск по сайту