
First Steps in Machine Learning with Apache Spark

Basic concepts and topics of Spark MLlib package

João Pedro
Towards Data Science



Introduction

Apache Spark is one of the main tools for data processing and analysis in the Big Data context. It's a very complete (and complex) data processing framework, with functionalities that can be roughly divided into four groups: Spark SQL & DataFrames, for all-purpose data processing needs; Spark Structured Streaming, used to handle data streams; Spark MLlib, for machine learning and data science; and GraphX, the graph processing API.

Spark libraries section on the official documentation. Screenshot by Author.

I've already featured the first two in other posts: creating an ETL process for a Data Warehouse and integrating Spark and Kafka for stream processing. Today it's time for the third one: let's play with Machine Learning using Spark MLlib.

Machine Learning has a special place in my heart, because it was my entry point into the data science field and, like probably many of you, I started with the classic Scikit-Learn library.

I could write an entire post on why the Scikit-learn library is such a marvelous piece of software. It's beginner-friendly, easy to use, covers most of the machine learning cycle, has very well-written documentation, and so on.

But why am I talking about this? If you are like me, used to coding with sklearn, keep in mind that the path in Apache Spark is not SO straightforward. It’s not hard but has a steeper learning curve.

Throughout this post, we'll learn how to go through the 'full machine learning cycle' of data preprocessing, feature engineering, model training, and validation with a hands-on example.

Apache Spark in a Nutshell

Apache Spark is a distributed memory-based data transformation engine. It is geared to operate in distributed environments to parallelize processing between machines, achieving high-performance transformations by using its lazy evaluation philosophy and query optimizations.

And that’s the main reason to learn such a tool — Performance.

Even with optimizations, the Sklearn package (and other Python packages) struggles when the dataset gets too big. That's one of the potential blind spots Spark covers: as it scales horizontally, it is easier to increase the computational power to train models on Big Data.

The problem

I've chosen the Avocado Price Dataset for this project. Our task is to predict the mean avocado price given the avocado type, date, the number of avocado bags available, and other features. See the dataset page on Kaggle for more information.

Setting up the environment

All you need is Docker and Docker Compose installed. The code is available on GitHub.

The architecture described in the docker-compose.yaml is depicted in the image below.

Project’s architecture & Docker Containers. Image by Author.

All the code was developed inside a jupyter/pyspark-notebook container with all the pyspark dependencies already configured.

To start the environment, just run:

docker-compose up

The implementation

Our objective is to learn how to implement our usual ML pipeline using Spark, covering: loading the data and splitting it into train/test sets, cleaning the data, preprocessing + feature engineering, model definition, hyperparameter tuning, and final scoring.

The following sections will detail how to make each of these steps.

Connect to Spark

The first thing to do is connect to the Spark cluster, which is quite straightforward.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# SparkSession
URL_SPARK = "spark://spark:7077"

spark = (
    SparkSession.builder
    .appName("spark-ml")
    .config("spark.executor.memory", "4g")
    .master(URL_SPARK)
    .getOrCreate()
)

It may take a few seconds in the first run.

Loading the data

Now it’s time to work with the data. This part still has nothing to do with Spark’s MLlib package, just the usual data loading using Spark SQL.

df_avocado = spark.read.csv(
    "/data/avocado.csv",
    header=True,
    inferSchema=True
)

# cache data
df_avocado.cache()
df_avocado.show(4)

As Spark is lazily evaluated, it's a good idea to cache the dataset in memory to speed up the execution of the next steps.
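
Note that cache() is itself lazy: the DataFrame is only materialized in memory once an action scans it. A minimal sketch to force full materialization up front, if desired:

# cache() only marks the DataFrame for caching; a full action
# such as count() actually materializes it in memory
df_avocado.count()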

Let’s have a look at the data:

Again, more details about the columns can be found on the original dataset’s Kaggle page.
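
Besides show(), a quick way to check which column names and types inferSchema produced is printSchema(), as in the small sketch below:

# Inspect the inferred schema (column names and types)
df_avocado.printSchema()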

Let's move on and split the DataFrame into train (75%) and test (25%) sets using the randomSplit() method.

df_avocado_train, df_avocado_test = df_avocado.randomSplit([0.75, 0.25], seed=214)

Preprocess data

Before continuing, let's get to know the tools we'll be using. The Spark MLlib package has two main types of objects: Transformers and Estimators.

A Transformer is an object that is able to transform DataFrames: it receives a raw DataFrame and returns a processed one. Common transformers include PolynomialExpansion, SQLTransformer, and VectorAssembler (very important, discussed later).

Estimators, on the other hand, are objects that need to be fitted/trained on the data to generate a Transformer. These include machine learning predictors (Linear Regression, Logistic Regression, Decision Trees, etc), dimensionality reduction algorithms (PCA, ChiSquare Selector), and also other column transformers (StandardScaler, MinMaxScaler, TF-IDF, etc).

Transformers and Estimators on Spark MLlib. Image by Author.
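
To make the distinction concrete, here is a minimal sketch of the two call patterns; the DataFrame df and its columns are purely illustrative:

from pyspark.ml.feature import SQLTransformer, StandardScaler

# A Transformer is used directly: transform() takes a DataFrame and returns a new one
sql_keep = SQLTransformer(statement="SELECT * FROM __THIS__")
df_out = sql_keep.transform(df)  # df is an illustrative input DataFrame

# An Estimator must be fitted first; fit() returns a Model, which is itself a Transformer
scaler = StandardScaler(inputCol="features_num", outputCol="features_scaled")
scaler_model = scaler.fit(df_out)           # Estimator -> Model
df_scaled = scaler_model.transform(df_out)  # Model used as a Transformer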

Let's start by using the SQLTransformer on our data. This is a powerful transformer that allows one to select and transform columns using SQL queries.

from pyspark.ml.feature import SQLTransformer

COLUMNS = ['AveragePrice', 'type']
COLUMNS = [f"`{col}`" for col in COLUMNS]

LOG_COLUMNS = ['4225', '4770', 'Small Bags', 'Large Bags', 'XLarge Bags']
LOG_COLUMNS = [f"LOG(`{col}`+1) AS `LOG {col}`" for col in LOG_COLUMNS]

sql_trans = SQLTransformer(
    statement=f"""
    SELECT
        {', '.join(COLUMNS)}
        , {', '.join(LOG_COLUMNS)}
        , YEAR(__THIS__.Date)-2000 AS year
        , MONTH(__THIS__.Date) AS month
    FROM __THIS__
    """
)

# Visualize the data
sql_trans.transform(df_avocado_train).show(4)

The code above selects the AveragePrice and type columns, transforms the numerical columns using the Log function, and creates two new columns extracting the year (after 2000) and the month.

__THIS__ is a placeholder that the SQLTransformer replaces with the name of the DataFrame currently being transformed.

The result:

Scaling is a prevalent practice in data preprocessing. Let's scale the month column using the Min-Max scaling technique, putting all values in the [0, 1] interval. The MinMaxScaler is an estimator, so it needs to be fitted to the data before it can be used to transform it.

Most of the estimators (including all the prediction models) require the input columns to be in Vector form. Vector is a special column type used mostly in Spark MLlib. It is just what the name suggests, a fixed-size array of numbers.

To join columns into a single Vector column, we use the VectorAssembler Transformer.

Vector Assembler in action. Image by Author.
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

# Creating a month vector column
month_vec_ass = VectorAssembler(inputCols=['month'], outputCol='month_vec')

df_avocado_month_ass = month_vec_ass.transform(sql_trans.transform(df_avocado_train))

# Scaling the month column
month_scaler = MinMaxScaler(inputCol='month_vec', outputCol='month_scaled')
month_scaler = month_scaler.fit(df_avocado_month_ass)

month_scaler\
    .transform(df_avocado_month_ass)\
    .select(['month', 'month_vec', 'month_scaled'])\
    .show(10)

The image below details the process.

The process to apply the MinMaxScaler to a column. Image by Author.

The result:

With these concepts in mind, it's just a matter of knowing the available transformers and using them in our pipeline.

For example, the column type has two values, “conventional” and “organic”, that need to be mapped into numbers. The transformer responsible for this is the StringIndexer.

It assigns a numerical value to each category in a column. As the "type" column only has two categories, it will be transformed into a column with only two values, 0 and 1, which for a binary feature is effectively equivalent to one-hot encoding (for columns with more categories, see the OneHotEncoder sketch after the code below).

from pyspark.ml.feature import StringIndexer

str_indexer = StringIndexer(inputCol="type", outputCol="type_index")
str_indexer = str_indexer.fit(df_avocado_train)

str_indexer\
    .transform(df_avocado_train)\
    .select(["type", "type_index"])\
    .show(4)
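
As an aside, for a categorical column with more than two values (such as the dataset's region column, which we don't use here), the usual approach is to chain a OneHotEncoder after the StringIndexer. A quick sketch, not used in the rest of this pipeline:

from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Index the multi-category column, then one-hot encode the resulting indices
region_indexer = StringIndexer(inputCol="region", outputCol="region_index")
df_region = region_indexer.fit(df_avocado_train).transform(df_avocado_train)

region_encoder = OneHotEncoder(inputCols=["region_index"], outputCols=["region_ohe"])
region_encoder.fit(df_region).transform(df_region).select(["region", "region_ohe"]).show(4)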

From now on I’ll summarize what was done.

The numerical features generated (all columns except type_index) were assembled into a single Vector called "features_num"; this final vector is then passed through a StandardScaler.

# Apply transformations
## SQL transformer
df_avocado_train_transformed = sql_trans.transform(df_avocado_train)

## String indexer
df_avocado_train_transformed = str_indexer.transform(df_avocado_train_transformed)

## Month scaler (vector assembler + minmax scaler)
df_avocado_train_transformed = month_vec_ass.transform(df_avocado_train_transformed)
df_avocado_train_transformed = month_scaler.transform(df_avocado_train_transformed)


# Join all numerical features into a single vector
numerical_vec_ass = VectorAssembler(
    inputCols=[
        'year', 'month_scaled', 'LOG 4225',
        'LOG 4770', 'LOG Small Bags',
        'LOG Large Bags', 'LOG XLarge Bags'
    ],
    outputCol='features_num'
)
df_avocado_train_transformed = numerical_vec_ass.transform(df_avocado_train_transformed)

# Join all categorical features into a single vector
categorical_vec_ass = VectorAssembler(
    inputCols=['type_index'],
    outputCol='features_cat'
)
df_avocado_train_transformed = categorical_vec_ass.transform(df_avocado_train_transformed)


# See the result
df_avocado_train_transformed.select(['features_cat', 'features_num', 'AveragePrice']).show(4, False)

# Scaling the numerical features using a StandardScaler
from pyspark.ml.feature import StandardScaler

std_scaler = StandardScaler(
    inputCol="features_num",
    outputCol="features_scaled",
    withStd=True,
    withMean=True
)

std_scaler = std_scaler.fit(df_avocado_train_transformed)
std_scaler.transform(df_avocado_train_transformed).select(['features_scaled']).show(5, False)

The categorical column “type_index” is then added to the final vector.

The final step is to join all the transformers created into a pipeline. A pipeline is just an object that encapsulates a set of transformers and estimators so they can be applied to the data sequentially. This helps us avoid dealing with each intermediate transformation step individually (as we've been doing so far).

# Machine learning pipeline
from pyspark.ml import Pipeline

# Create a preprocessing pipeline
prepro_pipe = Pipeline(stages=[
    sql_trans,
    str_indexer,
    month_vec_ass,
    month_scaler,
    numerical_vec_ass,
    categorical_vec_ass,
    std_scaler,

    # Join all features into a single vector
    VectorAssembler(
        inputCols=['features_scaled', 'features_cat'],
        outputCol='features'
    ),
])


# Fit the pipeline
pipeline_model = prepro_pipe.fit(df_avocado_train)

# Transform the data
df_avocado_train_transformed = pipeline_model.transform(df_avocado_train)

# See the result
df_avocado_train_transformed.select(['features', 'AveragePrice']).show(4, False)

The final result:

+--------------------------------------------------------------------------------------------------------------------------------------------+------------+
|features |AveragePrice|
+--------------------------------------------------------------------------------------------------------------------------------------------+------------+
|[-1.2177154955881637,1.6482225355667333,0.9527463109714546,1.0269649008115518,0.5657377199959452,0.8334134211814762,-0.6436162273445295,0.0]|0.49 |
|[-1.2177154955881637,1.6482225355667333,0.7058305701685025,1.0954357394643428,0.7803295242390127,0.8574417380503548,2.012648481596976,0.0] |0.71 |
|[-1.2177154955881637,1.6482225355667333,0.9399552148956506,1.5037797059140563,0.8203168521795554,0.6002078289352569,2.1083545825302594,0.0] |0.8 |
|[-1.2177154955881637,1.6482225355667333,1.1142436751287843,1.5073956355774096,1.4653967110976907,1.0678725104034048,2.0181300922626053,0.0] |0.8 |
+--------------------------------------------------------------------------------------------------------------------------------------------+------------+
only showing top 4 rows

Model training

That’s the moment we’re all waiting for.

After the long path of data preprocessing, all the features are already in their desired final vector form and we’re ready to train the model.

Unfortunately, this part will be very short compared with the previous one ¯\_(ツ)_/¯

As mentioned earlier, ML models are just estimators, so the process repeats: Instantiate, Fit and Transform.

Let’s train a Linear Regression model:

from pyspark.ml.regression import LinearRegression

# Create a linear regression model
lin_reg = LinearRegression(
    featuresCol='features',
    labelCol='AveragePrice',
    predictionCol='prediction',

    # Hyperparameters
    maxIter=1000,
    regParam=0.3,        # Regularization strength
    elasticNetParam=0.8  # Regularization mixing parameter: 1 for L1, 0 for L2
)

It's necessary to specify the features column, the target/label column, and a name for the prediction column. Just like the other estimators we’ve met, an ML model will just add another column to the DataFrame.

A Machine Learning model is just an estimator. Image by Author.
# Fit the model
lin_reg_model = lin_reg.fit(df_avocado_train_transformed)

# See the output
df_avocado_train_pred = lin_reg_model.transform(df_avocado_train_transformed)
df_avocado_train_pred.select(
    ['features', 'AveragePrice', 'prediction']
).show(4, False)

See the result below:

+--------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------+
|features |AveragePrice|prediction |
+--------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------+
|[-1.2177154955881637,1.6482225355667333,0.9527463109714546,1.0269649008115518,0.5657377199959452,0.8334134211814762,-0.6436162273445295,0.0]|0.49 |1.4003505112793717|
|[-1.2177154955881637,1.6482225355667333,0.7058305701685025,1.0954357394643428,0.7803295242390127,0.8574417380503548,2.012648481596976,0.0] |0.71 |1.4003505112793717|
|[-1.2177154955881637,1.6482225355667333,0.9399552148956506,1.5037797059140563,0.8203168521795554,0.6002078289352569,2.1083545825302594,0.0] |0.8 |1.4003505112793717|
|[-1.2177154955881637,1.6482225355667333,1.1142436751287843,1.5073956355774096,1.4653967110976907,1.0678725104034048,2.0181300922626053,0.0] |0.8 |1.4003505112793717|
+--------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------+
only showing top 4 rows

Model evaluation

To measure the model's performance, we need an evaluator. I think its name is self-explanatory: it computes a performance metric between the real labels and the model's predictions.

Evaluator in action. Image by Author.

In the cell below, a RegressionEvaluator is instantiated to measure the RMSE (Root Mean Squared Error) between predictions and real values (on train data).

from pyspark.ml.evaluation import RegressionEvaluator

reg_eval = RegressionEvaluator(
    labelCol='AveragePrice',
    predictionCol='prediction',
    metricName='rmse'  # Root mean squared error
)

# Evaluate the model
reg_eval.evaluate(df_avocado_train_pred)

# Output >> 0.3978489578943717

Hyperparameter Tuning with Cross Validation

Hyperparameter Tuning is one of the last stages in a Machine Learning Pipeline, so our adventure is coming to an end.

In this step, we test several variations of hyperparameters of our model/pipeline to pick the best one according to the chosen metric. A common way of doing this is using a cross-validation technique.

Here, we meet the last building blocks of today's post: the ParamGridBuilder and the CrossValidator.

Starting with the ParamGridBuilder: It is the object used to build a hyperparameter grid.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


ml_pipeline = Pipeline(stages=[
    prepro_pipe,  # Preprocessing pipeline
    lin_reg       # Linear regression model
])


param_grid = ParamGridBuilder() \
    .addGrid(lin_reg.regParam, [0.0, 0.1, 0.3, 0.5]) \
    .addGrid(lin_reg.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

In the code above, several values are specified for the linear regression's regParam and elasticNetParam. It's important to note that the parameters are referenced through the original lin_reg object.

The CrossValidator then joins everything together (estimator, hyperparameter grid, and evaluator) …

reg_eval = RegressionEvaluator(
    labelCol='AveragePrice',
    predictionCol='prediction',
    metricName='rmse'  # Root mean squared error
)

# Join everything together using a CrossValidator object.
crossval_ml = CrossValidator(
    estimator=ml_pipeline,
    estimatorParamMaps=param_grid,
    evaluator=reg_eval,
    numFolds=4
)

… and performs a cross-validation for the given number of folds when the method fit() is called with training data.

crossval_ml_model = crossval_ml.fit(df_avocado_train)

The results are accessed through the fitted CrossValidator object. The code below prints the best model and its score (for RMSE, the best score is the lowest value in avgMetrics).

best_model = crossval_ml_model.bestModel

# avgMetrics holds one averaged score per grid combination;
# for RMSE, lower is better, so the best score is the minimum
best_score = min(crossval_ml_model.avgMetrics)

print("Best model: ", best_model)
print("Best score: ", best_score)


# Output >>>
# > Best model: PipelineModel_dc90de555ac1
# > Best score: 0.2833541578138277
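
If you want to see how each hyperparameter combination performed, avgMetrics can be paired with the parameter grid, as in the small sketch below:

# Each entry of avgMetrics is the cross-validated score of one grid combination
for params, metric in zip(param_grid, crossval_ml_model.avgMetrics):
    combo = {param.name: value for param, value in params.items()}
    print(combo, f"-> RMSE: {metric:.4f}")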

Let’s also see the linear regression’s best parameters.

# The last stage in the pipeline is the Linear Regression
best_lin_reg_params = best_model.stages[-1].extractParamMap()

print("Best score (RMSE):", best_score, end="\n\n")
for parameter, value in best_lin_reg_params.items():
    print(f"{str(parameter):50s}, {value}")

Output:

Best score (RMSE): 0.2833541578138277

LinearRegression_eeaa1d8bf6ea__aggregationDepth , 2
LinearRegression_eeaa1d8bf6ea__elasticNetParam , 0.0
LinearRegression_eeaa1d8bf6ea__epsilon , 1.35
LinearRegression_eeaa1d8bf6ea__featuresCol , features
LinearRegression_eeaa1d8bf6ea__fitIntercept , True
LinearRegression_eeaa1d8bf6ea__labelCol , AveragePrice
LinearRegression_eeaa1d8bf6ea__loss , squaredError
LinearRegression_eeaa1d8bf6ea__maxBlockSizeInMB , 0.0
LinearRegression_eeaa1d8bf6ea__maxIter , 1000
LinearRegression_eeaa1d8bf6ea__predictionCol , prediction
LinearRegression_eeaa1d8bf6ea__regParam , 0.0
LinearRegression_eeaa1d8bf6ea__solver , auto
LinearRegression_eeaa1d8bf6ea__standardization , True
LinearRegression_eeaa1d8bf6ea__tol , 1e-06

Evaluate the best model on the test set

And it all comes down to this moment. This is the step where we measure the performance of the best model on the test data.

Fortunately, there is nothing new to learn here: it's just a matter of applying the best model to the test data and passing the result to the evaluator.

df_avocado_test_pred = best_model.transform(df_avocado_test)

# show scores
print(reg_eval.evaluate(df_avocado_test_pred))

# Output
# > 0.28368085199676235

The performance was very similar to the one obtained in the cross-validation step.
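
As a possible next step (not covered above), the fitted pipeline could be persisted for later reuse. A sketch with an illustrative path:

from pyspark.ml import PipelineModel

# Save the best fitted pipeline (the path below is just an example)
best_model.write().overwrite().save("/data/avocado_price_model")

# ...and load it back later for inference
loaded_model = PipelineModel.load("/data/avocado_price_model")
loaded_model.transform(df_avocado_test).select("prediction").show(4)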

Conclusion

As ML applications become popular and their requirements become more complex, knowledge of a wider variety of tools with different purposes becomes essential.

In this post, we learned a little about how Apache Spark can be used in the context of Machine Learning through the Spark MLlib module. With a hands-on project, we created a generic ML pipeline, covering the main concepts and basic topics of this module.

Learning a new tool mainly involves becoming familiar with its vocabulary, i.e., understanding the fundamental parts that compose it and how they can be used to solve a problem. Therefore, we focused on understanding the basics of Spark MLlib: Estimators, Transformers, Evaluators, and Pipelines.

I hope this brief post helped you understand how Spark can be used in Machine Learning applications.

As always, this post just scratches the surface of the topics explored, so I strongly encourage further reading, see the references below.

Thank you for reading ;)

References

All the code is available in this GitHub repository.
Data used — Avocado Prices, ODbL v1.0: Open database, Kaggle.

[1] Chambers, B., & Zaharia, M. (2018). Spark: The Definitive Guide: Big Data Processing Made Simple. O'Reilly Media, Inc.
[2] Spark by Examples — https://sparkbyexamples.com/
[3] Géron, A. (2022). Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow. O'Reilly Media, Inc.
[4] Overview: estimators, transformers and pipelines — spark.ml. Spark official documentation.

