使用 PySpark 加载 CSV 文件

2025-03-04 08:24:00
admin
原创
67
摘要:问题描述:我是 Spark 新手,正在尝试使用 Spark 从文件中读取 CSV 数据。以下是我正在做的事情:sc.textFile('file.csv') .map(lambda line: (line.split(',')[0], line.split(',')[1])) .collect...

问题描述:

我是 Spark 新手,正在尝试使用 Spark 从文件中读取 CSV 数据。以下是我正在做的事情:

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

我希望这个调用能给我提供文件前两列的列表,但是我收到了这个错误:

文件“”,第 1 行,IndexError:列表索引超出范围

尽管我的 CSV 文件有多列。


解决方案 1:

Spark 2.0.0+

您可以直接使用内置的 csv 数据源:

spark.read.csv(
    "some_input_file.csv", 
    header=True, 
    mode="DROPMALFORMED", 
    schema=schema
)

或者

(
    spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv")
)

不包括任何外部依赖。

Spark < 2.0.0

我建议不要进行手动解析,因为在一般情况下,手动解析并不是一件简单的事spark-csv

确保 Spark CSV 包含在路径中 ( --packages, --jars, --driver-class-path)

并按如下方式加载数据:

df = (
    sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)

它可以处理加载、模式推断、删除格式错误的行,并且不需要将数据从 Python 传递到 JVM。

笔记

如果您知道架构,最好避免架构推断并将其传递给DataFrameReader。假设您有三列 - 整数、双精度和字符串:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(
    sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)

解决方案 2:

你确定所有行都至少有 2 列吗?你可以尝试类似以下方法检查一下吗?:

sc.textFile("file.csv") \n    .map(lambda line: line.split(",")) \n    .filter(lambda line: len(line)>1) \n    .map(lambda line: (line[0],line[1])) \n    .collect()

或者,你可以打印罪魁祸首(如果有的话):

sc.textFile("file.csv") \n    .map(lambda line: line.split(",")) \n    .filter(lambda line: len(line)<=1) \n    .collect()

解决方案 3:

from pyspark.sql import SparkSession

spark = SparkSession \n    .builder \n    .appName("Python Spark SQL basic example") \n    .config("spark.some.config.option", "some-value") \n    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|")

print(df.collect())

解决方案 4:

另一个选项是使用 Pandas 读取 CSV 文件,然后将 Pandas DataFrame 导入 Spark。

例如:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)

解决方案 5:

简单地用逗号拆分也会拆分字段内的逗号(例如a,b,"1,2,3",c),因此不建议这样做。如果您想使用 DataFrames API, zero323 的答案很好,但如果您想坚持使用基本 Spark,您可以使用csv模块在基本 Python 中解析 csvs :

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

编辑:正如@muon在评论中提到的那样,这会将标题视为任何其他行,因此您需要手动提取它。例如,header = rdd.first(); rdd = rdd.filter(lambda x: x != header)(确保header在过滤器评估之前不要修改)。但此时,您最好使用内置的csv解析器。

解决方案 6:

这是在PYSPARK中

path="Your file path with file name"

df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(path)

然后你可以检查

df.show(5)
df.count()

解决方案 7:

如果您想将 csv 作为数据框加载,那么您可以执行以下操作:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \n    .options(header='true', inferschema='true') \n    .load('sampleFile.csv') # this is your csv file

对我来说效果很好。

解决方案 8:

这与JP Mercier 最初关于使用 Pandas 的建议一致,但有一个重大修改:如果你将数据分块读入 Pandas,它应该更具可塑性。这意味着,你可以解析比 Pandas 实际可以处理的更大的文件,并将其以较小的尺寸传递给 Spark。(这也回答了为什么人们会想使用 Spark,如果他们可以将所有内容加载到 Pandas 中。)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full +=  sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()

解决方案 9:

现在,对于任何通用 csv 文件还有另一个选项:https://github.com/seahboonsiew/pyspark-csv如下所示:

假设我们有以下上下文

sc = SparkContext
sqlCtx = SQLContext or HiveContext

首先,使用 SparkContext 将 pyspark-csv.py 分发给执行器

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

通过SparkContext读取csv数据并转换为DataFrame

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)

解决方案 10:

如果你的 csv 数据恰好在任何字段中不包含换行符,你可以加载数据textFile()并进行解析

import csv
import StringIO

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)

解决方案 11:

如果数据集中任何一行或多行的列数少于或多于 2,则可能会出现此错误。

我也是 Pyspark 的新手,正在尝试读取 CSV 文件。以下代码对我有用:

在此代码中,我使用了来自 kaggle 的数据集,链接是:https://www.kaggle.com/carrie1/ecommerce-data

1. 不提及架构:

from pyspark.sql import SparkSession  
scSpark = SparkSession \n    .builder \n    .appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \n    .config("spark.some.config.option", "some-value") \n    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",")
sdfData.show()

现在检查列:sdfData.columns

输出将是:

['InvoiceNo', 'StockCode','Description','Quantity', 'InvoiceDate', 'CustomerID', 'Country']

检查每列的数据类型:

sdfData.schema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))

这将使数据框中的所有列的数据类型为 StringType

2. 使用模式:
如果您知道模式或者想要更改上表中任何列的数据类型,请使用此功能(假设我有以下列,并且希望每个列都具有特定的数据类型)

from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
    schema = StructType([\n        StructField("InvoiceNo", IntegerType()),\n        StructField("StockCode", StringType()), \n        StructField("Description", StringType()),\n        StructField("Quantity", IntegerType()),\n        StructField("InvoiceDate", StringType()),\n        StructField("CustomerID", DoubleType()),\n        StructField("Country", StringType())\n    ])

scSpark = SparkSession \n    .builder \n    .appName("Python Spark SQL example: Reading CSV file with schema") \n    .config("spark.some.config.option", "some-value") \n    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema)

现在检查每列的数据类型的架构:

sdfData.schema

StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

编辑:我们也可以使用以下代码行,而无需明确提及模式:

sdfData = scSpark.read.csv("data.csv", header=True, inferSchema = True)
sdfData.schema

输出为:

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))

输出将如下所示:

sdfData.show()

+---------+---------+--------------------+--------+--------------+----------+-------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|CustomerID|Country|
+---------+---------+--------------------+--------+--------------+----------+-------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|      2.55|  17850|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|      2.75|  17850|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|      7.65|  17850|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|      4.25|  17850|
|   536366|    22633|HAND WARMER UNION...|       6|12/1/2010 8:28|      1.85|  17850|
|   536366|    22632|HAND WARMER RED P...|       6|12/1/2010 8:28|      1.85|  17850|
|   536367|    84879|ASSORTED COLOUR B...|      32|12/1/2010 8:34|      1.69|  13047|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22749|FELTCRAFT PRINCES...|       8|12/1/2010 8:34|      3.75|  13047|
|   536367|    22310|IVORY KNITTED MUG...|       6|12/1/2010 8:34|      1.65|  13047|
|   536367|    84969|BOX OF 6 ASSORTED...|       6|12/1/2010 8:34|      4.25|  13047|
|   536367|    22623|BOX OF VINTAGE JI...|       3|12/1/2010 8:34|      4.95|  13047|
|   536367|    22622|BOX OF VINTAGE AL...|       2|12/1/2010 8:34|      9.95|  13047|
|   536367|    21754|HOME BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21755|LOVE BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21777|RECIPE BOX WITH M...|       4|12/1/2010 8:34|      7.95|  13047|
+---------+---------+--------------------+--------+--------------+----------+-------+
only showing top 20 rows

解决方案 12:

使用时spark.read.csv,我发现使用选项escape='"'multiLine=True提供最一致的解决方案,符合CSV 标准,并且根据我的经验,最适合从 Google 表格导出的 CSV 文件。

那是,

#set inferSchema=False to read everything as string
df = spark.read.csv("myData.csv", escape='"', multiLine=True,
     inferSchema=False, header=True)

解决方案 13:

以这样的方式读取你的 csv 文件:

df= spark.read.format("csv").option("multiline", True).option("quote", "\"").option("escape", "\"").option("header",True).load(df_path)

spark版本是3.0.1

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   4008  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   2751  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Freshdesk、ClickUp、nTask、Hubstaff、Plutio、Productive、Targa、Bonsai、Wrike。在当今快速变化的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多企业在项目管理过程中面临着诸多痛点,如任务分配不...
项目管理系统   86  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Monday、TeamGantt、Filestage、Chanty、Visor、Smartsheet、Productive、Quire、Planview。在当今快速变化的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多项目经理和团队在管理复杂项目时,常...
开源项目管理工具   97  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Smartsheet、GanttPRO、Backlog、Visor、ResourceGuru、Productive、Xebrio、Hive、Quire。在当今快节奏的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多企业在选择项目管理工具时常常面临困惑:...
项目管理系统   85  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用