First Steps in Machine Learning with Apache Spark
Basic concepts and topics of the Spark MLlib package
Introduction
Apache Spark is one of the main tools for data processing and analysis in the big data context. It’s a very complete (and complex) data processing framework, with functionalities that can be roughly divided into four groups: Spark SQL & DataFrames, for all-purpose data processing needs; Spark Structured Streaming, used to handle data streams; Spark MLlib, for machine learning and data science; and GraphX, the graph processing API.
I’ve already featured the first two in other posts: creating an ETL process for a Data Warehouse and integrating Spark and Kafka for stream processing. Today it’s time for the third one: let’s play with Machine Learning using Spark MLlib.
Machine Learning has a special place in my heart, because it was my entrance door to the data science field and, like probably many of you, I started with the classic Scikit-Learn library.
I could write an entire post on why the Scikit-learn library is such a marvelous piece of software. It’s beginner-friendly, easy to use, covers most of the machine learning cycle, has very well-written documentation, and so on.
But why am I talking about this? If you are like me, used to coding with sklearn, keep in mind that the path in Apache Spark is not SO straightforward. It’s not hard but has a steeper learning curve.
Throughout this post, we’ll learn how to carry out the ‘full machine learning cycle’ of data preprocessing, feature engineering, model training, and validation with a hands-on example.
Apache Spark in a Nutshell
Apache Spark is a distributed memory-based data transformation engine. It is geared to operate in distributed environments to parallelize processing between machines, achieving high-performance transformations by using its lazy evaluation philosophy and query optimizations.
And that’s the main reason to learn such a tool: performance.
Even with optimizations, the Sklearn package (like other Python packages) struggles when the dataset gets too big. That’s one of the potential blind spots Spark covers. As it scales horizontally, it is easier to increase the computational power available to train models on big data.
The problem
I’ve chosen the Avocado Price Dataset for this project. Our task in this dataset is to predict the mean avocado price given the avocado type, the date, the number of avocado bags available, and other features. See the dataset page on Kaggle for more information.
Setting up the environment
All you need is docker and docker-compose installed. The code is available on GitHub.
The architecture described (in the docker-compose.yaml) is depicted in the image below.
All the code was developed inside a jupyter/pyspark-notebook container with all the pyspark dependencies already configured.
To start the environment, just run:
docker-compose up
The implementation
Our objective is to learn how to implement our usual ML pipeline using Spark, which covers: loading data and splitting it into train/test sets, cleaning the data, preprocessing + feature engineering, model definition, hyperparameter tuning, and final scoring.
The following sections will detail how to perform each of these steps.
Connect to Spark
The first thing to do is connect to the Spark cluster, which is quite straightforward.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# SparkSession
URL_SPARK = "spark://spark:7077"
spark = (
    SparkSession.builder
    .appName("spark-ml")
    # Executor memory is set through the full "spark."-prefixed property name
    .config("spark.executor.memory", "4g")
    .master(URL_SPARK)
    .getOrCreate()
)
It may take a few seconds on the first run.
Loading the data
Now it’s time to work with the data. This part still has nothing to do with Spark’s MLlib package, just the usual data loading using Spark SQL.
df_avocado = spark.read.csv(
"/data/avocado.csv",
header=True,
inferSchema=True
)
# cache data
df_avocado.cache()
df_avocado.show(4)
As Spark is lazily evaluated, it’s worth caching the dataset in memory so that the next steps don’t have to re-read and re-parse the CSV file every time.
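To see why caching matters under lazy evaluation, here is a plain-Python analogy. It is a toy sketch, not Spark code: `LazyDataset` and its methods are hypothetical names invented for this illustration. Each "action" re-runs the load unless the data was marked for caching.

```python
class LazyDataset:
    """Toy stand-in for a lazily evaluated dataset (hypothetical, not a Spark API)."""

    def __init__(self, loader):
        self._loader = loader      # function that produces the rows
        self._use_cache = False
        self._cached = None
        self.loads = 0             # how many times the source was actually read

    def cache(self):
        # Only marks the data for caching; nothing is read yet.
        self._use_cache = True
        return self

    def collect(self):
        # An "action": this is where the work actually happens.
        if self._use_cache and self._cached is not None:
            return self._cached
        self.loads += 1
        rows = self._loader()
        if self._use_cache:
            self._cached = rows
        return rows


uncached = LazyDataset(lambda: [1, 2, 3])
uncached.collect()
uncached.collect()

cached = LazyDataset(lambda: [1, 2, 3]).cache()
cached.collect()
cached.collect()

print(uncached.loads, cached.loads)  # 2 1
```

The uncached dataset reads its source once per action, while the cached one reads it only on the first action.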
Let’s have a look at the data:
Again, more details about the columns can be found on the original dataset’s Kaggle page.
Let’s move on and split the DataFrame into train (75%) and test (25%) sets using the randomSplit() method.
df_avocado_train, df_avocado_test = df_avocado.randomSplit([0.75, 0.25], seed=214)
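One detail worth knowing: randomSplit() assigns each row to a split at random according to the weights, so the resulting sizes are only approximately 75%/25%. The sketch below mimics that idea in plain Python. It is an illustration of the principle, not Spark's actual implementation, and `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random() * total
        cumulative = 0.0
        for index, weight in enumerate(weights):
            cumulative += weight
            if draw < cumulative:
                splits[index].append(row)
                break
    return splits


rows = list(range(1000))
train, test = random_split(rows, [0.75, 0.25], seed=214)
print(len(train), len(test))  # close to 750 / 250, but rarely exact
```

Fixing the seed makes the split reproducible, which is why the call above passes one.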
Preprocess data
Before continuing, let’s get to know the tools we’ll be using. The Spark MLlib package has two main types of objects: Transformers and Estimators.
A Transformer is an object that’s able to transform DataFrames: it receives a raw DataFrame and returns a processed one. Common transformers include PolynomialExpansion, SQLTransformer, and VectorAssembler (very important, discussed later).
Estimators, on the other hand, are objects that need to be fitted/trained on the data to generate a Transformer. These include machine learning predictors (Linear Regression, Logistic Regression, Decision Trees, etc.), dimensionality reduction and feature selection algorithms (PCA, ChiSqSelector), and also other column transformers (StandardScaler, MinMaxScaler, TF-IDF, etc.).
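The distinction is easier to see in a stripped-down form. The following plain-Python sketch imitates the pattern only; `MeanCenter` and `MeanCenterModel` are hypothetical classes, not part of Spark MLlib.

```python
class MeanCenterModel:
    """'Transformer': already holds what it learned, so it can transform directly."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        return [v - self.mean for v in values]


class MeanCenter:
    """'Estimator': must be fitted on data, which produces a Transformer."""

    def fit(self, values):
        return MeanCenterModel(sum(values) / len(values))


train = [1.0, 2.0, 3.0]
model = MeanCenter().fit(train)   # learning step: computes the mean of the training data
print(model.transform(train))     # [-1.0, 0.0, 1.0]
print(model.transform([4.0]))     # [2.0], reusing the mean learned from train
```

The key point is that fit() returns a new object, and only that fitted object knows how to transform data.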
Let’s start by using the SQLTransformer on our data. This is a powerful transformer that allows one to select and transform columns using SQL queries.
from pyspark.ml.feature import SQLTransformer
# Columns kept as-is (backticks protect names with spaces or leading digits)
COLUMNS = ['AveragePrice', 'type']
COLUMNS = [f"`{col}`" for col in COLUMNS]
# Columns transformed with LOG(x + 1)
LOG_COLUMNS = ['4225', '4770', 'Small Bags', 'Large Bags', 'XLarge Bags']
LOG_COLUMNS = [f"LOG(`{col}`+1) AS `LOG {col}`" for col in LOG_COLUMNS]
sql_trans = SQLTransformer(
    statement=f"""
    SELECT
    {', '.join(COLUMNS)}
    , {', '.join(LOG_COLUMNS)}
    ,YEAR(__THIS__.Date)-2000 AS year
    ,MONTH(__THIS__.Date) AS month
    FROM __THIS__
    """
)
# Visualize the data
sql_trans.transform(df_avocado_train).show(4)
The code above selects the AveragePrice and type columns, transforms the numerical columns using the Log function, and creates two new columns extracting the year (after 2000) and the month.
__THIS__ is the default name of the DataFrame currently being transformed.
The result:
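As a side note on the LOG(x + 1) transform used above: it is a common choice for volume-like columns, since it compresses large values while still mapping zero to zero. Here is a small plain-Python check of that behavior (the sample values are made up for illustration, not taken from the dataset).

```python
import math

volumes = [0.0, 9.0, 99.0, 99999.0]  # made-up values

# Same formula as LOG(col + 1) in the SQL statement (natural logarithm)
logged = [math.log(v + 1) for v in volumes]

for raw, log_value in zip(volumes, logged):
    print(f"{raw:>10.1f} -> {log_value:.3f}")
```

Adding 1 before taking the logarithm avoids the undefined LOG(0) for rows where no units were sold.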
Scaling is a prevalent practice in data preprocessing. Let’s scale the month column using the Min-Max scaling technique, putting all values in the [0, 1] interval. The MinMaxScaler is an Estimator, so it needs to be fitted to the data before being used to transform it.
Most of the estimators (including all the prediction models) require the input columns to be in Vector form. Vector is a special column type used mostly in Spark MLlib. It is just what the name suggests, a fixed-size array of numbers.
To join columns into a single Vector column, we use the VectorAssembler Transformer.
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
# Creating a Month vector column
month_vec_ass = VectorAssembler(inputCols=['month'], outputCol='month_vec')
df_avocado_month_ass = month_vec_ass.transform(sql_trans.transform(df_avocado_train))
# Scaling the month column (fit() returns the fitted scaler model)
month_scaler = MinMaxScaler(inputCol='month_vec', outputCol='month_scaled')
month_scaler = month_scaler.fit(df_avocado_month_ass)
month_scaler\
    .transform(df_avocado_month_ass)\
    .select(['month', 'month_vec', 'month_scaled'])\
    .show(10)
The image below details the process.
The result:
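For reference, Min-Max scaling to [0, 1] computes (x - min) / (max - min) for each value. A minimal plain-Python sketch of that formula applied to month numbers (`min_max_scale` is a hypothetical helper, not a Spark function):

```python
def min_max_scale(values):
    """Rescale values linearly so the minimum maps to 0 and the maximum to 1."""
    low, high = min(values), max(values)
    return [(v - low) / (high - low) for v in values]


months = list(range(1, 13))
scaled = min_max_scale(months)
print(scaled[0], scaled[-1])   # 0.0 1.0
print(round(scaled[5], 4))     # June -> 0.4545
```

The minimum and maximum are exactly what the scaler learns during fit(), which is why it is an Estimator rather than a plain Transformer.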
With these concepts in mind, it’s just a matter of knowing the available transformers and using them in our pipeline.
For example, the column type has two values, “conventional” and “organic”, that need to be mapped into numbers. The transformer responsible for this is the StringIndexer.
It assigns a numerical value to each category in a column. As the column “type” only has two categories, it will be transformed into a column with only two values, 0 and 1, which for a binary column carries the same information as one-hot encoding.
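from pyspark.ml.feature import StringIndexer
str_indexer = StringIndexer(inputCol="type", outputCol="type_index")
# fit() learns the category-to-index mapping and returns the fitted model
str_indexer = str_indexer.fit(df_avocado_train)
str_indexer\
    .transform(df_avocado_train)\
    .select(["type", "type_index"])\
    .show(4)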
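By default, StringIndexer orders labels by descending frequency, so the most frequent category gets index 0. Below is a plain-Python sketch of that rule. The `string_index` helper and the sample labels are hypothetical, and breaking ties alphabetically is an assumption made here to keep the sketch deterministic.

```python
from collections import Counter


def string_index(labels):
    """Map each label to an index, most frequent label first."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


labels = ["organic", "conventional", "conventional", "organic", "conventional"]
mapping = string_index(labels)
print(mapping)                               # {'conventional': 0.0, 'organic': 1.0}
print([mapping[label] for label in labels])  # [1.0, 0.0, 0.0, 1.0, 0.0]
```

Because the mapping depends on the data, it has to be learned with fit() before it can be applied.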
From now on, I’ll just summarize what was done.
The numerical features generated (all columns except type_index) were assembled into a single Vector called “features_num”, which is then passed through a StandardScaler.
# Apply transformations
## SQL transformer
df_avocado_train_transformed = sql_trans.transform(df_avocado_train)
## String indexer
df_avocado_train_transformed = str_indexer.transform(df_avocado_train_transformed)
## Month scaler (vector assembler + minmax scaler)
df_avocado_train_transformed = month_vec_ass.transform(df_avocado_train_transformed)
df_avocado_train_transformed = month_scaler.transform(df_avocado_train_transformed)
# Join all features into a single vector
numerical_vec_ass = VectorAssembler(
inputCols=[
'year', 'month_scaled', 'LOG 4225',
'LOG 4770', 'LOG Small Bags',
'LOG Large Bags', 'LOG XLarge Bags'
],
outputCol='features_num'
)
df_avocado_train_transformed = numerical_vec_ass.transform(df_avocado_train_transformed)
# Join all categorical features into a single vector
categorical_vec_ass = VectorAssembler(
inputCols=['type_index'],
outputCol='features_cat'
)
df_avocado_train_transformed = categorical_vec_ass.transform(df_avocado_train_transformed)
# See the result
df_avocado_train_transformed.select(['features_cat', 'features_num', 'AveragePrice']).show(4, False)
# Scaling the numerical features using a StandardScaler
from pyspark.ml.feature import StandardScaler
std_scaler = StandardScaler(
    inputCol="features_num",
    outputCol="features_scaled",
    withStd=True,   # divide by the standard deviation
    withMean=True   # center on the mean
)
std_scaler = std_scaler.fit(df_avocado_train_transformed)
std_scaler.transform(df_avocado_train_transformed).select(['features_scaled']).show(5, False)
The categorical column “type_index” is then added to the final vector.
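As a reminder of what standard scaling does to each numerical feature: with both withMean and withStd enabled, every value becomes (x - mean) / std, where Spark uses the sample standard deviation. A plain-Python sketch on a single made-up column (`standard_scale` is a hypothetical helper; the values only imitate the `year - 2000` feature):

```python
import statistics


def standard_scale(values, with_mean=True, with_std=True):
    """Center on the mean and divide by the sample standard deviation."""
    mean = statistics.mean(values) if with_mean else 0.0
    std = statistics.stdev(values) if with_std else 1.0
    return [(v - mean) / std for v in values]


years = [15, 16, 17, 18]   # example values in the style of `year - 2000`
scaled = standard_scale(years)
print([round(v, 4) for v in scaled])  # [-1.1619, -0.3873, 0.3873, 1.1619]
```

After scaling, the column has zero mean and unit standard deviation, which puts features with very different ranges on a comparable footing.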
The final step is to join all the transformers created into a pipeline. A pipeline is just an object used to encapsulate a set of transformers and estimators and apply them to data sequentially. This saves us from dealing with each intermediate transformation step individually (as we’ve been doing so far).
# Machine learning pipeline
from pyspark.ml import Pipeline
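# Create a preprocessing pipeline.
# Note: str_indexer, month_scaler and std_scaler were reassigned to their
# fitted models above, so the pipeline reuses them as-is instead of refitting.
prepro_pipe = Pipeline(stages=[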
sql_trans,
str_indexer,
month_vec_ass,
month_scaler,
numerical_vec_ass,
categorical_vec_ass,
std_scaler,
# Join all features into a single vector
VectorAssembler(
inputCols=['features_scaled', 'features_cat'],
outputCol='features'
),
])
# Fit the pipeline
pipeline_model = prepro_pipe.fit(df_avocado_train)
# Transform the data
df_avocado_train_transformed = pipeline_model.transform(df_avocado_train)
# See the result
df_avocado_train_transformed.select(['features', 'AveragePrice']).show(4, False)
The final result:
+--------------------------------------------------------------------------------------------------------------------------------------------+------------+
|features |AveragePrice|
+--------------------------------------------------------------------------------------------------------------------------------------------+------------+
|[-1.2177154955881637,1.6482225355667333,0.9527463109714546,1.0269649008115518,0.5657377199959452,0.8334134211814762,-0.6436162273445295,0.0]|0.49 |
|[-1.2177154955881637,1.6482225355667333,0.7058305701685025,1.0954357394643428,0.7803295242390127,0.8574417380503548,2.012648481596976,0.0] |0.71 |
|[-1.2177154955881637,1.6482225355667333,0.9399552148956506,1.5037797059140563,0.8203168521795554,0.6002078289352569,2.1083545825302594,0.0] |0.8 |
|[-1.2177154955881637,1.6482225355667333,1.1142436751287843,1.5073956355774096,1.4653967110976907,1.0678725104034048,2.0181300922626053,0.0] |0.8 |
+--------------------------------------------------------------------------------------------------------------------------------------------+------------+
only showing top 4 rows
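To make the idea of a pipeline more concrete, here is a plain-Python sketch of the fit-then-transform chaining it performs. All classes and functions below (`Shift`, `MinMax`, `fit_pipeline`, `run_pipeline`) are hypothetical and only mimic the pattern; they are not Spark's implementation.

```python
class Shift:
    """Transformer: needs no fitting."""

    def __init__(self, offset):
        self.offset = offset

    def transform(self, values):
        return [v + self.offset for v in values]


class MinMaxModel:
    """Fitted transformer produced by MinMax.fit()."""

    def __init__(self, low, high):
        self.low, self.high = low, high

    def transform(self, values):
        return [(v - self.low) / (self.high - self.low) for v in values]


class MinMax:
    """Estimator: fit() returns a fitted transformer."""

    def fit(self, values):
        return MinMaxModel(min(values), max(values))


def fit_pipeline(stages, values):
    """Fit stages in order; each stage sees the output of the previous ones."""
    fitted = []
    for stage in stages:
        model = stage.fit(values) if hasattr(stage, "fit") else stage
        values = model.transform(values)
        fitted.append(model)
    return fitted


def run_pipeline(fitted, values):
    """Apply the already fitted stages in order."""
    for model in fitted:
        values = model.transform(values)
    return values


fitted = fit_pipeline([Shift(-2000), MinMax()], [2015, 2016, 2018])
print(run_pipeline(fitted, [2015, 2016, 2018]))
print(run_pipeline(fitted, [2017]))
```

Estimators are fitted on the output of the stages before them, and the fitted pipeline can then be applied to new data in a single call.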
Model training
This is the moment we’ve all been waiting for.
After the long path of data preprocessing, all the features are already in their desired final vector form and we’re ready to train the model.
Unfortunately, this part will be very short compared with the previous one ¯\_(ツ)_/¯
As mentioned earlier, ML models are just estimators, so the process repeats: Instantiate, Fit and Transform.
Let’s train a Linear Regression model:
from pyspark.ml.regression import LinearRegression
# Create a linear regression model
lin_reg = LinearRegression(
featuresCol='features',
labelCol='AveragePrice',
predictionCol='prediction',
# Hyperparameters
maxIter=1000,
regParam=0.3, # Regularization
elasticNetParam=0.8 # Regularization mixing parameter. 1 for L1, 0 for L2.
)
It's necessary to specify the features column, the target/label column, and a name for the prediction column. Just like the other estimators we’ve met, an ML model will just add another column to the DataFrame.
# Fit the model
lin_reg_model = lin_reg.fit(df_avocado_train_transformed)
# See the output
df_avocado_train_pred = lin_reg_model.transform(df_avocado_train_transformed)
df_avocado_train_pred.select(
['features', 'AveragePrice', 'prediction']
).show(4, False)
See the result below:
+--------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------+
|features |AveragePrice|prediction |
+--------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------+
|[-1.2177154955881637,1.6482225355667333,0.9527463109714546,1.0269649008115518,0.5657377199959452,0.8334134211814762,-0.6436162273445295,0.0]|0.49 |1.4003505112793717|
|[-1.2177154955881637,1.6482225355667333,0.7058305701685025,1.0954357394643428,0.7803295242390127,0.8574417380503548,2.012648481596976,0.0] |0.71 |1.4003505112793717|
|[-1.2177154955881637,1.6482225355667333,0.9399552148956506,1.5037797059140563,0.8203168521795554,0.6002078289352569,2.1083545825302594,0.0] |0.8 |1.4003505112793717|
|[-1.2177154955881637,1.6482225355667333,1.1142436751287843,1.5073956355774096,1.4653967110976907,1.0678725104034048,2.0181300922626053,0.0] |0.8 |1.4003505112793717|
+--------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------+
only showing top 4 rows
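Note that the four predictions shown above are identical. One plausible explanation (an assumption, not verified against the original run) is that the fairly strong L1 part of the penalty shrank every coefficient to zero, leaving only the intercept. The sketch below shows how regParam and elasticNetParam combine into the elastic net penalty, following the form given in the Spark documentation; `elastic_net_penalty` is a hypothetical helper and the coefficients are made up.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """reg_param * (alpha * L1 + (1 - alpha) / 2 * L2^2), with alpha = elastic_net_param."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (
        elastic_net_param * l1 + (1.0 - elastic_net_param) / 2.0 * l2_squared
    )


weights = [0.5, -0.2]  # made-up coefficients
penalty = elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8)
zero_penalty = elastic_net_penalty([0.0, 0.0], reg_param=0.3, elastic_net_param=0.8)
print(round(penalty, 4), zero_penalty)  # 0.1767 0.0
```

The larger regParam is, the more the optimizer is rewarded for keeping coefficients small, and the L1 term in particular tends to push them to exactly zero.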
Model evaluation
To measure the model’s performance, we need an evaluator. As its name suggests, it computes a performance metric comparing the real labels with the model predictions.
In the cell below, a RegressionEvaluator is instantiated to measure the RMSE (Root Mean Squared Error) between predictions and real values (on train data).
from pyspark.ml.evaluation import RegressionEvaluator
reg_eval = RegressionEvaluator(
labelCol='AveragePrice',
predictionCol='prediction',
metricName='rmse' # Root mean squared error
)
# Evaluate the model
reg_eval.evaluate(df_avocado_train_pred)
# Output >> 0.3978489578943717
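For readers who want to see what the metric computes, here is RMSE written out in plain Python (a minimal sketch; `rmse` is a hypothetical helper and the numbers are made up):

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between true labels and predictions."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


error = rmse([1.0, 2.0, 3.0], [1.0, 2.0, 5.0])
print(round(error, 4))  # sqrt(4 / 3), about 1.1547
```

Since the errors are squared before averaging, large misses weigh more heavily than small ones, and lower values are better.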
Hyperparameter Tuning with Cross Validation
Hyperparameter Tuning is one of the last stages in a Machine Learning Pipeline, so our adventure is coming to an end.
In this step, we test several variations of hyperparameters of our model/pipeline to pick the best one according to the chosen metric. A common way of doing this is using a cross-validation technique.
Here we meet the last building blocks of today’s post: the ParamGridBuilder and the CrossValidator.
Let’s start with the ParamGridBuilder, the object used to build a hyperparameter grid.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
ml_pipeline = Pipeline(stages=[
prepro_pipe, # Preprocessing pipeline
lin_reg # Linear regression model
])
param_grid = ParamGridBuilder() \
.addGrid(lin_reg.regParam, [0.0, 0.1, 0.3, 0.5]) \
.addGrid(lin_reg.elasticNetParam, [0.0, 0.5, 1.0]) \
.build()
In the code above, several values are specified for the linear regression’s regParam and elasticNetParam. It’s important to note that the original object is used to reference the parameters.
The CrossValidator then joins everything together (estimator, hyperparameter grid, and evaluator) …
reg_eval = RegressionEvaluator(
labelCol='AveragePrice',
predictionCol='prediction',
metricName='rmse' # Root mean squared error
)
# Join everything together using a CrossValidator object.
crossval_ml = CrossValidator(
estimator=ml_pipeline,
estimatorParamMaps=param_grid,
evaluator=reg_eval,
numFolds=4
)
… and performs a cross-validation for the given number of folds when the method fit() is called with training data.
crossval_ml_model = crossval_ml.fit(df_avocado_train)
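It helps to keep in mind how much work this single call triggers: every combination in the grid is trained once per fold, and the best combination is then refitted on the whole training set. A quick plain-Python count for the grid defined above:

```python
from itertools import product

reg_params = [0.0, 0.1, 0.3, 0.5]
elastic_net_params = [0.0, 0.5, 1.0]
num_folds = 4

# Every (regParam, elasticNetParam) pair is one candidate configuration
grid = [
    {"regParam": reg, "elasticNetParam": alpha}
    for reg, alpha in product(reg_params, elastic_net_params)
]

fits_during_cv = len(grid) * num_folds
total_fits = fits_during_cv + 1  # plus the final refit of the best combination

print(len(grid), fits_during_cv, total_fits)  # 12 48 49
```

The cost grows multiplicatively with each new hyperparameter added to the grid, so it pays to keep the candidate lists short.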
The results are accessed through the fitted Cross Validation object. The code below prints the best model’s name and score.
best_model = crossval_ml_model.bestModel
# avgMetrics holds one average RMSE per parameter combination; lower is better
best_score = min(crossval_ml_model.avgMetrics)
print("Best model: ", best_model)
print("Best score: ", best_score)
# Output >>>
# > Best model: PipelineModel_dc90de555ac1
# > Best score: 0.2833541578138277
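A word on reading these scores: avgMetrics contains one averaged metric per parameter combination, in the same order as the grid, so the best RMSE is the smallest entry and not necessarily the first one. The sketch below illustrates the lookup with hypothetical values (they are not results from this post).

```python
# Hypothetical values, only to illustrate the lookup
param_maps = [
    {"regParam": 0.1, "elasticNetParam": 0.5},
    {"regParam": 0.0, "elasticNetParam": 0.0},
    {"regParam": 0.3, "elasticNetParam": 1.0},
]
avg_metrics = [0.35, 0.28, 0.40]  # one average RMSE per parameter map, same order

# For RMSE, lower is better, so pick the index of the smallest metric
best_index = min(range(len(avg_metrics)), key=avg_metrics.__getitem__)
print(param_maps[best_index], avg_metrics[best_index])
```

For metrics where higher is better (such as R²), the same lookup would use max instead of min.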
Let’s also see the linear regression’s best parameters.
# The last stage in the pipeline is the Linear Regression
best_lin_reg_params = best_model.stages[-1].extractParamMap()
print("Best score (RMSE):", best_score, end="\n\n")
for parameter, value in best_lin_reg_params.items():
    print(f"{str(parameter):50s}, {value}")
Output:
Best score (RMSE): 0.2833541578138277
LinearRegression_eeaa1d8bf6ea__aggregationDepth , 2
LinearRegression_eeaa1d8bf6ea__elasticNetParam , 0.0
LinearRegression_eeaa1d8bf6ea__epsilon , 1.35
LinearRegression_eeaa1d8bf6ea__featuresCol , features
LinearRegression_eeaa1d8bf6ea__fitIntercept , True
LinearRegression_eeaa1d8bf6ea__labelCol , AveragePrice
LinearRegression_eeaa1d8bf6ea__loss , squaredError
LinearRegression_eeaa1d8bf6ea__maxBlockSizeInMB , 0.0
LinearRegression_eeaa1d8bf6ea__maxIter , 1000
LinearRegression_eeaa1d8bf6ea__predictionCol , prediction
LinearRegression_eeaa1d8bf6ea__regParam , 0.0
LinearRegression_eeaa1d8bf6ea__solver , auto
LinearRegression_eeaa1d8bf6ea__standardization , True
LinearRegression_eeaa1d8bf6ea__tol , 1e-06
Evaluate the best model on the test set
And it all comes down to this moment. This is the step where we measure the performance of the best model on test data.
Fortunately, there is nothing new to learn here; it’s just a matter of applying the best model to the test data and passing the result to the evaluator.
df_avocado_test_pred = best_model.transform(df_avocado_test)
# show scores
print(reg_eval.evaluate(df_avocado_test_pred))
# Output
# > 0.28368085199676235
The performance was very similar to the one obtained in the cross-validation step.
Conclusion
As ML applications become popular and their requirements become more complex, knowledge of a wider variety of tools with different purposes becomes essential.
In this post, we learned a little about how Apache Spark can be used in the context of Machine Learning through the Spark MLlib module. With a hands-on project, we created a generic ML pipeline, covering the main concepts and basic topics of this module.
Learning a new tool mainly involves becoming familiar with its vocabulary, i.e., understanding the fundamental parts that compose it and how they can be used to solve a problem. Therefore, we focused on understanding the basics of Spark MLlib: Estimators, Transformers, Evaluators, and Pipelines.
I hope this brief post helped you understand how Spark can be used in Machine Learning applications.
As always, this post just scratches the surface of the topics explored, so I strongly encourage further reading; see the references below.
Thank you for reading ;)
References
All the code is available in this GitHub repository.
Data used — Avocado Prices, ODbL v1.0: Open database, Kaggle.
[1] Chambers, B., & Zaharia, M. (2018). Spark: The definitive guide: Big data processing made simple. O’Reilly Media, Inc.
[2] Spark by examples — https://sparkbyexamples.com/
[3] Géron, A. (2022). Hands-on machine learning with Scikit-Learn, Keras, and TensorFlow. O’Reilly Media, Inc.
[4] Overview: estimators, transformers and pipelines — spark.ml. Spark Official documentation.