PySpark - An Introduction to Apache Spark

PySpark is the Python API for Apache Spark. It enables us to perform real-time, large-scale data processing in a distributed environment using Python. It also provides a PySpark shell for interactively analyzing your data.

Exercise: In this exercise, we will learn about various PySpark commands.

Note: In this exercise, we are using the data source data.csv. You can download the data source and use it for the transformations.

First, we must install PySpark. To install PySpark using pip, run the following command:
pip install pyspark

In PySpark, the pyspark.sql.functions module is a collection of built-in functions that are essential for data manipulation and transformation when working with Spark DataFrames. By importing these functions, you can perform a wide range of operations on your data, such as filtering, aggregating, modifying, and creating new columns.

Use the following command to import the module.
from pyspark.sql.functions import *

The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder:

PySpark

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()   

OR

PySpark

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("App Name") \
    .getOrCreate()  

Create a Dataframe

In general, a DataFrame can be defined as a tabular data structure, organized into named columns, similar to a table in a relational database.

Features of a DataFrame

A DataFrame is distributed across the nodes of the cluster, immutable (transformations produce a new DataFrame rather than modifying the existing one), lazily evaluated (transformations run only when an action such as show() or count() is called), and schema-aware (it carries the column names and data types).

To create a DataFrame, use the following PySpark syntax:

PySpark

spark.createDataFrame(data, schema)  

Example: The following shows how to create a DataFrame:

PySpark

data = [
    (15779, "small_business", 1.204, "high"),
    (87675, "large_business", 0.167, "low")
]

columns = ["Salary", "Business Type", "Score", "Standard"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Show DataFrame
df.show()   

To read a DataFrame from a CSV file, use the following command:

PySpark

df = spark.read.csv("path/to/csvfile.csv", header=True, inferSchema=True)  

Example: To read the file data.csv, we can use the following PySpark code.

PySpark

df = spark.read.csv("data.csv", header=True, inferSchema=True)  

To display the DataFrame, we can use the dataframe.show() command.


To get the DataFrame schema, use dataframe.printSchema(); in the PySpark shell, simply typing the DataFrame name also displays its column names and types.

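For example, to inspect the schema of the DataFrame read from data.csv:

PySpark

df.printSchema()   # prints the column names, data types, and nullability as a tree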

Replace a Value in the DataFrame

The dataframe.replace(oldValue, newValue, ["ColumnName1", "ColumnName2"]) method is used to replace values in the DataFrame.

In the above syntax, specifying the column names is optional; if no columns are specified, the value is replaced in every column where it occurs.

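As a minimal sketch, assuming the data read from data.csv contains a Company column (the values "BDO" and "BDO Global" are illustrative assumptions):

PySpark

# Replace the value "BDO" with "BDO Global", but only in the Company column
replaced_df = df.replace("BDO", "BDO Global", ["Company"])
replaced_df.show()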

Create or Update a Column in the DataFrame

The df.withColumn("NewColumnName", columnExpression) method is used to create a new column in the DataFrame, or to replace an existing column that has the same name.

Here, we are creating a new column named “New Salary”.

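As a minimal sketch, the following adds a “New Salary” column computed from the existing Salary column (the 10% increase is an illustrative assumption):

PySpark

# Create "New Salary" as an arithmetic expression over the existing Salary column
newdf = df.withColumn("New Salary", df["Salary"] * 1.10)
newdf.show()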

Using complex expressions: We can use other functions and transformations within withColumn, such as string operations, aggregations, or conditional logic.

PySpark

# Add a new column with conditional logic
newdf = df.withColumn("New Salary", when(df["Salary"] > 25000, "High").otherwise("Low"))
newdf.show() 

Rename Dataframe Column

The df.withColumnRenamed("OldColumnName", "NewColumnName") method is used to rename an existing column in the DataFrame.

PySpark

newdf = newdf.withColumnRenamed("Salary", "Employee Salary") # Rename the column
newdf.show() # Display the DataFrame with the renamed column

Filter Dataframe

It is used to filter rows using the given condition. The where() is an alias for filter(). We can use the following syntax:

Syntax 1: dataframe.filter(condition)

Syntax 2: dataframe.where(condition)

Here, we filter the rows whose Salary column value is greater than 25000.

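A minimal sketch of this single-condition filter:

PySpark

# Keep only the rows where Salary is greater than 25000
df.filter(df["Salary"] > 25000).show()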

We can also specify multiple conditions in the filter function.

PySpark

df.filter((df["Salary"] > 25000) & (df["Company"] == "BDO")).show()

Note: Always wrap each condition in parentheses ().


Drop Duplicates from dataframe

It is used to return a new DataFrame with duplicate rows removed, optionally only considering certain columns.

Syntax 1: dataframe.dropDuplicates(). Here, duplicates are detected by comparing all column values; two rows are considered duplicates only when every value matches.

Syntax 2: dataframe.dropDuplicates(["Column1", "Column2"]). Here, only the specified column(s) are used to determine whether rows are duplicates.
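A sketch of both forms (the column name Company is assumed from the filter examples above):

PySpark

df.dropDuplicates().show()              # compare every column when checking for duplicates
df.dropDuplicates(["Company"]).show()   # compare only the Company column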

Here, we can see that no row is removed when dropDuplicates() is called without arguments, because no two rows are identical across all columns.

When we specify a column name to check for duplicates, one row is removed.

Select specific columns from dataframe

It is used to select specific columns from the DataFrame and returns a new DataFrame containing only those columns.

Syntax df.select("column1", "column2").show()

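For example, assuming the Salary and Company columns used earlier:

PySpark

# Return a new DataFrame containing only the Salary and Company columns
df.select("Salary", "Company").show()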

Drop a Column from the dataframe

It is used to drop specific columns from the DataFrame and returns a new DataFrame without them.

Syntax df = df.drop("column_name")


Drop Nulls

It is used to remove rows containing null values from the DataFrame. To remove them, we can use the dataframe.dropna() syntax, which returns a new DataFrame with the rows containing null values omitted.
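As a sketch, dropna() can also be limited to particular columns through its subset parameter (the Salary column is assumed here):

PySpark

df.dropna().show()                   # drop rows that contain a null in any column
df.dropna(subset=["Salary"]).show()  # drop rows only when the Salary column is null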

First Row of DataFrame

To return the first row of the DataFrame, use the syntax dataframe.first(); it returns the first Row if the DataFrame is not empty, otherwise None.

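For example:

PySpark

first_row = df.first()   # a Row object, or None if the DataFrame is empty
print(first_row)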

Count Rows in DataFrame

The dataframe.count() is used to count the number of rows in the dataframe.

Syntax df.count()


Return DataFrame Columns

The dataframe.columns attribute is used to return a list of the column names in the DataFrame.

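For example:

PySpark

print(df.columns)   # a Python list of the DataFrame's column names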

GroupBy and agg() function in DataFrame

The groupBy() function groups the DataFrame using the specified columns, so we can run aggregation on them. The groupby() is an alias for groupBy().

Syntax DataFrame.groupBy(columnNames)

If we do not specify any column in groupBy(), it triggers a global aggregation over the whole DataFrame, producing a single row of output.
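A sketch of a global aggregation (the Salary column is assumed from the earlier examples):

PySpark

# No grouping column: sum() from pyspark.sql.functions is computed over the whole DataFrame
df.groupBy().agg(sum("Salary").alias("Total Salary")).show()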

Example:


PySpark

df.groupBy(df["Company"]).agg(sum("Salary")).show()

Here, we can use alias() with the agg() function. The alias() function is used to name the resulting column.

PySpark

df.groupBy(df["Company"]).agg(sum("Salary").alias("Total Salary")).show()

In PySpark, the agg() function is used to perform aggregate operations on DataFrame columns. It allows us to compute multiple aggregates at once, such as sum(), avg(), count(), min(), and max(). We can use any combination of aggregate functions inside agg().
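As a sketch, assuming the Company and Salary columns used above, several aggregates can be computed in a single agg() call:

PySpark

from pyspark.sql.functions import sum, avg, count, min, max

df.groupBy("Company").agg(
    sum("Salary").alias("Total Salary"),
    avg("Salary").alias("Average Salary"),
    count("Salary").alias("Employee Count"),
    min("Salary").alias("Min Salary"),
    max("Salary").alias("Max Salary")
).show()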

Sort the Dataframe Columns

Syntax DataFrame.sort(columns, ascending=True)

Here, columns specifies the column name(s) to sort on, given as a string or a list. The ascending parameter specifies the sort order as a boolean: True for ascending and False for descending. By default, it is True.

Example: Sort the dataframe in ascending order.

PySpark

groupdf = df.groupBy(df["Company"]).agg(sum("Salary").alias("Total Salary"))
groupdf.sort("Total Salary").show()

Example: Sort the dataframe in descending order.

PySpark

groupdf = df.groupBy(df["Company"]).agg(sum("Salary").alias("Total Salary"))
groupdf.sort("Total Salary", ascending=False).show()

Use coalesce() function

The coalesce() function in PySpark is used to reduce the number of partitions in a DataFrame. It's a key function for optimizing the performance and resource utilization of your distributed data processing.

What is Partitioning? When we create a DataFrame or read data into Spark, Spark automatically splits the data into partitions. These partitions are distributed across multiple nodes (in a cluster) or cores (in a local machine) for parallel processing. The number of partitions determines how much parallelism Spark can use.

However, in certain scenarios, having too many partitions can cause inefficiency because each partition introduces some overhead. This is where coalesce() comes in.

Example: Let's assume that after some transformations, our DataFrame data has 10 partitions, but we want to write it into a single CSV file. Spark will write each partition as a separate file by default, but we can use coalesce(1) to combine all the data into one partition, so only one file will be written.

PySpark

# output_path is the target directory for the CSV output; coalesce(1) merges the data into a single partition
data.coalesce(1).write.csv(output_path, header=True, mode='overwrite')

Explanation: coalesce(1) reduces the DataFrame to a single partition, so Spark writes a single part file inside the output directory instead of one file per partition. The mode='overwrite' option replaces any existing output at that path.

Write the DataFrame to CSV

The below syntax is used in Apache Spark to save the contents of a DataFrame (df) as a CSV file.

If the file already exists in the directory, Spark will throw an error unless we specify a write mode such as mode("overwrite") to overwrite the existing files.

Example: To write the DataFrame to the path "csvfile.csv" (Spark creates a directory with one or more part files):

PySpark

df.write.csv("csvfile.csv", header=True)
# OR, set the write mode to 'overwrite' to replace existing output:
# df.write.mode("overwrite").csv("csvfile.csv", header=True)

Stop Spark Session

To stop the Spark session, use the syntax spark.stop(); it releases the resources used by the Spark application. It's a good practice to stop the session when all operations are complete.