Machine Learning with PySpark: Use of ML Pipelines
Introduction
A Spark ML Pipeline is an API provided by Apache Spark for building and deploying end-to-end machine learning workflows. It allows you to chain together multiple stages of machine learning algorithms and data transformations into a single pipeline, making it easier to manage and deploy complex machine learning models. A typical workflow looks as follows.
- Step-1: Import the necessary libraries:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
- Step-2: Define the pipeline stages:
# Define feature vector assembler
feature_cols = ["feature1", "feature2", ...]
feature_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Define label indexer (if target variable is categorical)
label_indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
# Define classifier (e.g., RandomForestClassifier)
classifier = RandomForestClassifier(featuresCol="features", labelCol="indexedLabel", seed=42)
- Step-3: Build the pipeline, fit it on the training data, and generate predictions:
# Chain the stages into a single pipeline
pipeline = Pipeline(stages=[feature_assembler, label_indexer, classifier])
# Fit the whole pipeline on the training data
pipeline_model = pipeline.fit(train_data)
# Apply the fitted pipeline to the test data
predictions = pipeline_model.transform(test_data)
- Step-4: Evaluate the model:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# The default metric of BinaryClassificationEvaluator is areaUnderROC
evaluator = BinaryClassificationEvaluator(labelCol="indexedLabel")
auc = evaluator.evaluate(predictions)
- Step-5: Tune the hyperparameters with cross-validation:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Grid of hyperparameter values to try
param_grid = ParamGridBuilder() \
    .addGrid(classifier.maxDepth, [5, 10, 15]) \
    .addGrid(classifier.numTrees, [10, 20, 30]) \
    .build()
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=3)
cv_model = crossval.fit(train_data)
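Once the cross-validation has run, the fitted CrossValidatorModel keeps the best pipeline found over the parameter grid. A minimal sketch of how it might be used, assuming test_data is the held-out DataFrame from above:
# Retrieve the best pipeline selected by cross-validation
best_model = cv_model.bestModel
# Apply it to the test set and score it with the same evaluator (areaUnderROC by default)
cv_predictions = best_model.transform(test_data)
print(evaluator.evaluate(cv_predictions))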
Objective
In this project, the objective is to develop a classification machine learning model using Spark MLlib's pipeline framework. The goal is to leverage the power of Spark's distributed computing capabilities to build a robust and scalable pipeline that can efficiently preprocess the HR dataset and train a classification algorithm to predict employee terminations. By implementing the pipeline, we aim to explore the relationships between various features such as employee demographics, performance metrics, and engagement scores, and the likelihood of termination. Through this analysis, we seek to achieve a high level of accuracy in predicting employee terminations, thereby providing valuable insights for HR management and facilitating proactive decision-making in employee retention strategies.
About the data: In the present project, we use the "Human Resources Dataset" available on Kaggle. This dataset is useful for the HR department of a company. A company records different parameters of an employee (such as satisfaction level, salary, number of promotions, whether the employee left the company, etc.). This dataset can be used to predict whether an employee will leave the company or stay.
Following are the data columns:
- satisfaction_level: Level of employee satisfaction.
- last_evaluation: Last performance evaluation score.
- number_project: Number of projects assigned to the employee.
- average_montly_hours: Average number of monthly work hours.
- time_spend_company: Number of years spent in the company.
- Work_accident: Indicator for whether the employee had a work accident (1 for yes, 0 for no).
- left: Indicator for whether the employee left the company (1 for yes, 0 for no).
- promotion_last_5years: Indicator for whether the employee was promoted in the last 5 years (1 for yes, 0 for no).
- sales: Department in which the employee works (the column is named sales in the CSV file).
- salary: Level of salary for the employee (low, medium, high).
Building the SparkSession and loading the dataset:
from pyspark.sql import SparkSession
from pyspark import SparkContext
# Defining a SparkContext locally
sc = SparkContext.getOrCreate()
# Building a Spark Session
spark = SparkSession \
.builder \
.appName("Pipelines Spark ML") \
.getOrCreate()
spark
Loading the datasets
# Reading the CSV file; inferSchema=True so that numeric columns are loaded as numbers
hr = spark.read.csv('HR_comma_sep.csv', header=True, inferSchema=True)
# Displaying an extract from the data frame
hr.sample(False, 0.001 , seed = 222).toPandas()
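Before going further, it can be useful to check what was actually loaded. A quick sketch using only standard DataFrame methods:
# Display the column names and the types inferred from the CSV file
hr.printSchema()
# Number of rows in the dataset
print(hr.count())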
Selecting the categorical variables:
Before training classification models, it is important to understand that the libsvm format does not support strings. It is therefore necessary to use an indexer, i.e. a model that transforms a categorical variable into a series of indices. In the data, the column left is the variable to be predicted; it indicates whether the employee left the company voluntarily or not.
hr = hr.select( 'left',
'satisfaction_level',
'last_evaluation',
'number_project',
'average_montly_hours',
'time_spend_company',
'Work_accident',
'promotion_last_5years',
'sales',
'salary'
)
# Display of a description of the variables
hr.describe().toPandas()
StringIndexer
is a feature transformer in Apache Spark MLlib that is commonly used for handling categorical variables. It indexes each distinct string value in a column
to a numerical index. This transformation is useful for algorithms that expect numerical input features, such as decision trees or logistic regression. It is to be noted that the input column should contain
string values representing categorical variables (e.g., department names, job titles, etc.). The output column will contain numerical
indices corresponding to the string values in the input column. The StringIndexer
assigns a unique numerical index to each distinct string value in the input column.
The indices start from 0 and increase sequentially for each unique string value. After indexing, the transformed numerical indices can be used as input features for machine learning algorithms. However,
some algorithms may require additional processing, such as one-hot encoding for handling categorical variables properly.
from pyspark.ml.feature import StringIndexer
# Define the StringIndexer transformer
string_indexer = StringIndexer(inputCol="department", outputCol="department_index")
# Fit the StringIndexer to the input data (input_data is assumed to be a DataFrame with a string column "department")
indexed_data = string_indexer.fit(input_data).transform(input_data)
# Show the transformed data
indexed_data.show()
In this example, the StringIndexer is applied to the "department" column in the input data, and the transformed output is stored in a new column named "department_index". Each distinct department name in
the "department" column will be assigned a unique numerical index in the "department_index" column.
The StringIndexer is used in two steps:
- Create an indexer by specifying the input and output columns (inputCol=, outputCol=) and search the database for the discrete values using the fit method.
- Apply the indexer to the database using the transform method.
from pyspark.ml.feature import StringIndexer
# Creation of an indexer transforming the sales variable into indexedSales
salesIndexer = StringIndexer(inputCol='sales', outputCol='indexedSales').fit(hr)
# Creating a DataFrame hrSalesIndexed indexing the variable sales
hrSalesIndexed = salesIndexer.transform(hr)
# Displaying an extract from the hrSalesIndexed data frame
hrSalesIndexed.sample(False, 0.001 , seed = 222).toPandas()
An indexer built this way keeps the indexed values in memory. The IndexToString function allows you to recover the original variable from an indexed variable, or from a prediction of this variable. It works with the following arguments:
- inputCol: the name of the indexed input column
- outputCol: the name of the output column to be rebuilt
- labels: the list of labels used for the reconstruction (here, salesIndexer.labels)
from pyspark.ml.feature import IndexToString
# Creating a new salesReconstructed column
SalesReconstructor = IndexToString(inputCol='indexedSales',
outputCol='salesReconstructed',
labels = salesIndexer.labels)
# Apply the SalesReconstructor transformer
hrSalesReconstructed = SalesReconstructor.transform(hrSalesIndexed)
# Displaying an extract from the database
hrSalesReconstructed.sample(False, 0.001 , seed = 222).toPandas()
Here we create a variable salesReconstructed from indexedSales, stored in a new table hrSalesReconstructed.
Pipelines
A pipeline is a sequence of stages that are executed in a specific order to process and transform data. Each stage typically represents a transformer or an estimator (machine learning algorithm) that operates on the data. The pipeline allows you to chain together multiple data processing and modeling steps, making it easy to manage and deploy complex machine learning workflows. An example syntax is: Pipeline(stages=[estimator1, estimator2, estimator3, ...]).
from pyspark.ml import Pipeline
# Creation of indexers
SalesIndexer = StringIndexer(inputCol='sales', outputCol='indexedSales')
SalaryIndexer = StringIndexer(inputCol='salary', outputCol='indexedSalary')
# Creating a pipeline
indexer = Pipeline(stages = [SalaryIndexer, SalesIndexer])
# Index the variables of "hr"
hrIndexed = indexer.fit(hr).transform(hr)
# Displaying an extract
hrIndexed.sample(False, 0.001 , seed = 222).toPandas()
This first imports Pipeline from pyspark.ml, then creates a SalesIndexer transforming the sales variable into indexedSales. Similarly, a SalaryIndexer is created for the salary variable, producing indexedSalary. A pipeline is then created using Pipeline, fitted to hr, and applied to index its variables.
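The object returned by indexer.fit(hr) is a PipelineModel whose stages attribute contains one fitted transformer per stage, in the same order. A small sketch for inspecting it (this refits the pipeline in order to keep the model object):
# Fit the pipeline and keep the resulting PipelineModel
indexerModel = indexer.fit(hr)
# One fitted StringIndexerModel per stage: salary first, then sales
for stage in indexerModel.stages:
    print(stage)
# Labels learned for the 'sales' column
print(indexerModel.stages[1].labels)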
Formatting the database in libsvm format
At this point, the variables are of numerical type. It is therefore possible to transform the database. First of all, it is necessary to exclude the non-indexed versions of the variables.
from pyspark.ml.linalg import DenseVector
# Creation of a database excluding non-indexed variables
hrNumeric = hrIndexed.select('left',
'satisfaction_level',
'last_evaluation',
'number_project',
'average_montly_hours',
'time_spend_company',
'Work_accident',
'promotion_last_5years',
'indexedSales',
'indexedSalary')
# Creation of a DenseVector variable containing the features via the RDD structure
hrRdd = hrNumeric.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
# transformation into DataFrame and naming variables to get a base of the form libsvm
hrLibsvm = spark.createDataFrame(hrRdd, ['label', 'features'])
# Displaying an extract
hrLibsvm.sample(False, .001, seed = 222).toPandas()
The conversion of a database to the libsvm format is done as follows:
- Import the DenseVector function.
- Transform each line into a pair containing a label and a feature vector using the map method.
- Transform this RDD into a DataFrame whose variables are named label and features.
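An alternative to the RDD/DenseVector route, arguably more idiomatic in Spark ML, is to assemble the features with VectorAssembler directly on the DataFrame. A sketch, assuming hrIndexed was built as above and that the numeric columns were loaded as numbers (inferSchema=True):
from pyspark.ml.feature import VectorAssembler
# Assemble all predictor columns into a single 'features' vector column
assembler = VectorAssembler(inputCols=['satisfaction_level', 'last_evaluation',
                                       'number_project', 'average_montly_hours',
                                       'time_spend_company', 'Work_accident',
                                       'promotion_last_5years', 'indexedSales',
                                       'indexedSalary'],
                            outputCol='features')
# Keep only the label and the assembled feature vector, as in hrLibsvm
hrAssembled = (assembler.transform(hrIndexed)
               .withColumnRenamed('left', 'label')
               .select('label', 'features'))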
Application of a Spark ML classifier
Import VectorIndexer from pyspark.ml.feature. Create a featureIndexer transformer that indexes as categorical any feature with at most maxCategories = 10 distinct values.
from pyspark.ml.feature import VectorIndexer
# Creation of a transformer indexing the features
featureIndexer = VectorIndexer(inputCol="features",
outputCol="indexedFeatures",
maxCategories = 10).fit(hrLibsvm)
The RandomForestClassifier takes the following main arguments:
- labelCol: the name of the column to be used as the label
- featuresCol: the name of the column containing the feature vector
- predictionCol: the name of the column containing the predictions (prediction by default)
- seed: a fixed integer to make the results reproducible
- the Random Forest hyperparameters used to refine the analysis, available in the Random Forest documentation
The model is then built and trained in the following steps:
- Import the function RandomForestClassifier from the package pyspark.ml.classification
- Create two transformers labelIndexer and featureIndexer indexing label and features of hrLibsvm
- Create a Random Forest Classifier, rf, working on indexed versions of features and labels
- Using IndexToString, create a transformer labelConverter to restore prediction labels
- Create a Pipeline, pipeline, containing the 4 estimators/transformers created
- Create two sets train and test containing respectively 70% and 30% of hrLibsvm
- Create a model, model, by fitting the Pipeline to the training set train
from pyspark.ml.feature import IndexToString
# Import of the RandomForestClassifier from the pyspark.ml.classification package
from pyspark.ml.classification import RandomForestClassifier
# Creation of transformers
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(hrLibsvm)
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories = 10).fit(hrLibsvm)
# Creation of a classifier
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", predictionCol='prediction', seed = 222)
# Creation of a transformer to restore the labels of predictions
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
labels=labelIndexer.labels)
# Creating a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
# Splitting data into two sets: training and test
(train, test) = hrLibsvm.randomSplit([0.7, 0.3], seed = 222)
# Training the model using the training data
model = pipeline.fit(train)
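Once fitted, the third stage of the PipelineModel is the trained RandomForestClassificationModel, so its feature importances can be inspected. A small sketch (the column order is assumed to be the one used when building hrNumeric above):
# Stage 2 of the fitted pipeline is the trained random forest
rfModel = model.stages[2]
# Relative importance of each feature, in the order of the assembled feature vector
for name, importance in zip(hrNumeric.columns[1:], rfModel.featureImportances.toArray()):
    print(name, round(float(importance), 4))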
Model prediction
predictions = model.transform(test)
# Display of an extract of the predictions
predictions.sample(False, 0.001 , seed = 222).toPandas()
Model evaluation:
Once the model is built, it is important to check the reliability of the predictions, both to compare it with other classification models and to optimize the parameters. For this purpose, there is a submodule pyspark.ml.evaluation containing all the evaluation metrics. In particular, you will find the function MulticlassClassificationEvaluator to evaluate classification models. This function takes 3 main arguments:
- metricName: the metric to be used, typically 'accuracy'
- labelCol: the name of the column to be predicted
- predictionCol: the name of the prediction column
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Creation of an evaluator
evaluator = MulticlassClassificationEvaluator(metricName='accuracy',
labelCol= 'indexedLabel',
predictionCol= 'prediction')
# Calculation and display of model accuracy
accuracy = evaluator.evaluate(predictions)
print(accuracy)
Which gives:
0.963758389261745
The metric accuracy corresponds to the number of correct predictions divided by the number of predictions made. It therefore lies between 0 and 1; an accuracy of 0 corresponds to completely false predictions and an accuracy of 1 corresponds to the absence of errors in the prediction.
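The same evaluator can report other metrics by changing metricName, and a simple confusion matrix can be obtained with a groupBy. A short sketch:
# F1 score with the same evaluator
print(evaluator.setMetricName('f1').evaluate(predictions))
# Confusion matrix: counts per (actual, predicted) pair
predictions.groupBy('indexedLabel', 'prediction').count().show()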
References
- Official Documentation
- Databricks Learning Academy
- Spark by Examples
- Datacamp tutorial.
- For Databricks, you can look at the tutorial videos on YouTube by Bryan Cafferky, author of the book "Master Azure Databricks". A great playlist for anyone who just wants to learn about big data analytics on the Azure Databricks cloud platform.
- See the video on PySpark basics by Krish Naik. A great video for starters.
- A great YouTube video on Apache Spark on-premise working.
Some other interesting things to know:
- Visit my website on Data, Big Data, data modeling, data warehousing, SQL, and cloud computing.
- Visit my website on Data engineering.