Scale ML Using PySpark (Part 2)

A collection of examples showing how to use MLlib with PySpark for those interested in running large ML problems.

Juan M. Tirado
Feb 6, 2024

This is the second part of a collection of examples of how to use the MLlib Spark library with Python. The content in this post is a conversion of this Jupyter notebook.

This second part covers:

  • Linear regression
  • Logistic regression
  • KMeans
  • Topic Modelling using LDA

You can find the first part of this tutorial here.

6. Linear Regression

Spark MLlib offers a linear regression implementation with L1, L2, and ElasticNet regularization.

from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=10, regParam=0.3)
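The constructor also exposes elasticNetParam to mix the L1 and L2 penalties; a minimal sketch (the parameter values and the lr_elastic name are only illustrative):

# elasticNetParam mixes the penalties: 0.0 is pure L2 (ridge), 1.0 is pure L1 (lasso)
lr_elastic = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.5)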

In a naive example where we have a set of observations corresponding to the equation y=2x+3:


+-----+-----+
| y | x |
+-----+-----+
| 7 | 2 |
| 9 | 3 |
| 23 | 10 |
+-----+-----+

We will train a model on these observations and use it to predict the output for x=5, which should obviously be y=13 (y = 2·5 + 3).

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression

session = SparkSession.builder.appName("Example").master("local").getOrCreate()

# Our observations
observations = [[7.0, Vectors.dense([2.0])],
                [9.0, Vectors.dense([3.0])],
                [23.0, Vectors.dense([10.0])]]

# We create the dataset as a DataFrame with a column for the label and a column for the features.
# Observe that in this case the features are a single column holding a Vector.
dataset = session.createDataFrame(observations, ['label', 'features'])

dataset.show()

print("Train our model...")
lr = LinearRegression()
# Train the model
model = lr.fit(dataset)
+-----+--------+
|label|features|
+-----+--------+
| 7.0| [2.0]|
| 9.0| [3.0]|
| 23.0| [10.0]|
+-----+--------+

Train our model…

Now that we have trained our model, we can investigate what it looks like internally.

# Show fitted model
print("These are the weights for our model:")
print("Coefficients: %s" % model.coefficients)
print("Intercept: %s" % model.intercept)
These are the weights for our model:
Coefficients: [1.9999999999999998]
Intercept: 3.000000000000002

Unsurprisingly, we have a coefficient near 2 and an intercept value of 3. We were trying to model a linear function, so it was a straightforward case.

We can check how the training went in terms of error, number of iterations, etc.
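The summary shown below comes from the model's training summary. A sketch of the code that produces it (the exact print statements are a reconstruction):

# Inspect the training summary: number of iterations, residuals and RMSE
summary = model.summary
print("This is the training summary:")
print("Num iterations: %d" % summary.totalIterations)
print("Residuals:")
summary.residuals.show()
print("RMSE: %f" % summary.rootMeanSquaredError)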

This is the training summary:
Num iterations: 0
Residuals:
+--------------------+
| residuals|
+--------------------+
|-1.77635683940025...|
|-1.77635683940025...|
| 0.0|
+--------------------+

RMSE: 0.000000

Now we are going to predict new outputs. We simply feed the model with dataframes using the same format we used to train it.

# in this case we have the label and the observed feature value
to_predict_items = [[13.0, Vectors.dense([5.0])]]
to_predict = session.createDataFrame(to_predict_items, ['label', 'features'])
predictions = model.transform(to_predict)
predictions.show()
+-----+--------+----------+
|label|features|prediction|
+-----+--------+----------+
| 13.0| [5.0]| 13.0|
+-----+--------+----------+

Because our model is basically perfect, we get a prediction of 13, which is what we were expecting.

In a more complex scenario, we will need to know how well our model is doing. We can check the error using any of the available evaluators. For a linear regression we can use the RegressionEvaluator. For our predictions this value will be 0 because the model made no mistakes.

from pyspark.ml.evaluation import RegressionEvaluator
ev = RegressionEvaluator(metricName='rmse')
ev.evaluate(predictions)
0.0

7. Logistic Regression

Spark MLlib offers a logistic regression implementation for binomial and multinomial problems in the classification package (see the docs for more details).
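For multiclass problems, the family parameter can be set explicitly; a minimal sketch (parameter values and the multi_lr name are only illustrative, the example below sticks to the binomial case):

from pyspark.ml.classification import LogisticRegression
# family accepts 'auto' (default), 'binomial' or 'multinomial'
multi_lr = LogisticRegression(maxIter=100, regParam=0.1, family='multinomial')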

The example below trains a binary classifier to identify whether a point is contained inside a circle or not.

+------+--------+---------+
| x | y | inside |
+------+--------+---------+
| 0 | 0 | 1 |
| 0 | 2 | 0 |
| 2 | 0 | 0 |
| 1 | 0 | 1 |
| 0 | 1 | 1 |
| 0.3 | 0.87 | 1 |
| 1 | -1.3 | 0 |
| 0.9 | -1.2 | 1 |
+------+--------+---------+

Observations inside and outside the circle.

Similarly to what we did with the linear regression, we define our dataset based on the observations to fit our model.

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

session = SparkSession.builder.appName("Example").master("local").getOrCreate()

observations = [[1.0, Vectors.dense([0.0, 0.0])],
                [0.0, Vectors.dense([0.0, 2.0])],
                [0.0, Vectors.dense([2.0, 0.0])],
                [1.0, Vectors.dense([1.0, 0.0])],
                [1.0, Vectors.dense([0.0, 1.0])],
                [1.0, Vectors.dense([0.3, 0.87])],
                [0.0, Vectors.dense([1.0, -1.3])],
                [1.0, Vectors.dense([0.9, -1.2])]]

dataset = session.createDataFrame(observations, ['label', 'features'])
dataset.show()
+-----+----------+
|label| features|
+-----+----------+
| 1.0| [0.0,0.0]|
| 0.0| [0.0,2.0]|
| 0.0| [2.0,0.0]|
| 1.0| [1.0,0.0]|
| 1.0| [0.0,1.0]|
| 1.0|[0.3,0.87]|
| 0.0|[1.0,-1.3]|
| 1.0|[0.9,-1.2]|
+-----+----------+
# Train the model
lr = LogisticRegression()
model = lr.fit(dataset)
# Show model internals
print("Coefficients: %s" % model.coefficients)
print("Intercept: %s" % model.intercept)
Coefficients: [-2.6073306122669933,-1.002969191804841]
Intercept: 2.380173815124733

We can take a look at the errors during the training phase. In this case, we look at the ROC curve, the AUC, and the F-measure by threshold.

# ROC curve
summary = model.summary
summary.roc.show()
# The AUC
print("AUC: %f" % summary.areaUnderROC)
# F-measure by threshold
summary.fMeasureByThreshold.show()
+------------------+---+
| FPR|TPR|
+------------------+---+
| 0.0|0.0|
| 0.0|0.2|
| 0.0|0.4|
| 0.0|0.6|
|0.3333333333333333|0.6|
|0.3333333333333333|0.8|
|0.6666666666666666|0.8|
|0.6666666666666666|1.0|
| 1.0|1.0|
| 1.0|1.0|
+------------------+---+

AUC: 0.800000
+--------------------+-------------------+
| threshold| F-Measure|
+--------------------+-------------------+
| 0.9153029099975847|0.33333333333333337|
| 0.7985416752560925| 0.5714285714285715|
| 0.7750656745222227| 0.7499999999999999|
| 0.7458695786407464| 0.6666666666666665|
| 0.6737931410385649| 0.8000000000000002|
| 0.5924820101887259| 0.7272727272727272|
| 0.44345374176351793| 0.8333333333333333|
|0.055488744288528125| 0.7692307692307693|
+--------------------+-------------------+

The ROC curve looks much nicer when we actually plot it.

import matplotlib.pyplot as pyplot
# Transform it into something more handy
fpr = summary.roc.select('FPR').rdd.map(lambda r: r[0]).collect()
tpr = summary.roc.select('TPR').rdd.map(lambda r: r[0]).collect()
pyplot.plot(fpr, tpr, 'b-')
pyplot.xlabel('FPR')
pyplot.ylabel('TPR')
pyplot.show()

We do not have many points to draw, so our ROC curve looks quite stepped.
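Once fitted, the model classifies new points with transform, just like the linear regression did. A sketch with a hypothetical test point that lies inside the circle:

from pyspark.ml.linalg import Vectors
# Hypothetical point (0.5, 0.5), inside the unit circle
to_classify = session.createDataFrame([(Vectors.dense([0.5, 0.5]),)], ['features'])
model.transform(to_classify).select('features', 'probability', 'prediction').show()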

8. KMeans Clustering

MLlib offers an implementation of KMeans with centroid initialization based on k-means|| (a parallel variant of KMeans++).
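The initialization mode and the random seed can be tuned through the estimator parameters; a minimal sketch (values and the kmeans_custom name are only illustrative):

from pyspark.ml.clustering import KMeans
# initMode accepts 'k-means||' (default) or 'random'; a fixed seed makes runs reproducible
kmeans_custom = KMeans(k=2, initMode='k-means||', seed=42, maxIter=20)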

The following example runs KMeans with k=2.

echo "
x,y
0,0
0,2
2,0
1,0
0,1
0.3,0.87
1,-1.3
0.9,-1.2" > /tmp/kmeans_example.csv

from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

session = SparkSession.builder.appName("Example").master("local").getOrCreate()

# Create the dataset
observations = [[Vectors.dense([0, 0])],
                [Vectors.dense([0, 2])],
                [Vectors.dense([2, 0])],
                [Vectors.dense([1, 0])],
                [Vectors.dense([0, 1])],
                [Vectors.dense([0.3, 0.87])],
                [Vectors.dense([1, -1.3])],
                [Vectors.dense([0.9, -1.2])]]

dataset = session.createDataFrame(observations, ['features'])
dataset.show()

# Configure and fit the model
kmeans = KMeans(k=2)
model = kmeans.fit(dataset)
+----------+
| features|
+----------+
| [0.0,0.0]|
| [0.0,2.0]|
| [2.0,0.0]|
| [1.0,0.0]|
| [0.0,1.0]|
|[0.3,0.87]|
|[1.0,-1.3]|
|[0.9,-1.2]|
+----------+

Once the model is fit, we can take a look at the predicted clusters and the generated centroids.

print("Centroids %s" % model.clusterCenters())
Centroids [array([0.075 , 0.9675]), array([ 1.225, -0.625])]
model.summary.predictions.show()
+----------+----------+
| features|prediction|
+----------+----------+
| [0.0,0.0]| 0|
| [0.0,2.0]| 0|
| [2.0,0.0]| 1|
| [1.0,0.0]| 1|
| [0.0,1.0]| 0|
|[0.3,0.87]| 0|
|[1.0,-1.3]| 1|
|[0.9,-1.2]| 1|
+----------+----------+
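We can also score the clustering with the silhouette metric; a minimal sketch using ClusteringEvaluator on the training predictions:

from pyspark.ml.evaluation import ClusteringEvaluator
# Silhouette with squared Euclidean distance (the evaluator's default)
evaluator = ClusteringEvaluator()
print("Silhouette: %f" % evaluator.evaluate(model.summary.predictions))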

For a more visual analysis, we can plot the centroids together with the reference circle. There is not much data for a good clustering; however, the centroids are located inside and outside the circle, which at first glance looks promising. Obviously, the data distribution is a bit biased and pushes one of the centroids towards the bottom right. How the algorithm would evolve with a more uniformly distributed dataset is left as an additional exercise.

Observe that we need a small workaround to get the correct numpy shape for our plot.

import matplotlib.pyplot as pyplot
import numpy as np

# Transform it into something more handy
features = model.summary.predictions.select('features').rdd.map(lambda r: r[0]).collect()
predictions = model.summary.predictions.select('prediction').rdd.map(lambda r: r[0]).collect()

x,y = np.array(features)[:,0], np.array(features)[:,1]

ax = pyplot.gca()
ax.cla()

# Plot the predictions
ax.scatter(x,y,c=predictions)

# Plot the centroids
centroids = np.array(model.clusterCenters())
ax.scatter(x=centroids[:,0], y=centroids[:,1], marker='x')

# Plot the circle as a reference
circle = pyplot.Circle((0,0),1, color='blue', fill=False)
ax.add_patch(circle)

ax.set_xlim((-2.5,2.5))
ax.set_ylim((-2.5,2.5))

# Make a square plot for a better visualization of the circle
ax.set_aspect('equal', adjustable='box')

pyplot.show()

session.stop()
KMeans clustering compared to the unit circle

9. Topic Modelling

Latent Dirichlet Allocation (LDA) (not to be confused with Linear Discriminant Analysis) is a method for topic modelling. It can be used to extract the topics contained in a corpus: for example, the topics in a collection of users' opinions, the main ideas discussed in WikiLeaks documents, etc.

LDA represents documents as mixtures of topics, where each topic emits words with certain probabilities. The final model is fitted using Bayesian inference.

After a certain number of iterations, LDA reaches a steady state with a mixture of topics per document. Every document belongs to each of the K topics with a certain probability. A plausible example could be as follows:

+---------------------------+-----------------------+----------------------+
| Document | Probability topic 1 | Probability topic 2 |
+---------------------------+-----------------------+----------------------+
| I like apples | 1 | 0 |
| I eat apples | 0 | 1 |
| There is no white apples | 0.8 | 0.2 |
| I don't eat apples | 0.3 | 0.7 |
+---------------------------+-----------------------+----------------------+

Spark MLlib offers an LDA implementation with two optimizers:
- EMLDAOptimizer: a batch approach
- OnlineLDAOptimizer: an incremental, online learning solution

The fitted model offers a description of the K topics, and the transformer computes the probability of each document belonging to each topic, as described above (ref).
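Choosing between the two optimizers is just a constructor parameter; a minimal sketch (the full example below uses the EM variant, the variable names are illustrative):

from pyspark.ml.clustering import LDA
# 'online' (the default) uses OnlineLDAOptimizer, 'em' uses EMLDAOptimizer
lda_online = LDA(k=2, maxIter=10, optimizer='online')
lda_em = LDA(k=2, maxIter=10, optimizer='em')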

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import Tokenizer, CountVectorizer

session = SparkSession.builder.appName("Example").master("local").getOrCreate()

corpus = session.createDataFrame([
    (0.0, 'Java has been around for a while'),
    (0.0, 'I wish Java could use case classes'),
    (0.0, 'Objects belong to classes in Java'),
], ['label', 'sentence'])

# First, get the tokens
tokenizer = Tokenizer(inputCol='sentence', outputCol='tokens')
tokens = tokenizer.transform(corpus)
tokens.show(truncate=False)


# Second, compute the frequency of every token. We could remove stop words here.
# We are going to reduce the corpus to a vocabulary of two words for those tokens appearing at least twice in the corpus.
vectorizer = CountVectorizer(inputCol='tokens', outputCol='frequencies', minDF=2.0, vocabSize=2)
model = vectorizer.fit(tokens)
print("This is our vocabulary %s" % model.vocabulary)
frequencies = model.transform(tokens)
frequencies.show()
+-----+----------------------------------+------------------------------------------+
|label|sentence |tokens |
+-----+----------------------------------+------------------------------------------+
|0.0 |Java has been around for a while |[java, has, been, around, for, a, while] |
|0.0 |I wish Java could use case classes|[i, wish, java, could, use, case, classes]|
|0.0 |Objects belong to classes in Java |[objects, belong, to, classes, in, java] |
+-----+----------------------------------+------------------------------------------+

This is our vocabulary ['java', 'classes']
+-----+--------------------+--------------------+-------------------+
|label| sentence| tokens| frequencies|
+-----+--------------------+--------------------+-------------------+
| 0.0|Java has been aro...|[java, has, been,...| (2,[0],[1.0])|
| 0.0|I wish Java could...|[i, wish, java, c...|(2,[0,1],[1.0,1.0])|
| 0.0|Objects belong to...|[objects, belong,...|(2,[0,1],[1.0,1.0])|
+-----+--------------------+--------------------+-------------------+
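As the comment above points out, stop words could be removed before computing the frequencies. A minimal sketch using StopWordsRemover (not part of the pipeline used in this example):

from pyspark.ml.feature import StopWordsRemover
# Drop English stop words from the tokens produced by the Tokenizer
remover = StopWordsRemover(inputCol='tokens', outputCol='filtered_tokens')
remover.transform(tokens).select('filtered_tokens').show(truncate=False)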
from pyspark.ml.clustering import LDA
lda = LDA(k=2, maxIter=10, featuresCol='frequencies', optimizer='em')
lda_model = lda.fit(frequencies)

topics = lda_model.describeTopics(3)
topics.show(truncate=False)

session.stop()
+-----+-----------+----------------------------------------+
|topic|termIndices|termWeights |
+-----+-----------+----------------------------------------+
|0 |[0, 1] |[0.6384828885791629, 0.3615171114208371]|
|1 |[0, 1] |[0.5614431009841063, 0.4385568990158936]|
+-----+-----------+----------------------------------------+
