Introduction

A Spark ML Pipeline is an API provided by Apache Spark for building and deploying end-to-end machine learning workflows. It allows you to chain together multiple stages of machine learning algorithms and data transformations into a single pipeline, making it easier to manage and deploy complex machine learning models.
  • Step-1: Import necessary libraries:
  • 
                                    from pyspark.ml import Pipeline
                                    from pyspark.ml.feature import VectorAssembler, StringIndexer
                                    from pyspark.ml.classification import RandomForestClassifier
                                
  • Step-2: Define stages: Define the stages of the pipeline. Stages can include data preprocessing, feature engineering, and model training.
  • 
                                    # Define feature vector assembler
                                    feature_cols = ["feature1", "feature2", ...]
                                    feature_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
                                    
                                    # Define label indexer (if target variable is categorical)
                                    label_indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
                                    
                                    # Define classifier (e.g., RandomForestClassifier)
                                    classifier = RandomForestClassifier(featuresCol="features", labelCol="indexedLabel", seed=42)
                                
  • Step-3: Create pipeline: Create a pipeline by specifying the stages in sequential order.
  • pipeline = Pipeline(stages=[feature_assembler, label_indexer, classifier])
  • Step-4: Fit pipeline: Fit the pipeline to your training data.
  • pipeline_model = pipeline.fit(train_data)
  • Step-5: Make predictions: Use the fitted pipeline to make predictions on new data.
  • predictions = pipeline_model.transform(test_data)
  • Step-6: Evaluate model: Evaluate the performance of the model using appropriate evaluation metrics (the BinaryClassificationEvaluator reports the area under the ROC curve by default).
  • 
                                    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
                                     evaluator = BinaryClassificationEvaluator(labelCol="indexedLabel")
                                     auc = evaluator.evaluate(predictions)  # default metric is areaUnderROC
                                
  • Step-7: Parameter tuning: We can perform hyperparameter tuning using techniques like cross-validation.
  • 
                                    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    
                                    param_grid = ParamGridBuilder() \
                                        .addGrid(classifier.maxDepth, [5, 10, 15]) \
                                        .addGrid(classifier.numTrees, [10, 20, 30]) \
                                        .build()
                                    
                                    crossval = CrossValidator(estimator=pipeline,
                                                                estimatorParamMaps=param_grid,
                                                                evaluator=evaluator,
                                                                numFolds=3)
                                    
                                    cv_model = crossval.fit(train_data)
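
                                    # After fitting, the CrossValidatorModel exposes the best pipeline found and
                                    # the average metric per parameter combination (a brief sketch continuing
                                    # the snippet above):
                                    best_model = cv_model.bestModel    # best fitted Pipeline
                                    print(cv_model.avgMetrics)         # one averaged score per param_grid entry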
                                

Objective

In this project, the objective is to develop a classification machine learning model using Spark MLlib's pipeline framework. The goal is to leverage the power of Spark's distributed computing capabilities to build a robust and scalable pipeline that can efficiently preprocess the HR dataset and train a classification algorithm to predict employee terminations. By implementing the pipeline, we aim to explore the relationships between various features such as employee demographics, performance metrics, and engagement scores, and the likelihood of termination. Through this analysis, we seek to achieve a high level of accuracy in predicting employee terminations, thereby providing valuable insights for HR management and facilitating proactive decision-making in employee retention strategies.

About the data: In this project, we use the "Human Resources Dataset" available on Kaggle. It is a dataset useful for the HR department of a company: the company records different parameters for each employee (such as satisfaction level, salary, number of projects, whether the employee left the company, etc.). This dataset can be used to predict whether an employee will leave the company or stay.

Following are the data columns:

  • satisfaction_level: Level of employee satisfaction.
  • last_evaluation: Last performance evaluation score.
  • number_project: Number of projects assigned to the employee.
  • average_montly_hours: Average number of monthly work hours.
  • time_spend_company: Number of years spent in the company.
  • Work_accident: Indicator for whether the employee had a work accident (e.g., 1 for yes, 0 for no).
  • left: Indicator for whether the employee left the company (e.g., 1 for yes, 0 for no).
  • promotion_last_5years: Indicator for whether the employee was promoted in the last 5 years (e.g., 1 for yes, 0 for no).
  • sales: Department in which the employee works (the column is named 'sales' in the CSV file).
  • salary: Level of salary for the employee (e.g., low, medium, high).
Building the SparkSession and loading the datasets:

                            from pyspark.sql import SparkSession
                            from pyspark import SparkContext
                            
                            # Defining a SparkContext locally
                            sc = SparkContext.getOrCreate()
                            
                            # Building a Spark Session
                            spark = SparkSession \
                                .builder \
                                .appName("Pipelines Spark ML") \
                                .getOrCreate()
                                
                            spark
                        
Loading the datasets

                            hr = spark.read.csv('HR_comma_sep.csv', header=True, inferSchema=True)  # inferSchema so numeric columns are typed as numbers

                            # Displaying an extract from the data frame
                            hr.sample(False, 0.001 , seed = 222).toPandas()
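
                            # Optional check: confirm how Spark typed each column. With inferSchema=True the
                            # numeric columns are read as int/double, while 'sales' and 'salary' stay strings
                            hr.printSchema()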
                        
Selecting the categorical variables:
Before training classification models, it is important to understand that the libsvm format does not support strings. It is therefore necessary to use an indexer, i.e. a model that transforms a categorical variable into a series of indices. In the data, the column left is the variable to be predicted: it indicates whether the employee left the company voluntarily or not.

                            hr = hr.select( 'left',
                                'satisfaction_level',
                                'last_evaluation',
                                'number_project',
                                'average_montly_hours',
                                'time_spend_company',
                                'Work_accident',
                                'promotion_last_5years',
                                'sales',
                                'salary'
                                )

                            # Display of a description of the variables
                            hr.describe().toPandas()
                        
Note: The StringIndexer is a feature transformer in Apache Spark MLlib that is commonly used for handling categorical variables. It maps each distinct string value in a column to a numerical index. This transformation is useful for algorithms that expect numerical input features, such as decision trees or logistic regression. The input column should contain string values representing categorical variables (e.g., department names, job titles, etc.), and the output column will contain the corresponding numerical indices. The StringIndexer assigns a unique index to each distinct string value; by default, indices start at 0 and are ordered by label frequency, so the most frequent value receives index 0. After indexing, the numerical indices can be used as input features for machine learning algorithms. However, some algorithms may require additional processing, such as one-hot encoding, to handle categorical variables properly.

                            from pyspark.ml.feature import StringIndexer

                            # Define the StringIndexer transformer
                            string_indexer = StringIndexer(inputCol="department", outputCol="department_index")
                            
                            # Fit the StringIndexer to the input data
                            indexed_data = string_indexer.fit(input_data).transform(input_data)
                            
                            # Show the transformed data
                            indexed_data.show()
                        
In this example, the StringIndexer is applied to the "department" column in the input data, and the transformed output is stored in a new column named "department_index". Each distinct department name in the "department" column will be assigned a unique numerical index in the "department_index" column.
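As mentioned in the note above, some algorithms work better when the resulting index is one-hot encoded rather than used as a single numeric column. A minimal sketch, assuming Spark 3.x (where OneHotEncoder is an estimator; the 2.x API differs slightly) and reusing the hypothetical indexed_data and department_index from the example above:

                            from pyspark.ml.feature import OneHotEncoder

                            # Turn the numeric index into a sparse 0/1 vector with one slot per category
                            encoder = OneHotEncoder(inputCols=["department_index"],
                                                    outputCols=["department_vec"])

                            encoded_data = encoder.fit(indexed_data).transform(indexed_data)
                            encoded_data.show()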

The StringIndexer is used in two steps:
  • Create an indexer by specifying the input and output columns (inputCol=, outputCol=) and let it scan the data for the distinct values using the fit method.
  • Apply the indexer to the data frame using the transform method
In the present case,

                        from pyspark.ml.feature import StringIndexer

                        # Creation of an indexer transforming the sales variable into indexedSales
                        salesIndexer = StringIndexer(inputCol='sales', outputCol='indexedSales').fit(hr)

                        # Creating a DataFrame hrSalesIndexed indexing the variable sales
                        hrSalesIndexed = salesIndexer.transform(hr)

                        # Displaying an extract from the hrSalesIndexed data frame 
                        hrSalesIndexed.sample(False, 0.001 , seed = 222).toPandas()
                        
An indexer built this way keeps the indexed values in memory. The IndexToString transformer allows you to recover the original values from an indexed variable, or from a prediction of that variable. It takes the following arguments:
  • inputCol: the name of the indexed input column
  • outputCol: the name of the output column to be rebuilt
  • labels: the original labels (typically the labels attribute of the fitted indexer)

                            from pyspark.ml.feature import IndexToString
                            
                            # Creating a new salesReconstructed column
                            SalesReconstructor = IndexToString(inputCol='indexedSales',
                                                                outputCol='salesReconstructed',
                                                                labels = salesIndexer.labels)
                            
                            # Apply the SalesReconstructor transformer
                            hrSalesReconstructed = SalesReconstructor.transform(hrSalesIndexed)
                            
                            # Displaying an extract from the database
                            hrSalesReconstructed.sample(False, 0.001 , seed = 222).toPandas()                            
                        
Here we rebuild the original values of sales from indexedSales into a new column, salesReconstructed, stored in a new data frame hrSalesReconstructed.
Pipelines
A pipeline is a sequence of stages that are executed in a specific order to process and transform data. Each stage typically represents a transformer or an estimator (machine learning algorithm) that operates on the data. The pipeline allows you to chain together multiple data processing and modeling steps, making it easy to manage and deploy complex machine learning workflows. An example syntax is: Pipeline(stages=[estimator1, estimator2, estimator3, ...]).

                            from pyspark.ml import Pipeline

                            # Creation of indexers
                            SalesIndexer = StringIndexer(inputCol='sales', outputCol='indexedSales')
                            SalaryIndexer = StringIndexer(inputCol='salary', outputCol='indexedSalary')
                            
                            # Creating a pipeline
                            indexer = Pipeline(stages =  [SalaryIndexer, SalesIndexer])
                            
                            # Index the variables of "hr"
                            hrIndexed = indexer.fit(hr).transform(hr)
                            
                            # Displaying an extract
                            hrIndexed.sample(False, 0.001 , seed = 222).toPandas()
                        
This first imports Pipeline from pyspark.ml, then creates a SalesIndexer transforming the sales variable into indexedSales. Similarly, a SalaryIndexer is created to transform the salary variable into indexedSalary. Finally, a pipeline chaining the two indexers is created with Pipeline and applied to hr with fit and transform.
Formatting the database in libsvm format
At this point, the variables are of numerical type. It is therefore possible to transform the database. First of all, it is necessary to exclude the non-indexed versions of the variables.

                            from pyspark.ml.linalg import DenseVector

                            # Creation of a database excluding non-indexed variables
                            hrNumeric = hrIndexed.select('left',
                                                            'satisfaction_level',
                                                            'last_evaluation',
                                                            'number_project',
                                                            'average_montly_hours',
                                                            'time_spend_company',
                                                            'Work_accident',
                                                            'promotion_last_5years',
                                                            'indexedSales',
                                                            'indexedSalary')
                            
                            # Creation of a DenseVector variable containing the features via the RDD structure
                            hrRdd = hrNumeric.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
                            
                            # transformation into DataFrame and naming variables to get a base of the form libsvm
                            hrLibsvm = spark.createDataFrame(hrRdd, ['label', 'features'])
                            
                            # Displaying an extract
                            hrLibsvm.sample(False, .001, seed = 222).toPandas()
                        
The conversion of a data frame to libsvm format is done as follows (an alternative without the RDD API is sketched after this list):
  • Import the DenseVector function
  • Transform each row into a pair containing a label and a feature vector using the map method
  • Transform this RDD into a DataFrame whose columns are named label and features
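The same label/features layout can also be obtained without dropping to the RDD API, by reusing the VectorAssembler introduced at the beginning of this article. A minimal sketch, assuming the hrIndexed data frame built above and numeric column types (inferSchema=True); the name hrLibsvmAlt is only illustrative:

                            from pyspark.ml.feature import VectorAssembler

                            # Pack the explanatory columns (indexed versions only) into a single feature vector
                            featureCols = ['satisfaction_level', 'last_evaluation', 'number_project',
                                           'average_montly_hours', 'time_spend_company', 'Work_accident',
                                           'promotion_last_5years', 'indexedSales', 'indexedSalary']

                            assembler = VectorAssembler(inputCols=featureCols, outputCol='features')

                            # Keep only the label and the assembled feature vector
                            hrLibsvmAlt = assembler.transform(hrIndexed) \
                                                   .withColumnRenamed('left', 'label') \
                                                   .select('label', 'features')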
Application of a Spark ML classifier
Import VectorIndexer from pyspark.ml.feature. Create a featureIndexer transformer that indexes as categorical every feature with at most maxCategories = 10 distinct values.

                            from pyspark.ml.feature import VectorIndexer

                            # Creation of a transformer indexing the features
                            featureIndexer = VectorIndexer(inputCol="features",
                                                        outputCol="indexedFeatures",
                                                        maxCategories = 10).fit(hrLibsvm)
                        
Note: Relying on a maximum number of distinct values (maxCategories) is one of the limitations of Spark ML. The version used here (2.2) does not let the data scientist declare which features are continuous and which are categorical by hand. For example, 'sales' is a categorical variable with 10 different values, while 'number_project' contains integers between 2 and 7. Setting the threshold to 10 therefore pushes the algorithm to treat 'number_project' as categorical: it loses the ordering of this variable and considers the difference between 2 and 3 completed projects to be the same as between 2 and 7. Conversely, if the threshold is set to 4, the algorithm imposes an artificial order (based on the frequency of the discrete values) on 'sales'. Faced with this trade-off, it is necessary to test several thresholds and look at the importance of the variables concerned, as sketched below. You are also invited to check the Spark ML documentation to see whether this limitation has been lifted, as Spark is still young and constantly improving.
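To see which features a given maxCategories threshold actually treats as categorical, the fitted VectorIndexerModel created above (featureIndexer) exposes a categoryMaps property; a quick check:

                            # categoryMaps maps the position of each categorical feature in the vector to a
                            # dictionary {original value: category index}; features not listed stay continuous
                            for featurePos, mapping in featureIndexer.categoryMaps.items():
                                print(f"feature {featurePos}: treated as categorical with {len(mapping)} values")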
Then, Random Forest is the classifier proposed as the learning algorithm. A random forest is composed of a set of decision trees, distinguished from each other by the subsample of data on which they are trained; these sub-samples are drawn at random from the initial data set. The principle is simple: many small classification trees are built on random fractions of the data, the forest aggregates their votes to produce a prediction, and the ensemble can also be used to estimate the importance of the explanatory variables. The RandomForestClassifier() function of the pyspark.ml.classification package creates a classifier and takes the following arguments:
  • labelCol: name of the column to be used as the label
  • featuresCol: name of the column containing the feature vector
  • predictionCol: name of the column containing the predictions (by default = prediction)
  • seed: fixed integer to make the results reproducible
  • additional Random Forest hyperparameters (e.g., numTrees, maxDepth) to refine the analysis, described in the RandomForestClassifier documentation
In the present case, do the following:
  • Import the function RandomForestClassifier from the package pyspark.ml.classification
  • Create two transformers labelIndexer and featureIndexer indexing label and features of hrLibsvm
  • Create a Random Forest Classifier, rf, working on indexed versions of features and labels
  • Using IndexToString, create a transformer labelConverter to restore prediction labels
  • Create a Pipeline, pipeline, containing the 4 estimators/transformers created
  • Create two sets train and test containing respectively 70% and 30% of hrLibsvm
  • Create a model, model, by fitting the Pipeline to the train set

                            from pyspark.ml.feature import IndexToString
                            # Import of the RandomForestClassifier from the pyspark.ml.classification package
                            from pyspark.ml.classification import RandomForestClassifier
                            
                            # Creation of transformers
                            labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(hrLibsvm)
                            featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories = 10).fit(hrLibsvm)
                            
                            # Creation of a classifier 
                            rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", predictionCol='prediction', seed = 222)
                            
                            # Creation of a transformer to restore the labels of predictions
                            labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                                                            labels=labelIndexer.labels)
                            
                            # Creating a Pipeline 
                            pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
                            
                            # Splitting data into two sets: training and test 
                            (train, test) = hrLibsvm.randomSplit([0.7, 0.3], seed = 222)
                            
                            # Training the model using the training data
                            model = pipeline.fit(train)
                        
Model prediction

                            predictions = model.transform(test)

                            # Display of an extract of the predictions  
                            predictions.sample(False, 0.001 , seed = 222).toPandas()
                        
Model evaluation:
Once the model is built, it is important to check the reliability of its predictions, both to compare it with other classification models and to optimize its parameters. For this purpose, the submodule pyspark.ml.evaluation contains the evaluation metrics. In particular, the MulticlassClassificationEvaluator function evaluates classification models. It takes 3 main arguments:
  • metricName: the metric to be used, typically 'accuracy'
  • labelCol: the name of the column to be predicted
  • predictionCol: the name of the prediction column

                        from pyspark.ml.evaluation import MulticlassClassificationEvaluator

                        # Creation of an evaluator 
                        evaluator = MulticlassClassificationEvaluator(metricName='accuracy',
                                                                    labelCol= 'indexedLabel',
                                                                    predictionCol= 'prediction')
                        # Calculation and display of model accuracy 
                        accuracy = evaluator.evaluate(predictions)
                        print(accuracy)
                        
Which gives:
0.963758389261745
The accuracy metric corresponds to the number of correct predictions divided by the total number of predictions made. It therefore lies between 0 and 1: an accuracy of 0 corresponds to completely false predictions, and an accuracy of 1 to the absence of prediction errors.
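Two quick follow-up checks can complement this score: recomputing the accuracy directly from the predictions DataFrame, and, tying back to the earlier note on maxCategories, reading the trained Random Forest's feature importances out of the fitted pipeline. A minimal sketch reusing predictions and model from above:

                        from pyspark.sql import functions as F

                        # Accuracy by hand: share of rows where the prediction matches the indexed label
                        nCorrect = predictions.filter(F.col('prediction') == F.col('indexedLabel')).count()
                        nTotal = predictions.count()
                        print(nCorrect / nTotal)

                        # The Random Forest model is the third stage of the fitted pipeline (index 2)
                        rfModel = model.stages[2]
                        print(rfModel.featureImportances)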
Disclaimer: This notebook is a part of my study on various topics related to the data engineering course provided by Datascientist.com Germany.

References

  1. Official Documentation
  2. Databricks Learning Academy
  3. Spark by Examples
  4. Datacamp tutorial.
  5. For Databricks, you can look at the YouTube tutorial videos by Bryan Cafferky, author of the book "Master Azure Databricks". A great playlist for anyone who wants to learn about big data analytics on the Azure Databricks cloud platform.
  6. See the PySpark basics video by Krish Naik, a great video for starters.
  7. A great YouTube video on how Apache Spark works on-premises.

Some other interesting things to know: