如何将 PySpark 中的表格数据框导出为 csv?
- 2025-04-16 08:58:00
- admin 原创
- 16
问题描述:
我正在使用 Spark 1.3.1 (PySpark),并使用 SQL 查询生成了一个表。现在我有一个对象DataFrame
。我想将此DataFrame
对象(我将其称为“表”)导出到 csv 文件,以便对其进行操作并绘制列。如何将DataFrame
“表”导出到 csv 文件?
解决方案 1:
如果数据框适合驱动程序内存,并且您想要保存到本地文件系统,则可以使用方法将Spark DataFrame转换为本地Pandas DataFrametoPandas
,然后只需使用to_csv
:
df.toPandas().to_csv('mycsv.csv')
否则,您可以使用spark-csv:
Spark 1.3
df.save('mycsv.csv', 'com.databricks.spark.csv')
Spark 1.4+
df.write.format('com.databricks.spark.csv').save('mycsv.csv')
在 Spark 2.0+ 中,您可以csv
直接使用数据源:
df.write.csv('mycsv.csv')
解决方案 2:
对于 Apache Spark 2+,为了将数据帧保存到单个 csv 文件中,请使用以下命令
query.repartition(1).write.csv("cc_out.csv", sep='|')
这里1
表明我只需要一个 csv 分区。你可以根据自己的要求进行更改。
解决方案 3:
如果无法使用spark-csv,可以执行以下操作:
df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")
如果你需要处理带有换行符或逗号的字符串,那么这种方法就行不通了。使用这个:
import csv
import cStringIO
def row2csv(row):
buffer = cStringIO.StringIO()
writer = csv.writer(buffer)
writer.writerow([str(s).encode("utf-8") for s in row])
buffer.seek(0)
return buffer.read().strip()
df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")
解决方案 4:
您需要将 Dataframe 重新分区到单个分区中,然后以 Unix 文件系统格式定义文件的格式、路径和其他参数,如下所示,
df.repartition(1).write.format('com.databricks.spark.csv').save("/path/to/file/myfile.csv",header = 'true')
阅读有关重新分区功能的更多信息
阅读有关保存功能的更多信息
然而,repartition 函数开销较大,而 toPandas() 函数的性能最差。为了获得更好的性能,请尝试使用 .coalesce(1) 代替之前的语法中的 .repartition(1)。
阅读有关重新分区与合并功能的更多信息。
解决方案 5:
使用 PySpark
在 Spark 3.0+ 中写入 csv 的最简单方法
sdf.write.csv("/path/to/csv/data.csv")
根据您使用的 Spark 节点数量,此命令可以生成多个文件。如果您希望将其合并到单个文件中,请使用 repartition 功能。
sdf.repartition(1).write.csv("/path/to/csv/data.csv")
-或者更有效的选择-
sdf.coalesce(1).write.csv("/path/to/csv/data.csv")
使用 Pandas
如果你的数据不是太多,并且可以保存在本地 Python 中,那么你也可以使用 Pandas
sdf.toPandas().to_csv("/path/to/csv/data.csv", index=False)
使用考拉
sdf.to_koalas().to_csv("/path/to/csv/data.csv", index=False)
解决方案 6:
这样怎么样(如果你不想要一行的话)?
for row in df.collect():
d = row.asDict()
s = "%d %s %s
" % (d["int_column"], d["string_column"], d["string_column"])
f.write(s)
f 是打开的文件描述符。分隔符也是 TAB 字符,但您可以轻松将其更改为任何您想要的字符。
解决方案 7:
'''
I am late to the pary but: this will let me rename the file, move it to a desired directory and delete the unwanted additional directory spark made
'''
import shutil
import os
import glob
path = 'test_write'
#write single csv
students.repartition(1).write.csv(path)
#rename and relocate the csv
shutil.move(glob.glob(os.getcwd() + '\\' + path + '\\' + r'*.csv')[0], os.getcwd()+ '\\' + path+ '.csv')
#remove additional directory
shutil.rmtree(os.getcwd()+'\\'+path)
解决方案 8:
我在 Pandas 上用过这个方法,但性能很差。最后耗费了太长时间,我决定放弃,转而寻找其他方法。
如果您正在寻找一种写入一个 csv 而不是多个 csv 的方法,那么这就是您正在寻找的:
df.coalesce(1).write.csv("train_dataset_processed", header=True)
它将我的数据集处理时间从 2 小时以上缩短到 2 分钟
解决方案 9:
尝试使用 display(df) 并在结果中使用下载选项。请注意:此选项只能下载 100 万行数据,但速度非常快。
扫码咨询,免费领取项目管理大礼包!