PySpark - An Introduction to Apache Spark
PySpark is the Python API for Apache Spark. It enables us to perform real-time, large-scale data processing in a distributed environment using Python. It also provides a PySpark shell for interactively analyzing your data.
Exercise: In this exercise, we will learn about various commands of PySpark.
Note: In this exercise, we are using the data source data.csv. You can download the data source and use it for the transformations.
First, we must install PySpark. To install PySpark using pip, use the following command:
pip install pyspark
In PySpark, the pyspark.sql.functions module is a collection of built-in functions that are essential for data manipulation and transformation when working with Spark DataFrames. By importing these functions, you can perform a wide range of operations on your data, such as filtering, aggregating, modifying, and creating new columns.
Use the following command to import the module.
from pyspark.sql.functions import *
The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder:
PySpark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
OR
PySpark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("App Name") \
    .getOrCreate()
Create a Dataframe
In general, a DataFrame can be defined as a tabular data structure.
Features of dataframe
- DataFrames are distributed in nature, which makes them a fault-tolerant and highly available data structure.
- DataFrames use lazy evaluation, an evaluation strategy that holds the evaluation of an expression until its value is needed.
- DataFrames are immutable in nature, which means their state cannot be modified after they are created.
To create a dataframe, use the following PySpark syntax:
PySpark
spark.createDataFrame(data, schema)
Example: The following shows an example of how to create a dataframe:
PySpark
data = [
    (15779, "small_business", 1.204, "high"),
    (87675, "large_business", 0.167, "low")
]
columns = ["Salary", "Business Type", "Score", "Standard"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Show DataFrame
df.show()
To read a DataFrame from CSV file use the following command:
PySpark
df = spark.read.csv("path/to/csvfile.csv", header=True, inferSchema=True)
Example: To read the file data.csv, we can use the following PySpark code.
PySpark
df = spark.read.csv("data.csv", header=True, inferSchema=True)
To see the dataframe, we can use the dataframe.show() command.
To get the dataframe schema, just type the dataframe name in the PySpark shell, or use dataframe.printSchema() for a detailed view.
Replace the value in the Dataframe
The dataframe.replace(oldvalue, newvalue, ["Columnname1", "Columnname2"]) is used to replace values in the dataframe.
In the above syntax, specifying the column names is optional; if they are not specified, the value is replaced in every column.
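For example, a minimal sketch that replaces one value in the Standard column from the sample data (the old and new values here are illustrative, not from the exercise):
PySpark
# Replace "high" with "premium" only in the "Standard" column (illustrative values)
df.replace("high", "premium", ["Standard"]).show()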
Create or update a column in Dataframe
The df.withColumn("Newcolumnname", ColumnExpression) is used to create a new column in the dataframe, or to replace an existing column that has the same name.
Here, we are creating a new column named “New Salary”.
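For example, a minimal sketch that derives the new column from the existing Salary column (the 10% increase is an assumed, illustrative expression):
PySpark
# Create "New Salary" from the existing "Salary" column (illustrative expression)
newdf = df.withColumn("New Salary", df["Salary"] * 1.1)
newdf.show()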
Using complex expressions: We can use other functions and transformations within withColumn, such as string operations, aggregations, or conditional logic.
PySpark
# Add a new column with conditional logic
newdf = df.withColumn("New Salary", when(df["Salary"] > 25000, "High").otherwise("Low"))
newdf.show()
Rename Dataframe Column
The df.withColumnRenamed("OldColumnName", "newColumnName") is used to rename an existing column in the dataframe.
PySpark
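# Rename the "New Salary" column created above; the new name "Salary Level" is
# chosen here only for illustration
newdf = newdf.withColumnRenamed("New Salary", "Salary Level")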
newdf.show() # Display the DataFrame with the renamed column
Filter Dataframe
It is used to filter rows using the given condition. The where() is an alias for filter(). We can use the following syntax:
Syntax 1: dataframe.filter(conditions)
Syntax 2: dataframe.where(conditions)
Here, we filter the rows that have a Salary column value greater than 25000.
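A minimal sketch, assuming the Salary column used throughout this exercise:
PySpark
# Keep only rows where Salary is greater than 25000
df.filter(df["Salary"] > 25000).show()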
Here, we can also specify multiple conditions in the filter function, as sketched below.
PySpark
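# Sketch (thresholds assumed for illustration): combine conditions with & (and)
# or | (or), keeping each condition inside its own parentheses
df.filter((df["Salary"] > 25000) & (df["Salary"] < 100000)).show()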
Note: Always put each condition in parentheses ().
Drop Duplicates from dataframe
It is used to return a new DataFrame with duplicate rows removed, optionally only considering certain columns.
Syntax 1: dataframe.dropDuplicates()
In this case, duplicates are checked across all columns; a row is considered a duplicate only when every value matches.
Syntax 2: dataframe.dropDuplicates(["Column1", "Column2"])
In this case, only the specified column(s) are used as the duplicate criteria.
Here, we can see that no row is deleted.
Here, we can see that when we also specify the column name(s) to check for duplicates, one row is deleted.
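A minimal sketch of both forms, assuming the Business Type column from the sample data:
PySpark
# Drop rows that are duplicates across every column
df.dropDuplicates().show()
# Drop rows that share the same "Business Type" value
df.dropDuplicates(["Business Type"]).show()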
Select specific columns from dataframe
It is used to select specific columns from the dataframe and returns a new, transformed dataframe.
Syntax: df.select("column1", "column2").show()
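For example, assuming the Salary and Score columns from the sample data:
PySpark
# Select only the "Salary" and "Score" columns
df.select("Salary", "Score").show()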
Drop a Column from the dataframe
It is used to drop specific columns from the dataframe and returns a new, transformed dataframe.
Syntax: df = df.drop("column_name")
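For example, dropping the Standard column (column name assumed from the sample data):
PySpark
# Drop the "Standard" column; the returned DataFrame no longer contains it
df.drop("Standard").show()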
Drop Nulls
It is used to remove the nulls from the dataframe. To remove the nulls, we can use dataframe.dropna() syntax. It returns a new DataFrame after omitting rows with null values.
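For example:
PySpark
# Return a new DataFrame with rows containing null values removed
clean_df = df.dropna()
clean_df.show()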
First Row of DataFrame
To return the first row of the dataframe, use the syntax dataframe.first(); it returns the first row if the dataframe is not empty, otherwise None.
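For example:
PySpark
# Returns a Row object (or None if the DataFrame is empty)
first_row = df.first()
print(first_row)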
Count Rows in DataFrame
The dataframe.count() is used to count the number of rows in the dataframe.
Syntax: df.count()
Return DataFrame Columns
The dataframe.columns is used to return the column names in the dataframe.
GroupBy and agg() function in DataFrame
The groupBy() function groups the DataFrame using the specified columns, so we can run aggregation on them. The groupby() is an alias for groupBy().
Syntax: DataFrame.groupBy(columnNames)
If we do not specify a column name in groupBy(), it triggers a global aggregation.
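For example, a minimal sketch of a global aggregation (assuming the Salary column from earlier):
PySpark
# No grouping columns: the aggregate is computed over the entire DataFrame
df.groupBy().agg(sum("Salary")).show()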
Example:
PySpark
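# Sketch (column names assumed): group by "Business Type" and total the "Salary" column
groupdf = df.groupBy("Business Type").agg(sum("Salary"))
groupdf.show()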
Here, we can use alias() with the agg() function. The alias() function is used to name the resulting column.
PySpark
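# Sketch (column names assumed): the same aggregation, using alias() to name the
# result column "Total Salary", which the sort examples below rely on
groupdf = df.groupBy("Business Type").agg(sum("Salary").alias("Total Salary"))
groupdf.show()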
In PySpark, the agg() function is used to perform aggregate operations on DataFrame columns. It allows us to compute multiple aggregates at once, such as sum(), avg(), count(), min(), and max(). We can use any combination of aggregate functions inside agg().
Sort the Dataframe Columns
Syntax: DataFrame.sort(Column, ascending=SortOrder)
Here, Column specifies the name of the column on which we want to sort, and it can be a string or a list. SortOrder specifies the sort order (ascending or descending) via the boolean ascending parameter: True for ascending order and False for descending order. By default, its value is True.
Example: Sort the dataframe by Ascending order.
PySpark
groupdf.sort("Total Salary").show()
Example: Sort the dataframe in descending order.
PySpark
groupdf.sort("Total Salary", ascending=False).show()
Use coalesce() function
The coalesce() function in PySpark is used to reduce the number of partitions in a DataFrame. It's a key function for optimizing the performance and resource utilization of your distributed data processing.
What is Partitioning? When we create a DataFrame or read data into Spark, Spark automatically splits the data into partitions. These partitions are distributed across multiple nodes (in a cluster) or cores (in a local machine) for parallel processing. The number of partitions determines how much parallelism Spark can use.
However, in certain scenarios, having too many partitions can cause inefficiency because each partition introduces some overhead. This is where coalesce() comes in.
Example: Let's assume that after some transformations, our DataFrame data has 10 partitions, but we want to write it into a single CSV file. Spark will write each partition as a separate file by default, but we can use coalesce(1) to combine all the data into one partition, so only one file will be written.
PySpark
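# Reduce to a single partition and write one CSV file; output_path below is a
# placeholder for your own location
output_path = "path/to/output"
df.coalesce(1).write.csv(output_path, header=True, mode='overwrite')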
Explanation:
- coalesce(1): This reduces the number of partitions to 1, meaning all the data will be written into a single file (which can be useful when you want only one output file, such as for CSV).
- .write.csv(): This writes the data to the specified output_path.
- header=True: Includes the column names as the header in the output CSV file.
- mode='overwrite': This overwrites any existing files at the specified output path.
Write the DataFrame to CSV
The below syntax is used in Apache Spark to save the contents of a DataFrame (df) as a CSV file.
- df.write.csv("path/to/save/csvfile.csv", header=True)
If the file already exists in the directory, Spark will throw an error unless we specify a mode like mode("overwrite") to overwrite existing files.
- df.write.mode("overwrite").csv("path/to/save/csvfile.csv", header=True)
- header=True: This argument indicates that the first row of the CSV file should contain the column names from the DataFrame. This is important for preserving the schema when the data is read back or used elsewhere.
Example: To write the dataframe as the file “csvfile.csv”.
PySpark
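# Write newdf to "csvfile.csv" with a header row; this fails if the path
# already exists unless a write mode is set (see the overwrite form below)
newdf.write.csv("csvfile.csv", header=True)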
OR
newdf.write.mode("overwrite").csv("csvfile.csv", header=True)  # Set the write mode to 'overwrite'
Stop Spark Session
To stop the Spark session, use the syntax spark.stop(); it releases the resources used by the Spark application. It is a good practice to stop the session when all operations are complete.