Pyspark:解析一列json字符串

2025-03-20 08:46:00
admin
原创
41
摘要:问题描述:我有一个由一列组成的 pyspark 数据框,称为json,其中每行都是 json 的 unicode 字符串。我想解析每一行并返回一个新的数据框,其中每行都是解析后的 json。# Sample Data Frame jstr1 = u'{"header":{"id&...

问题描述:

我有一个由一列组成的 pyspark 数据框,称为json,其中每行都是 json 的 unicode 字符串。我想解析每一行并返回一个新的数据框,其中每行都是解析后的 json。

# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])

我尝试使用以下方法映射每一行json.loads

(df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
  .toDF()
).show()

但这会返回TypeError: expected string or buffer

我怀疑问题的一部分在于从 转换为 时dataframerdd架构信息丢失,因此我也尝试手动输入架构信息:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

但我得到的结果是一样的TypeError

看看这个答案,看起来用展平行flatMap可能在这里很有用,但我也没有成功:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .flatMap(lambda x: x)
  .flatMap(lambda x: json.loads(x))
  .map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

我收到此错误:AttributeError: 'unicode' object has no attribute 'get'


解决方案 1:

对于Spark 2.1+,您可以使用from_json允许保存数据框内的其他非 json 列,如下所示:

from pyspark.sql.functions import from_json, col
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df.withColumn('json', from_json(col('json'), json_schema))

您让 Spark 导出 json 字符串列的模式。然后该df.json列不再是 StringType,而是正确解码的 json 结构,即嵌套,StrucType并且所有其他列都df按原样保留。

您可以按如下方式访问 json 内容:

df.select(col('json.header').alias('header'))

解决方案 2:

如果您之前将数据框转换为字符串的 RDD,那么在 spark 中将带有 json 字符串的数据框转换为结构化数据框实际上非常简单(请参阅: http: //spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets)

例如:

>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
root
 |-- body: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- sub_json: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- sub_sub_json: struct (nullable = true)
 |    |    |    |-- col1: long (nullable = true)
 |    |    |    |-- col2: string (nullable = true)
 |-- header: struct (nullable = true)
 |    |-- foo: string (nullable = true)
 |    |-- id: long (nullable = true)

解决方案 3:

如果您的 JSON 格式不是完美/传统格式,则现有答案将不起作用。例如,基于 RDD 的架构推断需要花括号中的 JSON,并且如果您的数据如下所示,{}则会提供不正确的架构(导致值):null

[
  {
    "a": 1.0,
    "b": 1
  },
  {
    "a": 0.0,
    "b": 2
  }
]

我编写了一个函数来解决这个问题,通过清理 JSON 使其存在于另一个 JSON 对象中:

def parseJSONCols(df, *cols, sanitize=True):
    """Auto infer the schema of a json column and parse into a struct.

    rdd-based schema inference works if you have well-formatted JSON,
    like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a
    string (``"data"``) or is an array (``[1, 2, 3]``). In those cases you
    can fix everything by wrapping the data in another JSON object
    (``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True)
    automatically performs the wrapping and unwrapping.

    The schema inference is based on this
    `SO Post <https://stackoverflow.com/a/45880574)/>`_.

    Parameters
    ----------
    df : pyspark dataframe
        Dataframe containing the JSON cols.
    *cols : string(s)
        Names of the columns containing JSON.
    sanitize : boolean
        Flag indicating whether you'd like to sanitize your records
        by wrapping and unwrapping them in another JSON object layer.

    Returns
    -------
    pyspark dataframe
        A dataframe with the decoded columns.
    """
    res = df
    for i in cols:

        # sanitize if requested.
        if sanitize:
            res = (
                res.withColumn(
                    i,
                    psf.concat(psf.lit('{"data": '), i, psf.lit('}'))
                )
            )
        # infer schema and apply it
        schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema
        res = res.withColumn(i, psf.from_json(psf.col(i), schema))

        # unpack the wrapped object if needed
        if sanitize:
            res = res.withColumn(i, psf.col(i).data)
    return res

注意:psf= pyspark.sql.functions

解决方案 4:

这是@nolan-conawayparseJSONCols函数的简洁(spark SQL)版本。

SELECT 
explode(
    from_json(
        concat('{"data":', 
               '[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]', 
               '}'), 
        'data array<struct<a:DOUBLE, b:INT>>'
    ).data) as data;

PS. 我还添加了爆炸功能 :P

您需要了解一些HIVE SQL 类型

解决方案 5:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

def map2json(dict):
    import json
    return json.dumps(dict)
from pyspark.sql.types import StringType
spark.udf.register("map2json", lambda dict: map2json(dict), StringType())

spark.sql("select map2json(map('a', '1'))").show()

解决方案 6:

如果您不知道每个 JSON 的模式(并且它可能不同),您可以使用:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
 
# ... here you get your DF

# Assuming the first column of your DF is the JSON to parse
my_df = spark.read.json(my_df.rdd.map(lambda x: x[0]))

请注意,它不会保留数据集中存在的任何其他列。来自:https ://github.com/apache/spark/pull/22775

解决方案 7:

如果您的 JSON 字符串是 JSON 数组而不是对象(我无法评论,因为我没有 rep),则此答案是为了增加上下文。如果您使用Martin Tapp 的可靠答案,它将为您的列返回空值。

总结

如果你的 JSON 字符串是数组对象,如下所示:

[{"a":1, "b":1.0}]

spark.read.json将返回一个数据框,其中包含这些数组中元素的模式,而不包含数组本身。from_json对此并不满意,因此为了尽可能具体,您可以将推断出的模式包装spark.read.json在一个中ArrayType,它将正确解析(而不是为所有内容返回空值)。

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType

array_item_schema = \n  spark.read.json(df.rdd.map(lambda row: row['json_string_column'])).schema

json_array_schema = ArrayType(array_item_schema, True)

arrays_df = df.select(F.from_json('json_string_column', json_array_schema).alias('json_arrays'))

objects_df = arrays_df.select(F.explode('json_arrays').alias('objects'))

简介

作为 Nolan Conaway 的附录,似乎当你的 JSON 格式为

[
  {
    "a": 1.0,
    "b": 1
  },
  {
    "a": 0.0,
    "b": 2
  }
]

其中顶级对象是一个数组(而不是一个对象),pyspark 将spark.read.json()数组视为要转换为行而不是单个行的对象集合。

查看在 PySpark 3.3.0 shell 中运行的示例:

>>> myjson        = """[{"a": 1.0,"b": 1},{"a": 2.0,"b": 2}]"""
>>> myotherjson   = """[{"a": 3.0,"b": 3}]"""
>>> rawobjectjson = """{"a": 4.0,"b": 4}"""
>>> spark_read_df = spark.read.json(sc.parallelize([myjson,myotherjson,rawobjectjson]))
>>> spark_read_df.show()
+---+---+
|  a|  b|
+---+---+
|1.0|  1|
|2.0|  2|
|3.0|  3|
|4.0|  4|
+---+---+

>>> spark_read_df.printSchema()
root
 |-- a: double (nullable = true)
 |-- b: long (nullable = true)

我们可以看到myjsonmyotherjsonJSON 对象的 JSON 数组被扩展为包含每个对象的一行。当其中一个 JSON 字符串rawobjectjson只是一个原始对象时,它也能顺利处理。我认为文档在这里有点不足,因为我找不到关于数组对象的这种处理的提及。

现在让我们创建一个包含 JSON 字符串列的数据框。我们将删除rawobjectjson因为,因为我们将看到from_json要求每个字符串具有相同的架构(并且这包括顶级数组(如果存在))。

>>> from pyspark.sql.types import StructType, StructField, StringType, ArrayType
>>> json_string_data = [
...     (myjson,),
...     (myotherjson,),
... ]
>>> json_df_schema = StructType([
...     StructField('json_strings', StringType(), True),
... ])
>>> raw_json_df = spark.createDataFrame(data=json_string_data, schema=json_df_schema)
>>> raw_json_df.show()
+--------------------+
|        json_strings|
+--------------------+
|[{"a": 1.0,"b": 1...|
| [{"a": 3.0,"b": 3}]|
+--------------------+

现在,我尝试使用由 推断出的模式传递spark.read.json给 来from_json读取 JSON 列到对象,但它一直返回完全的列null。正如 Nolan Conaway 提到的,当传递给 的模式from_json无法应用于给定的字符串时,就会发生这种情况。

问题是,在这些字符串中,它将顶层视为一个数组,但如图spark_read_df.printSchema()所示,推断出的模式spark.read.json()忽略了数组级别。

解决方案

所以我最终选择的解决方案是在读取时仅考虑模式中的顶级数组。

from pyspark.sql import functions as F

# This one won't work for directly passing to from_json as it ignores top-level arrays in json strings
# (if any)!
# json_object_schema = spark_read_df.schema()

# from_json is a bit more "simple", it directly applies the schema to the string. In this case
# the top level type is actually an array, so a simple fix is to just wrap the schema that
# spark.read.json returned in an ArrayType to match the true JSON string
json_array_schema = ArrayType(spark_read_df.schema, True)

json_extracted_df = raw_json_df.select(
    F.from_json('json_strings', json_array_schema)
        .alias('json_arrays')
)
>>> json_extracted_df.show()
+--------------------+
|         json_arrays|
+--------------------+
|[{1.0, 1}, {2.0, 2}]|
|          [{3.0, 3}]|
+--------------------+

>>> json_extracted_df.printSchema()
root
 |-- json_arrays: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: double (nullable = true)
 |    |    |-- b: long (nullable = true)

从那里可以使用以下方法将对象从数组中拉出pyspark.sql.functions.explode

>>> exploded_df = json_extracted_df.select(F.explode('json_arrays').alias('objects'))
>>> exploded_df.show()
+--------+
| objects|
+--------+
|{1.0, 1}|
|{2.0, 2}|
|{3.0, 3}|
+--------+

>>> exploded_df.printSchema()
root
 |-- objects: struct (nullable = true)
 |    |-- a: double (nullable = true)
 |    |-- b: long (nullable = true)
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2482  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1533  
  PLM(产品生命周期管理)项目对于企业优化产品研发流程、提升产品质量以及增强市场竞争力具有至关重要的意义。然而,在项目推进过程中,范围蔓延是一个常见且棘手的问题,它可能导致项目进度延迟、成本超支以及质量下降等一系列不良后果。因此,有效避免PLM项目范围蔓延成为项目成功的关键因素之一。以下将详细阐述三大管控策略,助力企业...
plm系统   0  
  PLM(产品生命周期管理)项目管理在企业产品研发与管理过程中扮演着至关重要的角色。随着市场竞争的加剧和产品复杂度的提升,PLM项目面临着诸多风险。准确量化风险优先级并采取有效措施应对,是确保项目成功的关键。五维评估矩阵作为一种有效的风险评估工具,能帮助项目管理者全面、系统地评估风险,为决策提供有力支持。五维评估矩阵概述...
免费plm软件   0  
  引言PLM(产品生命周期管理)开发流程对于企业产品的全生命周期管控至关重要。它涵盖了从产品概念设计到退役的各个阶段,直接影响着产品质量、开发周期以及企业的市场竞争力。在当今快速发展的科技环境下,客户对产品质量的要求日益提高,市场竞争也愈发激烈,这就使得优化PLM开发流程成为企业的必然选择。缺陷管理工具和六西格玛方法作为...
plm产品全生命周期管理   0  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用