Pyspark:解析一列json字符串
- 2025-03-20 08:46:00
- admin 原创
- 42
问题描述:
我有一个由一列组成的 pyspark 数据框,称为json
,其中每行都是 json 的 unicode 字符串。我想解析每一行并返回一个新的数据框,其中每行都是解析后的 json。
# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])
我尝试使用以下方法映射每一行json.loads
:
(df
.select('json')
.rdd
.map(lambda x: json.loads(x))
.toDF()
).show()
但这会返回TypeError: expected string or buffer
我怀疑问题的一部分在于从 转换为 时dataframe
,rdd
架构信息丢失,因此我也尝试手动输入架构信息:
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
.select('json')
.rdd
.map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
但我得到的结果是一样的TypeError
。
看看这个答案,看起来用展平行flatMap
可能在这里很有用,但我也没有成功:
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
.select('json')
.rdd
.flatMap(lambda x: x)
.flatMap(lambda x: json.loads(x))
.map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
我收到此错误:AttributeError: 'unicode' object has no attribute 'get'
。
解决方案 1:
对于Spark 2.1+,您可以使用from_json
允许保存数据框内的其他非 json 列,如下所示:
from pyspark.sql.functions import from_json, col
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df.withColumn('json', from_json(col('json'), json_schema))
您让 Spark 导出 json 字符串列的模式。然后该df.json
列不再是 StringType,而是正确解码的 json 结构,即嵌套,StrucType
并且所有其他列都df
按原样保留。
您可以按如下方式访问 json 内容:
df.select(col('json.header').alias('header'))
解决方案 2:
如果您之前将数据框转换为字符串的 RDD,那么在 spark 中将带有 json 字符串的数据框转换为结构化数据框实际上非常简单(请参阅: http: //spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets)
例如:
>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
root
|-- body: struct (nullable = true)
| |-- id: long (nullable = true)
| |-- name: string (nullable = true)
| |-- sub_json: struct (nullable = true)
| | |-- id: long (nullable = true)
| | |-- sub_sub_json: struct (nullable = true)
| | | |-- col1: long (nullable = true)
| | | |-- col2: string (nullable = true)
|-- header: struct (nullable = true)
| |-- foo: string (nullable = true)
| |-- id: long (nullable = true)
解决方案 3:
如果您的 JSON 格式不是完美/传统格式,则现有答案将不起作用。例如,基于 RDD 的架构推断需要花括号中的 JSON,并且如果您的数据如下所示,{}
则会提供不正确的架构(导致值):null
[
{
"a": 1.0,
"b": 1
},
{
"a": 0.0,
"b": 2
}
]
我编写了一个函数来解决这个问题,通过清理 JSON 使其存在于另一个 JSON 对象中:
def parseJSONCols(df, *cols, sanitize=True):
"""Auto infer the schema of a json column and parse into a struct.
rdd-based schema inference works if you have well-formatted JSON,
like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a
string (``"data"``) or is an array (``[1, 2, 3]``). In those cases you
can fix everything by wrapping the data in another JSON object
(``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True)
automatically performs the wrapping and unwrapping.
The schema inference is based on this
`SO Post <https://stackoverflow.com/a/45880574)/>`_.
Parameters
----------
df : pyspark dataframe
Dataframe containing the JSON cols.
*cols : string(s)
Names of the columns containing JSON.
sanitize : boolean
Flag indicating whether you'd like to sanitize your records
by wrapping and unwrapping them in another JSON object layer.
Returns
-------
pyspark dataframe
A dataframe with the decoded columns.
"""
res = df
for i in cols:
# sanitize if requested.
if sanitize:
res = (
res.withColumn(
i,
psf.concat(psf.lit('{"data": '), i, psf.lit('}'))
)
)
# infer schema and apply it
schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema
res = res.withColumn(i, psf.from_json(psf.col(i), schema))
# unpack the wrapped object if needed
if sanitize:
res = res.withColumn(i, psf.col(i).data)
return res
注意:psf
= pyspark.sql.functions
。
解决方案 4:
这是@nolan-conawayparseJSONCols
函数的简洁(spark SQL)版本。
SELECT
explode(
from_json(
concat('{"data":',
'[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]',
'}'),
'data array<struct<a:DOUBLE, b:INT>>'
).data) as data;
PS. 我还添加了爆炸功能 :P
您需要了解一些HIVE SQL 类型
解决方案 5:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
def map2json(dict):
import json
return json.dumps(dict)
from pyspark.sql.types import StringType
spark.udf.register("map2json", lambda dict: map2json(dict), StringType())
spark.sql("select map2json(map('a', '1'))").show()
解决方案 6:
如果您不知道每个 JSON 的模式(并且它可能不同),您可以使用:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# ... here you get your DF
# Assuming the first column of your DF is the JSON to parse
my_df = spark.read.json(my_df.rdd.map(lambda x: x[0]))
请注意,它不会保留数据集中存在的任何其他列。来自:https ://github.com/apache/spark/pull/22775
解决方案 7:
如果您的 JSON 字符串是 JSON 数组而不是对象(我无法评论,因为我没有 rep),则此答案是为了增加上下文。如果您使用Martin Tapp 的可靠答案,它将为您的列返回空值。
总结
如果你的 JSON 字符串是数组对象,如下所示:
[{"a":1, "b":1.0}]
spark.read.json
将返回一个数据框,其中包含这些数组中元素的模式,而不包含数组本身。from_json
对此并不满意,因此为了尽可能具体,您可以将推断出的模式包装spark.read.json
在一个中ArrayType
,它将正确解析(而不是为所有内容返回空值)。
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType
array_item_schema = \n spark.read.json(df.rdd.map(lambda row: row['json_string_column'])).schema
json_array_schema = ArrayType(array_item_schema, True)
arrays_df = df.select(F.from_json('json_string_column', json_array_schema).alias('json_arrays'))
objects_df = arrays_df.select(F.explode('json_arrays').alias('objects'))
简介
作为 Nolan Conaway 的附录,似乎当你的 JSON 格式为
[
{
"a": 1.0,
"b": 1
},
{
"a": 0.0,
"b": 2
}
]
其中顶级对象是一个数组(而不是一个对象),pyspark 将spark.read.json()
数组视为要转换为行而不是单个行的对象集合。
查看在 PySpark 3.3.0 shell 中运行的示例:
>>> myjson = """[{"a": 1.0,"b": 1},{"a": 2.0,"b": 2}]"""
>>> myotherjson = """[{"a": 3.0,"b": 3}]"""
>>> rawobjectjson = """{"a": 4.0,"b": 4}"""
>>> spark_read_df = spark.read.json(sc.parallelize([myjson,myotherjson,rawobjectjson]))
>>> spark_read_df.show()
+---+---+
| a| b|
+---+---+
|1.0| 1|
|2.0| 2|
|3.0| 3|
|4.0| 4|
+---+---+
>>> spark_read_df.printSchema()
root
|-- a: double (nullable = true)
|-- b: long (nullable = true)
我们可以看到myjson
,myotherjson
JSON 对象的 JSON 数组被扩展为包含每个对象的一行。当其中一个 JSON 字符串rawobjectjson
只是一个原始对象时,它也能顺利处理。我认为文档在这里有点不足,因为我找不到关于数组对象的这种处理的提及。
现在让我们创建一个包含 JSON 字符串列的数据框。我们将删除rawobjectjson
因为,因为我们将看到from_json
要求每个字符串具有相同的架构(并且这包括顶级数组(如果存在))。
>>> from pyspark.sql.types import StructType, StructField, StringType, ArrayType
>>> json_string_data = [
... (myjson,),
... (myotherjson,),
... ]
>>> json_df_schema = StructType([
... StructField('json_strings', StringType(), True),
... ])
>>> raw_json_df = spark.createDataFrame(data=json_string_data, schema=json_df_schema)
>>> raw_json_df.show()
+--------------------+
| json_strings|
+--------------------+
|[{"a": 1.0,"b": 1...|
| [{"a": 3.0,"b": 3}]|
+--------------------+
现在,我尝试使用由 推断出的模式传递spark.read.json
给 来from_json
读取 JSON 列到对象,但它一直返回完全的列null
。正如 Nolan Conaway 提到的,当传递给 的模式from_json
无法应用于给定的字符串时,就会发生这种情况。
问题是,在这些字符串中,它将顶层视为一个数组,但如图spark_read_df.printSchema()
所示,推断出的模式spark.read.json()
忽略了数组级别。
解决方案
所以我最终选择的解决方案是在读取时仅考虑模式中的顶级数组。
from pyspark.sql import functions as F
# This one won't work for directly passing to from_json as it ignores top-level arrays in json strings
# (if any)!
# json_object_schema = spark_read_df.schema()
# from_json is a bit more "simple", it directly applies the schema to the string. In this case
# the top level type is actually an array, so a simple fix is to just wrap the schema that
# spark.read.json returned in an ArrayType to match the true JSON string
json_array_schema = ArrayType(spark_read_df.schema, True)
json_extracted_df = raw_json_df.select(
F.from_json('json_strings', json_array_schema)
.alias('json_arrays')
)
>>> json_extracted_df.show()
+--------------------+
| json_arrays|
+--------------------+
|[{1.0, 1}, {2.0, 2}]|
| [{3.0, 3}]|
+--------------------+
>>> json_extracted_df.printSchema()
root
|-- json_arrays: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: double (nullable = true)
| | |-- b: long (nullable = true)
从那里可以使用以下方法将对象从数组中拉出pyspark.sql.functions.explode
:
>>> exploded_df = json_extracted_df.select(F.explode('json_arrays').alias('objects'))
>>> exploded_df.show()
+--------+
| objects|
+--------+
|{1.0, 1}|
|{2.0, 2}|
|{3.0, 3}|
+--------+
>>> exploded_df.printSchema()
root
|-- objects: struct (nullable = true)
| |-- a: double (nullable = true)
| |-- b: long (nullable = true)
扫码咨询,免费领取项目管理大礼包!