· Spark SQL · 7 min read
Learn How to Read and Write CSV Files with Apache Spark.
Read CSV files with PySpark
Let’s learn all about reading CSV files with PySpark.
If you haven't set up Apache Spark on your computer yet, the previous post in this series walks you through a local install, so you can learn Apache Spark at zero cost.
Data to use
We’ve created 3 tables that you should download and put in the same location as your Jupyter notebook before you get started. Just click the links below to download them.
Managers
Establishments
Sales
Reading into a Spark Dataframe
We’re going to read the sales CSV file into a Spark DataFrame.
Setup the Spark Session
If you’re following from the previous post, then you will have already done this.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Reading data from a CSV file is extremely easy with PySpark.
Read / Write data
Read data from a CSV file
PySpark provides the spark.read.csv() method for reading CSV files. Here is how to use it.
raw = spark.read.csv("./sales.csv", header=True)
raw.show()
raw = spark.read.csv("./sales.csv", header=True): This reads a CSV file called "sales.csv" and stores the data in a DataFrame. The header option specifies that the first row of the CSV file contains the column names, so these will be used to name the columns in the DataFrame.
raw.show(): This displays the first 20 rows of the DataFrame. The show() method is used to print the contents of a DataFrame to the console.
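By default show() prints up to 20 rows and truncates long values. As a quick sketch of its optional arguments, using the raw DataFrame from above:

# Show only the first 5 rows and do not truncate long column values
raw.show(5, truncate=False)

# Count how many rows were read in total
print(raw.count())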
It can also be chained like this for short. Note that show() returns None, so don't assign the result if you still need the DataFrame:

spark.read.csv("./sales.csv", header=True).show()
You should get the following output
date | est_ref | capacity | occupancy | rooms_sold | avg_rate_paid | sales_value |
---|---|---|---|---|---|---|
2022-12-27 | 0 | 289 | 0.75 | 217 | 35.97 | 7805.49 |
2022-12-27 | 1 | 203 | 0.35 | 71 | 82.31 | 5844.01 |
2022-12-27 | 2 | 207 | 0.51 | 106 | 227.83 | 24149.98 |
Arguments for spark.read.csv()
Here are the arguments you may need to use when reading a CSV file with PySpark; a short example combining several of them follows the table.
Argument | Description |
---|---|
path | The file path or URL to the CSV file. |
header | A boolean indicating whether the CSV file has a header row (default is False ). |
schema | A StructType object defining the schema of the data (default is None , which means the schema will be inferred from the data). |
sep | The character used to separate fields in the CSV file (default is ',' ). |
quote | The character used to quote fields in the CSV file (default is '"' ). |
escape | The character used to escape quotes inside quoted fields (default is '\' , the backslash character). |
mode | The mode for dealing with malformed records (default is 'PERMISSIVE' , which keeps malformed rows and sets fields it cannot parse to null). |
charset | The character set to use when reading the CSV file (default is UTF-8 ). |
inferSchema | A boolean indicating whether to infer the schema from the data (default is False ). |
columnNameOfCorruptRecord | The name of the column in which to place malformed records (default is "_corrupt_record" ). |
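Here is a rough sketch of how a few of these arguments can be combined. The file name and separator below are made-up assumptions for illustration, not part of the course data:

# Hypothetical example: a semicolon-separated CSV file with a header row
raw_eu = spark.read.csv(
    "./sales_eu.csv",   # assumed file name, for illustration only
    header=True,
    sep=";",
    inferSchema=True,
    mode="PERMISSIVE",
)
raw_eu.show()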
Reading a CSV file from GitHub with PySpark
I’ve uploaded the files to GitHub, so if you want to follow along with the course you can read them directly from there with the following code.
url = 'https://raw.githubusercontent.com/adamrichardson14/PySpark-Tutorial/main/sales.csv'
from pyspark import SparkFiles
spark.sparkContext.addFile(url)
raw = spark.read.csv(SparkFiles.get("sales.csv"), header=True)
raw.show()
Types when reading from a CSV file
By default, all of the columns will be of type string when reading a CSV file. Spark will not try to infer the schema unless you ask it to, and this is a good default. You can check this for yourself with the following code.
raw.printSchema()
|-- date: string (nullable = true)
|-- est_ref: string (nullable = true)
|-- capacity: string (nullable = true)
|-- occupancy: string (nullable = true)
|-- rooms_sold: string (nullable = true)
|-- avg_rate_paid: string (nullable = true)
|-- sales_value: string (nullable = true)
This is telling us that all of the columns are of type string.
As a best practice, you should always declare the data types for your data.
Data Types
For more information about types with Apache Spark, read the dedicated post on Spark data types. It breaks down the different data types and how to use them in your Spark applications.
Reading CSV files with the correct types
Let’s amend our code to ensure that we’re reading the CSV file with the correct data types.
from pyspark.sql.types import StructType, StructField, DateType, IntegerType, DoubleType

schema = StructType([
StructField("date", DateType(), True),
StructField("est_ref", IntegerType(), True),
StructField("capacity", IntegerType(), True),
StructField("occupancy", DoubleType(), True),
StructField("rooms_sold", IntegerType(), True),
StructField("avg_rate_paid", DoubleType(), True),
StructField("sales_value", DoubleType(), True)
])
raw = spark.read.csv('sales.csv', header=True, schema=schema)
raw.printSchema()
Our output will now look like this
|-- date: date (nullable = true)
|-- est_ref: integer (nullable = true)
|-- capacity: integer (nullable = true)
|-- occupancy: double (nullable = true)
|-- rooms_sold: integer (nullable = true)
|-- avg_rate_paid: double (nullable = true)
|-- sales_value: double (nullable = true)
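If you just want a quick look at a file and don't mind Spark making an extra pass over the data, you can let it guess the types instead of declaring them. This is only a convenience sketch; an explicit schema remains the best practice:

# Let Spark scan the data and guess the column types
raw_inferred = spark.read.csv("sales.csv", header=True, inferSchema=True)
raw_inferred.printSchema()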
Reading multiple CSV files
It’s quite common to need to read multiple CSV files into a single DataFrame. This is also really easy to achieve, simply by passing a list of paths instead of a single path. Here is an example of how we might do that.

raw = spark.read.csv(["sales.csv", "sales1.csv", "sales2.csv"], header=True)
raw.show()
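If the files share the same layout and live in the same folder, you can also point Spark at a directory or a glob pattern instead of listing each file. The folder name below is just an assumption:

# Read every CSV file under an (assumed) sales_data directory in one go
raw = spark.read.csv("sales_data/*.csv", header=True)
raw.show()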
Writing a Spark Dataframe to a CSV file
The first thing to understand when writing files with PySpark is the different modes available.
PySpark write modes
Write Mode | Description |
---|---|
overwrite | Overwrites the file if it already exists. |
append | Appends the data to the end of the file if it already exists. If the file does not exist, it creates a new file and writes the data to it. |
errorIfExists | Throws an error if the file already exists. This is the default behavior if no write mode is specified. |
ignore | If the file already exists, do nothing. If the file does not exist, create a new file and write the data to it. This mode can be useful if you want to avoid overwriting data. |
To specify the write mode when writing a CSV file with PySpark, you can use the mode argument of the write.csv method.
df.write.csv('write/sales.csv', mode='overwrite')
This will write the data from the DataFrame to write/sales.csv, overwriting it if it already exists. Note that Spark writes a directory at that path containing one or more part files, rather than a single CSV file.
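The same write can also be expressed with the mode() method on the writer, which reads a little more fluently when chaining calls; both forms should behave identically:

# Equivalent to passing mode='overwrite' to write.csv()
df.write.mode("overwrite").csv("write/sales.csv")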
PySpark write arguments
Argument | Description |
---|---|
path | The file path to write the data to. |
mode | The write mode to use when writing the data. This can be one of the following: overwrite , append , errorIfExists , or ignore . See the previous table for a description of each mode. |
header | A boolean indicating whether to write the column names as the first row of the CSV file (default is False ). |
sep | The character used to separate fields in the CSV file (default is ',' ). |
quote | The character used to quote fields in the CSV file (default is '"' ). |
escape | The character used to escape quotes inside quoted fields (default is '\' , the backslash character). |
quoteAll | A boolean indicating whether to quote all fields in the CSV file, regardless of whether they contain special characters (default is False ). |
escapeQuotes | A boolean indicating whether values containing quote characters should always be enclosed in quotes (default is True ). |
nullValue | The string representation of a null value (default is the empty string). |
compression | The compression codec to use when writing the data. This can be one of the following: none, bzip2, gzip, lz4, snappy or deflate. |
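As a rough sketch of how several of these write arguments combine (the output path is just an example):

# Write a header row, pipe-separated fields and gzip-compressed output,
# overwriting any previous output at this (assumed) path
df.write.csv(
    "write/sales_export",
    mode="overwrite",
    header=True,
    sep="|",
    compression="gzip",
)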
Quote and Escape Characters
The quote character is used to enclose fields in a CSV file that contain characters that would otherwise cause the file to be parsed incorrectly. For example, if a field contains a , character, it will be interpreted as a field separator unless the field is quoted. Similarly, if a field contains a " character, it will be interpreted as the end of the field unless it is escaped.
The escape character is used to escape quotes inside quoted fields. For example, if a field is quoted with " characters and the field value contains a " character, it can be escaped with the escape character so that it is not interpreted as the end of the field.
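A small round-trip makes this concrete. The single-row DataFrame below is made up purely for illustration:

# A made-up value containing both a comma and a double quote
demo = spark.createDataFrame([('He said "hello", then left',)], ["comment"])

# Quote fields with " and escape embedded quotes with \
demo.write.csv("write/quotes_demo", mode="overwrite", header=True, quote='"', escape='\\')

# Read it back using the same quote and escape characters
spark.read.csv("write/quotes_demo", header=True, quote='"', escape='\\').show(truncate=False)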
Multiple line CSV files
Multiline CSV files are files where one record can span multiple lines.
As a best practice, do not generate CSV files with multiline records from your data source. Consider creating additional columns, or structuring the data inside the column as JSON or key-value pairs.
A field can span multiple lines if the field value is quoted and the line breaks are included within the quotes.
Reading Multiline CSV files
To read a CSV file with multiline fields using PySpark, set the multiLine option to True, and use the quote and escape arguments of the spark.read.csv method to specify the characters used to quote and escape fields in the file.
For example, to read a CSV file with " as the quote character and \ as the escape character, you could use the following. A file would look like this
"col 1","col 2","col 3"
"value 1","value 2","value
3"
"value 4","value 5","value
6"
"value 7","value 8","value
9"
df = spark.read.csv('/path/to/file.csv', quote='"', escape='\\', multiLine=True)
This will read in the CSV file and correctly parse the multiline fields.
It is important that multiLine is enabled and that the quote and escape characters match the file. If they are wrong, records may be split or merged incorrectly and you may encounter parsing errors.
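To try this locally, you could write the sample shown above to a file and read it back with multiLine enabled; the file name here is just an example:

# Write the sample multiline CSV shown above to a local file
sample = '"col 1","col 2","col 3"\n"value 1","value 2","value\n3"\n'
with open("multiline_demo.csv", "w") as f:
    f.write(sample)

# multiLine=True tells Spark that quoted fields may contain line breaks
df = spark.read.csv("multiline_demo.csv", header=True, quote='"', escape='\\', multiLine=True)
df.show(truncate=False)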
Conclusion
Hopefully you are now fully equipped to read and write all types of CSV files using PySpark.
In the next post, you will learn how to rename columns of a DataFrame with PySpark.