Introduction to Apache PySpark

PySpark is the Python API for Apache Spark, an open-source, distributed computing framework designed for large-scale data processing. It allows you to leverage the power of Spark from the familiar and approachable Python environment. It provides Python bindings that allow developers to create, test, and deploy applications using the Spark API.

Summary of Apache Spark

  • Definition: Apache Spark is an open-source analytical processing engine for large-scale distributed data processing and machine learning applications. Apache Spark 3.5 is a framework supported in Scala, Python, R, and Java. Below are the different language interfaces for Spark.
    • Spark – default interface for Scala and Java
    • PySpark – Python interface for Spark
    • SparklyR – R interface for Spark
  • Features of Apache Spark:
    • In-memory computation
    • Distributed processing using parallelize
    • Can be used with many cluster managers (Spark Standalone, YARN, Mesos, Kubernetes, etc.)
    • Fault-tolerant
    • Immutable
    • Lazy evaluation
    • Cache & persistence
    • Built-in optimization when using DataFrames
    • Supports ANSI SQL
  • Advantages of Apache Spark:
    • Spark is a general-purpose, in-memory, fault-tolerant, distributed processing engine that allows you to process data efficiently in a distributed fashion.
    • Applications running on Spark can be up to 100x faster than traditional Hadoop MapReduce systems for in-memory workloads.
    • Spark is well suited for building data ingestion pipelines.
    • Using Spark we can process data from Hadoop HDFS, AWS S3, Databricks DBFS, Azure Blob Storage, and many other file systems.
    • Spark can also process real-time data using Spark Streaming and Kafka.
    • With Spark Streaming you can also stream files from a file system, or stream data from a socket.
    • Spark natively has machine learning and graph libraries.
    • Provides connectors to store the data in NoSQL databases like MongoDB.
  • Apache Spark Architecture: Spark works in a master-slave architecture where the master is called the “Driver” and the slaves are called “Workers”. When you run a Spark application, the Spark Driver creates a context that is the entry point to your application; all operations (transformations and actions) are executed on worker nodes, and the resources are managed by the Cluster Manager.
  • Cluster Manager Types: Standalone, Apache Mesos, Hadoop YARN, Kubernetes, local (set via master() to run Spark on a local machine).
  • Spark Modules: Spark Core, Spark SQL, Spark Streaming, Spark MLlib, Spark GraphX
  • Spark Core: Spark Core is the base library of Spark; it provides the abstractions for distributed task dispatching, scheduling, and basic I/O functionality.
  • SparkSession: It is the entry point to underlying Spark functionality for programmatically working with Spark RDDs, DataFrames, and Datasets. Its object spark is available by default in spark-shell. The initial step in a Spark program involving RDDs, DataFrames, or Datasets is to create a SparkSession instance, which is done using SparkSession.builder().
    
                    from pyspark.sql import SparkSession
                    # Create a SparkSession object
                    spark = SparkSession.builder \
                        .appName("YourAppName") \
                        .master("local[*]") \
                        .getOrCreate()
                
                    spark  # display the SparkSession object
                        
    If you're using SparkSession, which is the entry point to Spark SQL, you can work with DataFrames and Datasets, which provide higher-level abstractions and optimizations compared to RDDs. Here are some common tasks you can perform with SparkSession:
    • Read and write data: SparkSession provides methods to read data from various sources such as Parquet, JSON, CSV, JDBC, Avro, ORC, and many more. Similarly, you can write data to different formats and locations. For example: df = spark.read.csv("file.csv") and df.write.parquet("output.parquet").
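      A slightly fuller sketch of the same idea (the file paths and options below are placeholders, not the author's data):

          # Read a CSV file with a header row, letting Spark infer the column types
          df = spark.read.format("csv") \
              .option("header", "true") \
              .option("inferSchema", "true") \
              .load("file.csv")

          # Write the DataFrame back out as Parquet, overwriting any previous output
          df.write.mode("overwrite").parquet("output.parquet")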
    • Create DataFrames: You can create DataFrames from existing RDDs, lists, dictionaries, or by applying transformations on other DataFrames. For example:
      
                                  from pyspark.sql import Row
      
                                  data = [(1, "Alice"), (2, "Bob")]
                                  sc = spark.sparkContext   # SparkContext from the existing SparkSession
                                  rdd = sc.parallelize(data)
                                  row_rdd = rdd.map(lambda x: Row(id=x[0], name=x[1]))
                                  df = spark.createDataFrame(row_rdd)
                              
    • SQL queries: SparkSession allows you to run SQL queries on DataFrames using the sql() method. For example:
      
                              df.createOrReplaceTempView("people")
                              result = spark.sql("SELECT * FROM people WHERE age > 20")
                          
    • DataFrame operations: You can perform various operations on DataFrames like select(), filter(), join(), orderBy(), agg(), etc. For example:
      result = df.filter(df.age > 20).select("name").orderBy("name")
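      A slightly fuller, hedged sketch of chaining such operations (the column names "age" and "department" are assumed for illustration):

          from pyspark.sql import functions as F

          # Keep rows with age > 20, compute the average age per department, and sort by it
          result = (df.filter(F.col("age") > 20)
                      .groupBy("department")
                      .agg(F.avg("age").alias("avg_age"))
                      .orderBy("avg_age"))
          result.show()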
    • Window functions: You can use window functions for advanced analytics tasks like ranking, lead/lag analysis, etc. For example:
      
                          from pyspark.sql.window import Window
                          from pyspark.sql.functions import rank
                          
                          window = Window.partitionBy("department").orderBy("salary")
                          result = df.withColumn("rank", rank().over(window))
                          
    • Machine learning: We can use Spark MLlib, a scalable machine learning library, to train and apply machine learning models on DataFrames. For example:
      
                                  from pyspark.ml.classification import LogisticRegression
      
                                  lr = LogisticRegression(featuresCol="features", labelCol="label")
                                  model = lr.fit(train_data)
                                  predictions = model.transform(test_data)
                              
    • Streaming data: We can process streaming data using Spark Structured Streaming, which provides high-level APIs for stream processing. For example:
      stream_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:port").load()
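      A minimal sketch of consuming that stream (the Kafka address is a placeholder; writing to the console is just one possible sink):

          # Decode the Kafka value bytes and print each micro-batch to the console
          query = (stream_df.selectExpr("CAST(value AS STRING) AS message")
                            .writeStream
                            .format("console")
                            .outputMode("append")
                            .start())
          query.awaitTermination()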
  • Spark Context: It is also an entry point to Spark and PySpark. Before Spark 2.0, SparkContext was the main entry point; creating a SparkContext was the first step to program with RDDs and to connect to a Spark cluster. Its object sc is available by default in spark-shell. Since Spark 2.0, when we create a SparkSession, a SparkContext object is created by default and can be accessed using spark.sparkContext.
    
                    from pyspark.sql import SparkSession
    
                    # Create a SparkSession
                    spark = SparkSession.builder \
                        .appName("YourAppName") \
                        .master("local[*]") \
                        .getOrCreate()
                    
                    # Access the SparkContext from the SparkSession
                    sc = spark.sparkContext
                    
    Once the SparkContext is created, we can do the following things:
    • Parallelize data: We can parallelize an existing collection in the driver program (e.g., a list or array) using the parallelize() method. This distributes the data across the nodes in the Spark cluster and creates an RDD. For example: rdd = sc.parallelize(data)
    • Read data from external sources: We can read data from various external sources such as HDFS, S3, HBase, or any supported data source using the methods provided by SparkContext, such as textFile() for reading text files. For example: text_rdd = sc.textFile("hdfs://path/to/file.txt")
    • Transformations: We can perform various transformations on RDDs, such as map(), filter(), flatMap(), reduceByKey(), etc., to process and manipulate the data. For example: squared_rdd = rdd.map(lambda x: x * x)
    • Caching: We can cache RDDs in memory to speed up iterative or interactive computations by using the cache() method. For example: rdd.cache()
    • Accumulators: We can use accumulators to aggregate information across all tasks, such as counting occurrences of certain events. For example: accumulator = sc.accumulator(0)
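      A small illustrative sketch of an accumulator (the data and the even-number count are made up for this example):

          # Count even numbers across all partitions with an accumulator
          even_count = sc.accumulator(0)
          rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
          rdd.foreach(lambda x: even_count.add(1) if x % 2 == 0 else None)
          print(even_count.value)  # 3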
  • Difference between SparkContext and SparkSession
    1. SparkContext (sc):
      • SparkContext is the entry point to Spark functionality and represents the connection to a Spark cluster. It coordinates the execution of operations on the cluster.
      • It provides access to the underlying Spark functionality, including RDDs (Resilient Distributed Datasets), which are the fundamental data abstraction in Spark.
      • SparkContext is primarily used for low-level operations and interactions with RDDs, such as creating RDDs, performing transformations, and executing actions.
    2. SparkSession (spark):
      • SparkSession, introduced in Spark 2.x, is a higher-level abstraction on top of SparkContext and provides a unified entry point to Spark SQL, DataFrame, and Dataset APIs.
      • It is the recommended way to interact with Spark in modern Spark applications, as it provides a more user-friendly and consistent interface for working with structured data.
      • SparkSession encapsulates SparkContext and provides additional functionalities, including reading and writing structured data from various sources, running SQL queries, and performing DataFrame operations.
      • SparkSession also manages the underlying SparkContext internally, so there's no need to create a SparkContext explicitly when using SparkSession.
    In summary, while SparkContext is primarily focused on low-level distributed computing operations with RDDs, SparkSession provides a higher-level interface for working with structured data, including DataFrames and Datasets, as well as integration with Spark SQL, MLlib, and other Spark components. It is generally recommended to use SparkSession for modern Spark applications unless you have specific requirements that necessitate working directly with RDDs and SparkContext. A short sketch illustrating this difference is given right below.
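    A minimal sketch of this difference (assuming a local session; the data is made up): the same application uses spark for DataFrames and its sparkContext for RDDs.

        from pyspark.sql import SparkSession

        spark = SparkSession.builder \
            .appName("ContextVsSession") \
            .master("local[*]") \
            .getOrCreate()
        sc = spark.sparkContext

        # Low-level RDD work through the SparkContext
        rdd = sc.parallelize([1, 2, 3, 4])
        print(rdd.reduce(lambda x, y: x + y))  # 10

        # Higher-level structured work through the SparkSession
        df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
        df.show()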

PySpark methods

Commonly used PySpark methods, grouped by functionality:
SparkSession:
  • SparkSession.builder.appName(): Set the application name.
  • SparkSession.builder.getOrCreate(): Create a SparkSession or get an existing one.
Reading and Writing Data:
  • spark.read.format().option().load(): Read data from various sources.
  • DataFrame.write.format().mode().save(): Write data to various storage systems.
DataFrame Operations:
  • DataFrame.show(): Display the contents of a DataFrame.
  • DataFrame.select(): Select specific columns.
  • DataFrame.filter(): Filter rows based on conditions.
  • DataFrame.groupBy(): Group data based on one or more columns.
  • DataFrame.join(): Perform join operations between DataFrames.
Transformations:
  • DataFrame.withColumn(): Add or replace columns.
  • DataFrame.drop(): Drop specified columns.
  • DataFrame.na.fill(): Replace null values with specified values.
Actions:
  • DataFrame.count(): Count the number of rows.
  • DataFrame.collect(): Retrieve all data from the DataFrame.
  • DataFrame.take(): Retrieve a specified number of rows.
SQL Queries:
  • DataFrame.createOrReplaceTempView(): Register DataFrame as a temporary table.
  • spark.sql(): Execute SQL queries on DataFrames.
Data Aggregation and Summary:
  • DataFrame.describe(): Generate descriptive statistics (count, mean, stddev, min, max).
  • DataFrame.approxQuantile(): Approximate quantiles of numerical columns.
Data Cleaning and Handling:
  • DataFrame.dropDuplicates(): Remove duplicate rows.
  • DataFrame.fillna(): Replace null values with specified values.
  • DataFrame.dropna(): Remove rows with null values.
  • DataFrame.replace(): Replace specified values.
Window Functions:
  • pyspark.sql.functions.row_number(): Assigns a unique number to each row within a partition.
  • pyspark.sql.functions.rank(): Computes the rank of rows within a partition.
  • pyspark.sql.functions.lead() and pyspark.sql.functions.lag(): Access data from subsequent or preceding rows.
Caching:
  • DataFrame.cache(): Persist the DataFrame in memory for faster access.
Machine Learning:
  • PySpark includes various MLlib functions for machine learning tasks.
  • pyspark.ml.Pipeline(): Define a machine learning pipeline.
  • PySpark MLlib provides various algorithms for classification, regression, clustering, and collaborative filtering.
  • Methods like pyspark.ml.classification.LogisticRegression(), pyspark.ml.regression.LinearRegression(), etc.
Statistical Functions:
  • pyspark.sql.functions.corr(): Compute the correlation between two columns.
  • pyspark.sql.functions.covar_pop() and pyspark.sql.functions.covar_samp(): Compute population and sample covariance.
Time Series and Date Functions:
  • pyspark.sql.functions.year(), pyspark.sql.functions.month(), etc.: Extract components from a date.
  • pyspark.sql.functions.datediff(): Calculate the difference between two dates.
Graph Processing (GraphFrames):
  • GraphFrames is an extension of DataFrames for graph processing.
  • Methods like g.vertices, g.edges, g.bfs(), etc., for graph analysis.
User-Defined Functions (UDFs):
  • pyspark.sql.functions.udf(): Define a User-Defined Function.
  • pyspark.sql.functions.when(): Conditional expressions on DataFrame columns (a combined sketch of udf() and when() follows this list).
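As a small combined illustration of the UDF and when() entries above (the DataFrame, column names, and threshold are hypothetical, and an existing SparkSession spark is assumed):

    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    # Hypothetical DataFrame with "name" and "age" columns
    df = spark.createDataFrame([("Alice", 34), ("Bob", 17)], ["name", "age"])

    # User-Defined Function that upper-cases a string column
    to_upper = F.udf(lambda s: s.upper() if s else None, StringType())

    result = (df.withColumn("name_upper", to_upper("name"))
                .withColumn("is_adult", F.when(F.col("age") >= 18, True).otherwise(False)))
    result.show()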

Codes on GitHub

Details on how PySpark can be used for data analytics on a large dataset are available in my GitHub repository. In that repository, I have analyzed some example datasets using PySpark DataFrame and RDD methods.

Resilient Distributed Datasets (RDDs)

The RDD is the elementary data structure of Spark. It is flexible and performs well for line-by-line (linear) operations. However, this structure has limited performance when it comes to non-linear operations.
SparkContext:
SparkContext is a crucial entry point for any Spark functionality in Apache Spark applications. It coordinates the execution of Spark jobs and manages the resources allocated for these jobs. In Spark versions before 2.0, it was the main entry point for Spark, but in later versions, SparkSession is typically used, which internally creates a SparkContext object.
  • Definition: SparkContext is the main entry point for Spark functionality in Spark applications.
  • Purpose: It coordinates the execution of Spark jobs and manages the resources allocated for these jobs.
  • Usage: In older versions of Spark, it was directly created by developers. In newer versions, it's often implicitly created as part of a SparkSession.
To start a SparkContext, we typically create an instance of it in the Spark application code. However, in newer versions of Spark (2.0 and above), it's recommended to use SparkSession instead, which internally manages the SparkContext.
  • Using SparkContext:
    
    from pyspark import SparkContext
    
    # Create a SparkContext object
    sc = SparkContext("local", "MyApp")
                        
    In this example:
    • "local" specifies that Spark should run in local mode using a single thread.
    • "MyApp" is a name for your Spark application.
  • Using SparkSession: For newer versions of Spark, you typically use SparkSession, which internally creates a SparkContext. Here's how you do it:
    
    from pyspark.sql import SparkSession
    
    # Create a SparkSession object
    spark = SparkSession \
        .builder \
        .master("local") \
        .appName("MyApp") \
        .getOrCreate()
    spark
                        
    In this case:
    • "MyApp" is also the name for your Spark application.
    • getOrCreate() method ensures that if a SparkSession already exists, it returns that session; otherwise, it creates a new one.
    The SparkContext can also be accessed through a SparkSession:
    
    # Creating SparkContext through SparkSession
    from pyspark.sql import SparkSession
    
     # Create a SparkSession object
    spark = SparkSession.builder \
        .appName("MyApp") \
        .getOrCreate()
    
     # Access the SparkContext from the SparkSession
     sc = spark.sparkContext
                        
Example:
The textFile method of SparkContext allows us to load a text or CSV file into an RDD.
rdd = sc.textFile("path_to_file.ext")
To display the top 5 elements of the RDD, we can use the take(5) function:
rdd.take(5)
To count the number of lines in an RDD, count() can be used:
count = rdd.count()
Map & Reduce: Map and Reduce are fundamental operations in distributed computing, popularized by frameworks like Apache Hadoop and Apache Spark. They originated from functional programming concepts and are widely used for processing large-scale datasets efficiently. Here's an explanation of each:
  1. Map: The map operation applies a function to each element in a dataset and produces a new dataset of the same size. It transforms each element into another element based on the function provided. The map operation is typically used for tasks like data cleaning, transformation, or feature extraction.
    
    # Applying a map operation to double each element in a list
    numbers = [1, 2, 3, 4, 5]
    doubled_numbers = list(map(lambda x: x * 2, numbers))
    print(doubled_numbers)
                    
    Output: [2, 4, 6, 8, 10]
  2. Reduce: The reduce operation combines elements of a dataset pairwise using a given function until a single result is obtained. It repeatedly applies the function to pairs of elements until only one element remains. The reduce operation is commonly used for tasks like aggregation, summarization, or finding totals.
    
    from functools import reduce
    
    # Applying a reduce operation to find the sum of elements in a list
    numbers = [1, 2, 3, 4, 5]
    total_sum = reduce(lambda x, y: x + y, numbers)
    print(total_sum)
                    
    Output: 15
     The RDD works very well for "line-by-line" operations, which is why the map and reduce methods are extremely effective for these calculations. Applying the map method and then the reduceByKey method in succession allows us to cleverly summarize our data.
    
     rdd.map(lambda x: (x[7], 1)).reduceByKey(lambda x, y: x + y)
                    
     Here, map emits a key-value pair (x[7], 1) for each record, and reduceByKey combines the pairs that share the same key by applying the lambda function to their values: for each pair of values (x, y) it returns x + y, which yields a count per key.
The map operation is applied in parallel to different partitions of the dataset, and then the results are shuffled and aggregated using the reduce operation to produce the final output.
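A minimal end-to-end sketch of this pattern in PySpark (the word list is made up, and an existing SparkContext sc is assumed):

    # Map each word to a (word, 1) pair, then sum the counts per key
    words = sc.parallelize(["spark", "hadoop", "spark", "python", "spark"])
    counts = (words.map(lambda w: (w, 1))
                   .reduceByKey(lambda x, y: x + y))
    print(counts.collect())  # e.g. [('spark', 3), ('hadoop', 1), ('python', 1)]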

DataFrame for structured data: The RDD structure is not optimized for columnar tasks or machine learning. The DataFrame structure was created to meet this need. It is built on top of RDDs but organizes data into rows and columns with a SQL-like structure, in a form inspired by the DataFrame of the Python pandas module.

Advantages: The DataFrame structure has two main advantages. First of all, this structure is similar to the pandas DataFrame and is therefore easy to learn. It is also efficient: a DataFrame in PySpark is as fast as a DataFrame in Scala, and it is the most optimized distributed structure for machine learning.

Let's take an example to understand the basics of this (a small DataFrame sketch also follows the list below):
  • Let's first build the spark session:
    
    # Importing Spark Session and SparkContext
    from pyspark.sql import SparkSession
    from pyspark import SparkContext
    
    # Definition of a SparkContext
    sc = SparkContext.getOrCreate()  # reuse an existing SparkContext or create a new one
    
    # Definition of a SparkSession
    spark = SparkSession \
        .builder \
        .master("local") \
        .appName("Introduction to DataFrame") \
        .getOrCreate()
        
    spark
                            
  • For more details you can always go to my GitHub repo: Pyspark repo.
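As a small illustrative sketch (the names and ages below are made up), once the session exists we can create a DataFrame and query it much like a pandas DataFrame:

    # Create a DataFrame from a list of tuples with explicit column names
    data = [("Alice", 34), ("Bob", 45), ("Cathy", 29)]
    df = spark.createDataFrame(data, ["name", "age"])

    df.printSchema()                # inspect the inferred schema
    df.show()                       # display the rows
    df.filter(df.age > 30).show()   # pandas-like filtering, executed by Spark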

References

  1. Official Documentation
  2. Databricks Learning Academy
  3. Spark by Examples
  4. DataCamp tutorial.
  5. For Databricks, you can look at the tutorial videos on YouTube by Bryan Cafferky, author of the book "Master Azure Databricks". A great playlist for someone who just wants to learn about big data analytics on the Azure Databricks cloud platform.
  6. See the PySpark basics video by Krish Naik. A great video for starters.
  7. A great YouTube video on how Apache Spark works on-premises.

Some other interesting things to know: