如何在 Spark 中对两个具有不同列数的 DataFrame 执行联合?
- 2025-03-14 08:57:00
- admin 原创
- 58
问题描述:
我有 2 个 DataFrame:
我需要这样的联合(union)结果:
由于列的数量和名称不同,unionAll 函数不起作用。
我怎样才能做到这一点?
解决方案 1:
Spark 3.1+
# Spark 3.1+: match columns by name; allowMissingColumns=True permits the two column sets to differ.
df = df1.unionByName(df2, allowMissingColumns=True)
测试结果:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Three rows with columns code/date/A/B/C; only the code differs per row.
data1 = [(code, '2016-08-29', 1, 2, 3) for code in (1, 2, 3)]
df1 = spark.createDataFrame(data1, schema=['code', 'date', 'A', 'B', 'C'])

# Three rows with columns code/date/B/C/D/E; only the code differs per row.
data2 = [(code, '2016-08-29', 1, 2, 3, 4) for code in (5, 6, 7)]
df2 = spark.createDataFrame(data2, schema=['code', 'date', 'B', 'C', 'D', 'E'])

# Union by column name, tolerating columns that exist on one side only.
df = df1.unionByName(df2, allowMissingColumns=True)
df.show()
# +----+----------+----+---+---+----+----+
# |code| date| A| B| C| D| E|
# +----+----------+----+---+---+----+----+
# | 1|2016-08-29| 1| 2| 3|null|null|
# | 2|2016-08-29| 1| 2| 3|null|null|
# | 3|2016-08-29| 1| 2| 3|null|null|
# | 5|2016-08-29|null| 1| 2| 3| 4|
# | 6|2016-08-29|null| 1| 2| 3| 4|
# | 7|2016-08-29|null| 1| 2| 3| 4|
# +----+----------+----+---+---+----+----+
Spark 2.3+
# Column names that each frame lacks, listed in the other frame's column order.
diff1 = [c for c in df2.columns if c not in df1.columns]
diff2 = [c for c in df1.columns if c not in df2.columns]
# Pad each frame with null literals named after its missing columns, then
# union by name. Parentheses replace the backslash continuation that had been
# corrupted into a literal "\n".
df = df1.select('*', *[F.lit(None).alias(c) for c in diff1]).unionByName(
    df2.select('*', *[F.lit(None).alias(c) for c in diff2])
)
测试结果:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()

data1 = [
    (1, '2016-08-29', 1, 2, 3),
    (2, '2016-08-29', 1, 2, 3),
    (3, '2016-08-29', 1, 2, 3),
]
df1 = spark.createDataFrame(data1, ['code', 'date', 'A', 'B', 'C'])

data2 = [
    (5, '2016-08-29', 1, 2, 3, 4),
    (6, '2016-08-29', 1, 2, 3, 4),
    (7, '2016-08-29', 1, 2, 3, 4),
]
df2 = spark.createDataFrame(data2, ['code', 'date', 'B', 'C', 'D', 'E'])

# Column names that each frame lacks, listed in the other frame's column order.
diff1 = [c for c in df2.columns if c not in df1.columns]
diff2 = [c for c in df1.columns if c not in df2.columns]
# Pad both frames with null literals for their missing columns, then union by
# name. Parentheses replace the backslash continuation that had been corrupted
# into a literal "\n".
df = df1.select('*', *[F.lit(None).alias(c) for c in diff1]).unionByName(
    df2.select('*', *[F.lit(None).alias(c) for c in diff2])
)
df.show()
# +----+----------+----+---+---+----+----+
# |code| date| A| B| C| D| E|
# +----+----------+----+---+---+----+----+
# | 1|2016-08-29| 1| 2| 3|null|null|
# | 2|2016-08-29| 1| 2| 3|null|null|
# | 3|2016-08-29| 1| 2| 3|null|null|
# | 5|2016-08-29|null| 1| 2| 3| 4|
# | 6|2016-08-29|null| 1| 2| 3| 4|
# | 7|2016-08-29|null| 1| 2| 3| 4|
# +----+----------+----+---+---+----+----+
解决方案 2:
在 Scala 中,您只需将所有缺失的列以 null 值补齐即可。
import org.apache.spark.sql.functions._
// df1 and df2 are the DataFrames to merge
val df1 = sc.parallelize(List((50, 2), (34, 4))).toDF("age", "children")
val df2 = sc.parallelize(List(
  (26, true, 60000.00),
  (32, false, 35000.00)
)).toDF("age", "education", "income")

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union of both column-name sets

// For each name in allCols: the real column when the frame owns it,
// otherwise a null literal carrying that name.
def expr(myCols: Set[String], allCols: Set[String]) =
  allCols.toList.map { name =>
    if (myCols.contains(name)) col(name) else lit(null).as(name)
  }

df1.select(expr(cols1, total): _*).unionAll(df2.select(expr(cols2, total): _*)).show()
+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 50| 2| null| null|
| 34| 4| null| null|
| 26| null| true|60000.0|
| 32| null| false|35000.0|
+---+--------+---------+-------+
更新
两个 DataFrame 都会有相同的列顺序,因为在两种情况下我们都是对 total 进行映射。
// Show each frame after projecting it onto the same `total` column set.
df1.select(expr(cols1, total):_*).show()
df2.select(expr(cols2, total):_*).show()
+---+--------+---------+------+
|age|children|education|income|
+---+--------+---------+------+
| 50| 2| null| null|
| 34| 4| null| null|
+---+--------+---------+------+
+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 26| null| true|60000.0|
| 32| null| false|35000.0|
+---+--------+---------+-------+
解决方案 3:
这是我的 Python 版本:
from pyspark.sql import SparkSession, HiveContext
from pyspark.sql.functions import lit
from pyspark.sql import Row
def customUnion(df1, df2):
    """Union two DataFrames whose column sets differ.

    The result contains every column name found in either input, in sorted
    order. Where an input lacks a column, a null literal aliased to that
    name is selected instead.

    Args:
        df1: First DataFrame.
        df2: Second DataFrame.

    Returns:
        The union of both inputs projected onto the combined column list.
    """
    cols1 = df1.columns
    cols2 = df2.columns
    # Both sides are projected onto the same sorted list before union().
    total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))

    def expr(mycols, allcols):
        """Build a select list: own columns by name, missing ones as null."""
        return [
            colname if colname in mycols else lit(None).alias(colname)
            for colname in allcols
        ]

    return df1.select(expr(cols1, total_cols)).union(
        df2.select(expr(cols2, total_cols))
    )
以下是示例用法:
# First frame: integer zip codes with a ``dma`` column.
data = [Row(zip_code=code, dma='MIN') for code in (58542, 58701, 57632, 58734)]
firstDF = spark.createDataFrame(data)

# Second frame: string zip codes with a ``name`` column instead of ``dma``.
data = [Row(zip_code=code, name='MIN') for code in ('534', '353', '134', '245')]
secondDF = spark.createDataFrame(data)

customUnion(firstDF, secondDF).show()
解决方案 4:
以下是使用 pyspark 的 Python 3.0 代码:
from pyspark.sql.functions import lit
def __order_df_and_add_missing_cols(df, columns_order_list, df_missing_fields):
    """Project ``df`` onto ``columns_order_list``, in that order.

    Names listed in ``df_missing_fields`` are emitted as null literals aliased
    to the name; every other name is selected from ``df`` as-is.
    """
    if not df_missing_fields:  # nothing to add, only reorder
        return df.select(columns_order_list)
    columns = [
        lit(None).alias(col_name) if col_name in df_missing_fields else col_name
        for col_name in columns_order_list
    ]
    return df.select(columns)


def __add_missing_columns(df, missing_column_names):
    """Append one null column per missing name after ``df``'s own columns."""
    null_columns = [lit(None).alias(name) for name in missing_column_names]
    return df.select(df.schema.names + null_columns)


def __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols):
    """Union both frames using the padded left frame's column order."""
    left_df_all_cols = __add_missing_columns(left_df, left_list_miss_cols)
    right_df_all_cols = __order_df_and_add_missing_cols(
        right_df, left_df_all_cols.schema.names, right_list_miss_cols
    )
    return left_df_all_cols.union(right_df_all_cols)


def union_d_fs(left_df, right_df):
    """Union two DataFrames, filling columns absent on one side with nulls.

    Args:
        left_df: Left DataFrame; its column order leads the result.
        right_df: Right DataFrame.

    Returns:
        The union of both frames over the combined set of columns.

    Raises:
        ValueError: If either argument is None.
    """
    if left_df is None:
        raise ValueError('left_df parameter should not be None')
    if right_df is None:
        raise ValueError('right_df parameter should not be None')

    left_names = left_df.schema.names
    right_names = right_df.schema.names
    # Same columns in the same order: a plain union is enough.
    if left_names == right_names:
        return left_df.union(right_df)

    left_name_set = set(left_names)
    right_name_set = set(right_names)
    # Iterate the schema lists (not a set difference) so the appended columns
    # come out in a deterministic order.
    right_list_miss_cols = [n for n in left_names if n not in right_name_set]
    left_list_miss_cols = [n for n in right_names if n not in left_name_set]
    return __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols)
解决方案 5:
一个非常简单的方法:按相同的顺序 select 两个数据框中的列,然后使用 unionAll。
# Select the same seven columns in the same order on both sides, using a null
# literal wherever a frame lacks the column. Parentheses replace the backslash
# continuation that had been corrupted into a literal "\n".
df1.select('code', 'date', 'A', 'B', 'C', lit(None).alias('D'), lit(None).alias('E')).unionAll(
    df2.select('code', 'date', lit(None).alias('A'), 'B', 'C', 'D', 'E')
)
解决方案 6:
这是一个 pyspark 解决方案。
它假设:如果 df1 中的某个字段在 df2 中缺失,则将该缺失字段以空值添加到 df2。但是它还假设:如果该字段同时存在于两个数据框中,但类型或可空性不同,则两个数据框发生冲突、无法合并。在这种情况下,我会抛出 TypeError。
from pyspark.sql.functions import lit
def harmonize_schemas_and_combine(df_left, df_right):
    """Union two DataFrames after adding each side's missing fields as nulls.

    A field present on only one side is added to the other side as a null
    literal cast to that field's data type. A field present on both sides with
    a different data type or nullability is treated as a conflict.

    Args:
        df_left: Left DataFrame.
        df_right: Right DataFrame.

    Returns:
        ``df_left`` reordered to ``df_right``'s column order, unioned with
        ``df_right``.

    Raises:
        TypeError: On a type or nullability conflict for a shared field name.
    """
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            r_type = right_types[l_name]
            if l_type != r_type:
                raise TypeError("Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type))
            # Same name and type but a different tuple: only nullability differs.
            raise TypeError("Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s" % (l_name, l_nullable, not l_nullable))
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = left_types[r_name]
            if r_type != l_type:
                raise TypeError("Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type))
            raise TypeError("Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not r_nullable))
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))

    # Make sure columns are in the same order
    df_left = df_left.select(df_right.columns)
    return df_left.union(df_right)
解决方案 7:
如果你只是使用简单的 lit(None) 变通方法(这也是我知道的唯一方法),我发现这里的大多数 Python 答案写得有点太笨拙了。作为替代方案,这可能会有用:
# df1 and df2 are assumed to be the given DataFrames from the question.
# Work out which columns each frame has that the other lacks. Both lists are
# taken up front, before either frame is modified.
df1_only = [column for column in df1.columns if column not in df2.columns]
df2_only = [column for column in df2.columns if column not in df1.columns]
# Add df1's extra columns to df2 as nulls...
# (The original called withColumn on the frame that already owned the column,
# which overwrote real data with nulls and never added the missing column.)
for column in df1_only:
    df2 = df2.withColumn(column, lit(None))
# ... and df2's extra columns to df1.
for column in df2_only:
    df1 = df1.withColumn(column, lit(None))
之后只需执行您想要的 union() 操作即可。
注意:如果 df1 和 df2 的列顺序不同,请使用 unionByName()!
# Union by column name rather than by position.
result = df1.unionByName(df2)
解决方案 8:
修改了 Alberto Bonsanto 的版本以保留原始列顺序(OP 暗示顺序应与原始表格一致)。此外,原版本中的 match 部分会引发 IntelliJ 警告。
这是我的版本:
// Unions two DataFrames with different columns, keeping first-appearance
// column order (df1's columns first, then df2's) and filling gaps with null.
def unionDifferentTables(df1: DataFrame, df2: DataFrame): DataFrame = {
  val cols1 = df1.columns.toSet
  val cols2 = df2.columns.toSet
  val total = cols1 ++ cols2 // union

  // Rank every name by where it first appears in df1's columns followed by df2's.
  val order = df1.columns ++ df2.columns
  val sorted = total.toList.sortBy(name => order.indexOf(name))

  // Real column when the frame owns the name, otherwise a null literal with that name.
  def expr(myCols: Set[String], allCols: List[String]) =
    allCols.map { name =>
      if (myCols.contains(name)) col(name) else lit(null).as(name)
    }

  val left = df1.select(expr(cols1, sorted): _*)
  val right = df2.select(expr(cols2, sorted): _*)
  left.unionAll(right)
}
解决方案 9:
在pyspark中:
# Full join on a list of key columns; the three names here are placeholders for the columns both frames share.
df = df1.join(df2, ['each', 'shared', 'col'], how='full')
解决方案 10:
我遇到了同样的问题,使用 join 而不是 union 解决了我的问题。因此,例如使用python,而不是这行代码:
# Plain union: does not work when the two frames have a different number of columns.
result = left.union(right)
,这将无法针对不同数量的列执行,您应该使用这个:
# Outer join keyed on the shorter of the two column lists.
# NOTE(review): this assumes every column of the narrower frame also exists in the wider one — confirm.
result = left.join(right, left.columns if (len(left.columns) < len(right.columns)) else right.columns, "outer")
请注意,第二个参数包含两个 DataFrame 之间的公共列。如果不使用它,结果将具有重复的列,其中一个为空,另一个不为空。希望对您有所帮助。
解决方案 11:
有许多简洁的方法可以解决这个问题,但需要适度牺牲性能。
// Serialises both frames to JSON strings, unions the strings, and reads the
// combined JSON back into a DataFrame.
def unionWithDifferentSchema(a: DataFrame, b: DataFrame): DataFrame = {
  val jsonRows = a.toJSON.union(b.toJSON)
  sparkSession.read.json(jsonRows.rdd)
}
这是可以解决问题的函数。对每个数据框使用 toJSON 会生成一个 json Union。这样可以保留顺序和数据类型。
唯一的缺点是 toJSON 相对昂贵(不过不多,速度可能会减慢 10-15%)。不过这可以保持代码整洁。
解决方案 12:
我的 Java 版本:
/**
 * Unions two datasets by column name after padding each one with the
 * columns it lacks relative to the other.
 */
private static Dataset<Row> unionDatasets(Dataset<Row> one, Dataset<Row> another) {
    // Pad `another` with the fields that exist only in `one`.
    List<String> namesInAnother = Arrays.asList(another.schema().fieldNames());
    Dataset<Row> paddedAnother = balanceDataset(another, one.schema(), namesInAnother);

    // Pad `one` against the already-padded schema of `another`.
    List<String> namesInOne = Arrays.asList(one.schema().fieldNames());
    Dataset<Row> paddedOne = balanceDataset(one, paddedAnother.schema(), namesInOne);

    return paddedAnother.unionByName(paddedOne);
}
/**
 * Adds to {@code dataset} every field of {@code schema} whose name is not in
 * {@code fields}, as a null column cast to the field's data type
 * (falling back to StringType when the data type is null).
 */
private static Dataset<Row> balanceDataset(Dataset<Row> dataset, StructType schema, List<String> fields) {
    Dataset<Row> balanced = dataset;
    for (StructField field : schema.fields()) {
        String name = field.name();
        if (fields.contains(name)) {
            continue; // column already present
        }
        // Add the column as a null literal first, then cast it to the target type.
        balanced = balanced.withColumn(name, lit(null));
        balanced = balanced.withColumn(name,
                balanced.col(name).cast(Optional.ofNullable(field.dataType()).orElse(StringType)));
    }
    return balanced;
}
解决方案 13:
此函数接收两个具有不同架构的数据框(df1 和 df2)并将它们合并。首先,我们需要通过将所有(缺失的)列从 df1 添加到 df2 并反之亦然,将它们带到同一个架构。要向 df 添加新的空列,我们需要指定数据类型。
import pyspark.sql.functions as F
def union_different_schemas(df1, df2):
    """Union two DataFrames with different schemas, matching columns by name.

    Every column that exists in only one frame is added to the other frame as
    a null literal cast to the column's data type.

    Args:
        df1: First DataFrame.
        df2: Second DataFrame.

    Returns:
        ``df1.unionByName(df2)`` after both frames have been padded.
    """
    # Snapshot both schemas before padding so each loop walks the original
    # columns of its source frame.
    fields_df1 = list(df1.schema.fields)
    fields_df2 = list(df2.schema.fields)

    # Add df1's columns that df2 lacks, with the matching data type.
    for field in fields_df1:
        if field.name not in df2.columns:
            df2 = df2.withColumn(field.name, F.lit(None).cast(field.dataType))

    # Now the same in the other direction.
    for field in fields_df2:
        if field.name not in df1.columns:
            df1 = df1.withColumn(field.name, F.lit(None).cast(field.dataType))

    # Both frames now have the same columns, not necessarily in the same
    # order, therefore we use unionByName.
    combined_df = df1.unionByName(df2)
    return combined_df
解决方案 14:
这是 Scala 中的版本,也在这里回答了,还有一个 Pyspark 版本..(Spark - 将具有不同模式(列名和序列)的 DataFrame 合并/合并到具有主通用模式的 DataFrame) -
它需要合并数据框列表。所有数据框中相同名称的列应该具有相同的数据类型。
// Unions a list of DataFrames whose schemas may differ in columns and column order.
def unionPro(DFList: List[DataFrame], spark: org.apache.spark.sql.SparkSession): DataFrame = {
/**
 * Accepts DataFrames with the same or different schema / column order,
 * with some or no columns in common, and returns their union.
 * A column a frame does not have is selected as a null literal.
 */
import spark.implicits._
// Distinct column names across all frames, in order of first appearance.
val MasterColList: Array[String] = DFList.map(_.columns).reduce((x, y) => (x.union(y))).distinct
// For each name in allCols: the real column if listed in myCols, otherwise a null literal with that name.
def unionExpr(myCols: Seq[String], allCols: Seq[String]): Seq[org.apache.spark.sql.Column] = {
allCols.toList.map(x => x match {
case x if myCols.contains(x) => col(x)
case _ => lit(null).as(x)
})
}
// Master schema: all fields of all frames grouped by upper-cased name, keeping the first field of each group.
// NOTE(review): grouping ignores case but MasterColList is case-sensitive — confirm inputs never differ only by case.
val masterSchema = StructType(DFList.map(_.schema.fields).reduce((x, y) => (x.union(y))).groupBy(_.name.toUpperCase).map(_._2.head).toArray)
// Empty frame with the master schema, projected into MasterColList order; used as the fold seed.
val masterEmptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], masterSchema).select(MasterColList.head, MasterColList.tail: _*)
// Project every frame onto MasterColList and union them one by one onto the empty seed.
DFList.map(df => df.select(unionExpr(df.columns, MasterColList): _*)).foldLeft(masterEmptyDF)((x, y) => x.union(y))
}
以下是示例测试 -
// Two sample frames that share only the "Name" column.
val aDF = Seq(("A", 1), ("B", 2)).toDF("Name", "ID")
val bDF = Seq(("C", 1, "D1"), ("D", 2, "D2")).toDF("Name", "Sal", "Deptt")
unionPro(List(aDF, bDF), spark).show
输出结果为 -
+----+----+----+-----+
|Name| ID| Sal|Deptt|
+----+----+----+-----+
| A| 1|null| null|
| B| 2|null| null|
| C|null| 1| D1|
| D|null| 2| D2|
+----+----+----+-----+
解决方案 15:
PySpark
Alberto 的 Scala 版本运行良好。但是,如果你想要进行 for 循环或一些变量的动态赋值,你可能会遇到一些问题。解决方案来自 Pyspark - 干净的代码:
from pyspark.sql.functions import *
# Define the two sample DataFrames.
df1 = spark.createDataFrame(
    [
        (1, 'foo', 'ok'),
        (2, 'pro', 'ok'),
    ],
    ['id', 'txt', 'check'],
)
df2 = spark.createDataFrame(
    [
        (3, 'yep', 13, 'mo'),
        (4, 'bro', 11, 're'),
    ],
    ['id', 'txt', 'value', 'more'],
)

# Retrieve the column names of each frame.
cols1 = df1.columns
cols2 = df2.columns

# Every column name that appears in df1 or df2.
total = list(set(cols2) | set(cols1))


def addnulls(yourDF):
    """Return ``yourDF`` with a null column added for each name in ``total`` it lacks."""
    for x in total:
        if x not in yourDF.columns:
            yourDF = yourDF.withColumn(x, lit(None))
    return yourDF


df1 = addnulls(df1)
df2 = addnulls(df2)

# Sort the columns on both sides: unionAll concatenates DataFrames by column
# position, so both selects must use the same order.
df1.select(sorted(df1.columns)).unionAll(df2.select(sorted(df2.columns))).show()
+-----+---+----+---+-----+
|check| id|more|txt|value|
+-----+---+----+---+-----+
| ok| 1|null|foo| null|
| ok| 2|null|pro| null|
| null| 3| mo|yep| 13|
| null| 4| re|bro| 11|
+-----+---+----+---+-----+
解决方案 16:
from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
def unionAll(*dfs, fill_by=None):
    """Union any number of DataFrames by name, adding missing columns.

    Column names are compared case-insensitively. A column a frame lacks is
    added as ``fill_by`` cast to the data type recorded for that column.

    Args:
        *dfs: DataFrames to union.
        fill_by: Value used for added columns (default ``None``).

    Returns:
        The frames folded together with ``DataFrame.unionByName``.
    """
    # lower-cased name -> (dataType, original name); when a name occurs in
    # several frames, the last frame's entry is the one kept.
    clmns = {clm.name.lower(): (clm.dataType, clm.name) for df in dfs for clm in df.schema.fields}
    dfs = list(dfs)
    for i, df in enumerate(dfs):
        df_clmns = {clm.lower() for clm in df.columns}
        for clm, (dataType, name) in clmns.items():
            if clm not in df_clmns:
                # Add the missing column
                dfs[i] = dfs[i].withColumn(name, F.lit(fill_by).cast(dataType))
    return reduce(DataFrame.unionByName, dfs)
# Example call with two frames.
unionAll(df1, df2).show()
不区分大小写的列
将返回实际的列大小写
支持现有的数据类型
默认值可自定义
一次传递多个数据框(例如 unionAll(df1, df2, df3, ..., df10))
解决方案 17:
这是另一个:
// Unions two DataFrames over the sorted union of their column names,
// selecting "NULL as <name>" for any column a frame does not have.
def unite(df1: DataFrame, df2: DataFrame): DataFrame = {
  val cols1 = df1.columns.toSet
  val cols2 = df2.columns.toSet
  val total = (cols1 ++ cols2).toSeq.sorted

  // SQL select expressions for a frame that owns the given column names.
  def projection(own: Set[String]): Seq[String] =
    total.map(c => if (own.contains(c)) c else s"NULL as $c")

  val left = df1.selectExpr(projection(cols1): _*)
  val right = df2.selectExpr(projection(cols2): _*)
  left.union(right)
}
解决方案 18:
Pyspark DataFrame 连接的联合和外联合。这适用于具有不同列的多个数据框。
def union_all(*dfs):
    """Fold all given DataFrames together with ``DataFrame.unionAll``."""
    # reduce is not a builtin on Python 3; import it here so the function
    # does not depend on an import the snippet never shows.
    from functools import reduce

    # NOTE(review): `ps` is presumably `import pyspark as ps` — confirm.
    return reduce(ps.sql.DataFrame.unionAll, dfs)
def outer_union_all(*dfs):
    """Union DataFrames with different columns, padding gaps with nulls.

    Every frame is projected onto the combined column list (first-appearance
    order across ``dfs``) before being passed to ``union_all``.

    Args:
        *dfs: DataFrames to union.

    Returns:
        The result of ``union_all`` over the padded frames.
    """
    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # column order is the same on every run (a set would not guarantee that).
    all_cols = list(dict.fromkeys(col for df in dfs for col in df.columns))
    print(all_cols)

    def expr(cols, all_cols):
        """Select list: own columns by name, missing ones as null literals."""
        # NOTE(review): `sqlfunc` is presumably pyspark.sql.functions — confirm.
        return [
            col if col in cols else sqlfunc.lit(None).alias(col)
            for col in all_cols
        ]

    union_df = union_all(*[df.select(expr(df.columns, all_cols)) for df in dfs])
    return union_df
解决方案 19:
还有一个用于合并 DataFrame 列表的通用方法。
// Unions any number of DataFrames, filling columns a frame lacks with null.
// Column order follows first appearance, scanning the frames left to right.
def unionFrames(dfs: Seq[DataFrame]): DataFrame = {
  dfs match {
    // Seq patterns (rather than Nil / ::) so non-List sequences are matched too.
    case Seq() => session.emptyDataFrame // or throw an exception?
    case Seq(single) => single
    case _ =>
      // Preserve column order from the left-most frame to the right-most.
      val allColumns = dfs.flatMap(_.columns.toSeq).distinct
      val appendMissingColumns = (df: DataFrame) => {
        val columns = df.columns.toSet
        df.select(allColumns.map(c => if (columns.contains(c)) col(c) else lit(null).as(c)): _*)
      }
      dfs.tail.foldLeft(appendMissingColumns(dfs.head))((a, b) => a.union(appendMissingColumns(b)))
  }
} // this closing brace of the function was missing in the original
解决方案 20:
这是我的 pyspark 版本:
from functools import reduce
from pyspark.sql.functions import lit
def concat(dfs):
    """Union the frames left to right, reordering each to the running result's columns.

    Handles frames whose columns are the same but in a different order, see
    https://datascience.stackexchange.com/a/27231/15325
    """
    return reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)


def union_all(dfs):
    """Union DataFrames with different columns, padding gaps with nulls.

    Args:
        dfs: Iterable of DataFrames. It is not modified.

    Returns:
        The result of ``concat`` over the padded frames.
    """
    # Copy first: the input may be a one-shot iterable, and the caller's list
    # must not be overwritten with the padded frames.
    dfs = list(dfs)
    # All column names in first-seen order (deterministic, unlike a set).
    columns = list(dict.fromkeys(c for df in dfs for c in df.columns))
    padded = []
    for df in dfs:
        for c in columns:
            if c not in df.columns:
                df = df.withColumn(c, lit(None))
        padded.append(df)
    return concat(padded)
解决方案 21:
或者您可以使用完全连接。
list_of_files = ['test1.parquet', 'test2.parquet']


def merged_frames():
    """Full-join every parquet file in ``list_of_files`` on ``primary_key``.

    Returns:
        The joined DataFrame (also passed to ``display``), or ``None`` when
        ``list_of_files`` is empty.
    """
    if not list_of_files:
        return None
    # The entries are path strings, so they are passed to the reader directly.
    frames = [spark.read.parquet(path) for path in list_of_files]
    # Accumulate the join so every frame ends up in the result; with a single
    # file the loop body never runs and that frame is returned as-is.
    result_df = frames[0]
    for frame in frames[1:]:
        result_df = result_df.join(frame, 'primary_key', how='full')
    display(result_df)
    return result_df
解决方案 22:
如果您正在从文件加载,我想您只需使用带有文件列表的读取功能即可。
# file_paths is a list of files with different schemas.
# The mergeSchema option is set to "true" on the reader before loading them as JSON.
df = spark.read.option("mergeSchema", "true").json(file_paths)
生成的数据框将具有合并的列。
扫码咨询,免费领取项目管理大礼包!