Scale ML Using PySpark (Part 1)

A collection of examples of how to use MLlib with PySpark for those interested in running large ML problems.

Juan M. Tirado
Jan 27, 2024
Photo by Markus Winkler on Unsplash

This is the first part of a collection of examples of how to use the MLlib Spark library with Python. The content in this post is a conversion of this Jupyter notebook.

1. Introduction

This notebook is a collection of examples that illustrate how to use PySpark with MLlib. It is intended as a gentle introduction for those who are interested in the topic, want additional examples, or are simply curious about how to start with this library.

The following skills are expected to follow these examples:

  • Python
  • Understanding Spark structures (DataFrames, RDDs)
  • Basic ML notions. This is not intended to be an ML course, although you can find some theoretical explanations.

The examples are designed to work in a simple local environment using the MLlib DataFrame-based API. The MLlib RDD-based API is now in maintenance mode (see here). This is why the main import statements come from pyspark.ml and not from pyspark.mllib.

You will need a Spark environment to be available on your local path. Refer here to the official guide for more details.

You will also need Java to be available on your local path. Check it by running java --version. If nothing is displayed, check out how to install Java on your machine (here). Next, you can easily set up a local environment and install the dependencies used in this notebook.

virtualenv env
source env/bin/activate
pip install urllib3 numpy matplotlib pyspark

Wait until all the dependencies are installed. If you are not reading these lines from a Jupyter Notebook :) install Jupyter and run it.

pip install jupyter
jupyter-notebook

2. Data types

Spark DataFrames hold a collection of rows whose elements can have different data types. However, data types such as boolean, string, or even integer are not the expected input for most ML algorithms. For this reason, MLlib supports data types such as vectors and matrices.

2.1 Dense Vectors

A dense vector is an array that stores every element explicitly. PySpark uses NumPy to run algebraic operations.

Docs

from pyspark.ml.linalg import DenseVector

a = DenseVector([0,1,2,3,4])
b = DenseVector([10,10,10,10,10])
print('Sum: ', a + b)
print('Difference: ', a - b)
print('Multiplication: ', a * 2)
print('Division: ', b / 2)
print('Non-zeros: ', a.nonzero())
print('Squared distance: ', a.squared_distance(b))
Sum:  [10.0,11.0,12.0,13.0,14.0]
Difference: [-10.0,-9.0,-8.0,-7.0,-6.0]
Multiplication: [0.0,2.0,4.0,6.0,8.0]
Division: [5.0,5.0,5.0,5.0,5.0]
Non-zeros: (array([1, 2, 3, 4]),)
Squared distance: 330.0

2.2 Sparse Vectors

Sparse vectors are designed to represent vectors where a large number of elements are expected to be zero. They are defined by specifying the size of the vector, the positions that are different from zero, and the values at those positions. In the following vector:

SparseVector(5, [0, 2, 4], [1, 3, 5])

we have five elements, where the entries at positions 0, 2, and 4 take the values 1, 3, and 5, and the remaining entries are zero.

Docs

from pyspark.ml.linalg import SparseVector

sparse_vector = SparseVector(5, [0, 2, 4], [1, 3, 5])
print('Sparse vector: ', sparse_vector.toArray())
print('Indices: ', sparse_vector.indices)
print('Non zeros: ', sparse_vector.numNonzeros())
Sparse vector:  [1. 0. 3. 0. 5.]
Indices: [0 2 4]
Non zeros: 3
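
To make the storage scheme more concrete, here is a plain-Python sketch (no Spark required) that mimics how a sparse vector keeps only a size, the non-zero positions, and their values. The helpers to_dense and sparse_dot are made up for this illustration and are not part of MLlib.

```python
# Plain-Python sketch of the sparse representation (size, indices, values).
# to_dense and sparse_dot are illustrative helpers, not MLlib functions.

def to_dense(size, indices, values):
    """Expand a sparse description into a full list of floats."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = float(v)
    return dense

def sparse_dot(indices, values, dense):
    """Dot product that only visits the stored (non-zero) entries."""
    return sum(v * dense[i] for i, v in zip(indices, values))

size, indices, values = 5, [0, 2, 4], [1, 3, 5]
dense = to_dense(size, indices, values)
print(dense)                                       # [1.0, 0.0, 3.0, 0.0, 5.0]
print(sparse_dot(indices, values, [10.0] * size))  # 90.0
```

The dot product touches three entries instead of five. With vectors of thousands of mostly-zero features, this is where the savings in memory and computation come from.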

3. Input/Output

We can expect datasets to be available from different storage sources:

  • Hard disks
  • HDFS
  • Databases
  • Others

The SparkSession object facilitates loading data from these sources in different formats (CSV, JSON, text, Parquet, databases, etc.). We will show examples for CSV, libSVM, and images.

3.1 CSV

Let’s assume the following dataset in a CSV format:

label,f1,f2,f3,f4
0,0,"one",2.0,true
1,4,"five",6.0,false

We instantiate a SparkSession object and load the dataset indicating that we have a header and the separation character.

'''
For this example we need the dataset.csv file to be available.

Copy and paste the following lines:

echo "\
label,f1,f2,f3,f4
0,0,"one",2.0,true
1,4,"five",6.0,false" > /tmp/dataset.csv

'''

from pyspark.sql import SparkSession

# Get a session object for our current Spark master
session = SparkSession.builder.appName("Example").master("local").getOrCreate()

dataset = session.read.format('csv')\
    .option('header', 'true')\
    .option('sep', ',')\
    .load('/tmp/dataset.csv')

dataset.show()

# we stop the session
session.stop()
+-----+---+----+---+-----+
|label| f1| f2| f3| f4|
+-----+---+----+---+-----+
| 0| 0| one|2.0| true|
| 1| 4|five|6.0|false|
+-----+---+----+---+-----+

3.2 libSVM

LibSVM is a popular format to represent numeric sparse data.

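Consider the following dataset:

0 128:51  129:159
1 130:253 131:159 132:50
1 155:48 156:238

The first row, 0 128:51 129:159, indicates an observation with label 0 whose 128th and 129th features are equal to 51 and 159 respectively. Indices in the file are one-based, while the vectors Spark loads are zero-based, which is why the output below shows positions 127 and 128. We can load this dataset using the SparkSession object as we did for the CSV format.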

'''
For this example we need the dataset.libsvm file to be available.

Copy and paste the following lines:

echo "\
0 128:51 129:159
1 130:253 131:159 132:50
1 155:48 156:238" > /tmp/dataset.libsvm

'''

from pyspark.sql import SparkSession

session = SparkSession.builder.appName("Example").master("local").getOrCreate()

dataset = session.read.format('libsvm').option('numFeatures',157).load('/tmp/dataset.libsvm')

dataset.show()

# we stop the session
session.stop()
+-----+--------------------+
|label| features|
+-----+--------------------+
| 0.0|(157,[127,128],[5...|
| 1.0|(157,[129,130,131...|
| 1.0|(157,[154,155],[4...|
+-----+--------------------+
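
As a rough illustration of what the loader has to do with each line, the following plain-Python sketch parses one libSVM row into a label plus zero-based indices and values. The function parse_libsvm_line is a made-up helper for this post, not an MLlib function, and it ignores details such as comments or malformed input.

```python
# Sketch of how one libSVM line maps to a label and a sparse vector.
# parse_libsvm_line is an illustrative helper, not an MLlib function.

def parse_libsvm_line(line):
    """Return (label, zero_based_indices, values) for one libSVM line."""
    parts = line.split()
    label = float(parts[0])
    indices, values = [], []
    for item in parts[1:]:
        index, value = item.split(':')
        indices.append(int(index) - 1)   # file indices are one-based
        values.append(float(value))
    return label, indices, values

print(parse_libsvm_line('0 128:51 129:159'))
# (0.0, [127, 128], [51.0, 159.0])
```

The result lines up with the first row of the table above, where the features are stored at positions 127 and 128 of a vector of size 157.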

3.3 Images

MLlib can load images in a variety of formats (jpeg, png, etc.). It also supports compressed formats. The resulting DataFrame has a single column named image, a struct whose fields (origin, width, height, nChannels, mode, and data) describe each image.

More details in the docs.

# We download a cat image for later

import urllib3
import tempfile
from IPython.display import Image
import sys

url='https://unsplash.com/photos/ECfPmkOVZPA/download?ixid=M3wxMjA3fDB8MXxhbGx8fHx8fHx8fHwxNzA2MDA3NDAyfA&force=true&w=640'
cat_image = tempfile.gettempdir() + '/kitty.png'

http = urllib3.PoolManager()
r = http.request('GET', url, preload_content=False)

with open(cat_image, 'wb') as f:
    while True:
        data = r.read()
        if not data:
            break
        f.write(data)
r.release_conn()

Image(filename=cat_image)

from pyspark.sql import SparkSession

session = SparkSession.builder.appName('Example').master('local').getOrCreate()
df = session.read.format('image').option('dropInvalid', True).load(cat_image)

df.select('image.origin', 'image.width', 'image.height', 'image.nChannels', 'image.mode').show(truncate=False)

# The image data is stored in the image.data column, one image per row.
img_data = df.select('image.data').collect()[0]

# Do something with img_data...

session.stop()
+---------------------+-----+------+---------+----+
|origin |width|height|nChannels|mode|
+---------------------+-----+------+---------+----+
|file:///tmp/kitty.png|640 |960 |3 |16 |
+---------------------+-----+------+---------+----+

4. Features

One of the main tasks for any data engineer is data preparation, for two reasons:

  1. Raw data is not ready to be consumed by algorithms
  2. Preprocessing the data is required to improve the performance of the algorithms

Incoming data has to be processed in different steps until we reach a successful representation to be consumed by algorithms. MLlib offers a collection of feature-related operations. We can distinguish:

  • Extraction: extract features from raw data
  • Transformation: modifying/converting features
  • Selection: select features based on certain criteria
  • Locality Sensitive Hashing (LSH): algorithms combining feature transformation with other algorithms

In general feature processing in MLlib follows these steps:

  1. Instantiate the operator indicating the name of the input and output columns and additional params.
  2. Invoke the .fit(...) method to train a model. Some operators do not require this step because they are not associated with a model.
  3. Transform the input data using the model

Needless to say, these steps, the input params, and the input format vary from operator to operator.

The following sections present succinct examples of different operators.

4.1 Normal standardization

Scale each feature so that it has mean 0 and unit variance. Note that this rescales the data but does not change the shape of its distribution.

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StandardScaler

session = SparkSession.builder.appName('Example').master('local').getOrCreate()
values = [[0, Vectors.dense([1.0, 0.1, -1.0])],
          [1, Vectors.dense([2.0, 1.1, 1.0])],
          [2, Vectors.dense([3.0, 10.1, 3.0])]]
dataset = session.createDataFrame(values, ['id', 'features'])
dataset.show()
+---+--------------+
| id| features|
+---+--------------+
| 0|[1.0,0.1,-1.0]|
| 1| [2.0,1.1,1.0]|
| 2|[3.0,10.1,3.0]|
+---+--------------+
# Fit
scaler = StandardScaler(inputCol = 'features', outputCol='standardized', withMean=True, withStd=True)
scalerModel = scaler.fit(dataset)

# Print some statistics
print("Mean is: %s with sd: %s" % (scalerModel.mean, scalerModel.std))

# Transform
standardized = scalerModel.transform(dataset)
standardized.show()
Mean is: [2.0,3.7666666666666666,1.0] with sd: [1.0,5.507570547286102,2.0]
+---+--------------+--------------------+
| id| features| standardized|
+---+--------------+--------------------+
| 0|[1.0,0.1,-1.0]|[-1.0,-0.66575028...|
| 1| [2.0,1.1,1.0]|[0.0,-0.484182026...|
| 2|[3.0,10.1,3.0]|[1.0,1.1499323120...|
+---+--------------+--------------------+

We can check whether the transformed data has the desired properties using a Summarizer (docs here). Every feature now has a mean of 0 and a standard deviation equal to 1.

# Let's see what are the mean and the std now
from pyspark.ml.stat import Summarizer
summarizer = Summarizer.metrics("mean", "std")
standardized.select(summarizer.summary(standardized.standardized)).show(truncate=False)

session.stop()
+---------------------------------------------+
|aggregate_metrics(standardized, 1.0) |
+---------------------------------------------+
|{[0.0,0.0,0.0], [1.0,0.9999999999999999,1.0]}|
+---------------------------------------------+
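
The numbers above can be cross-checked without Spark. The sketch below recomputes the mean, the sample standard deviation (dividing by n - 1, which is what the printed values correspond to), and the standardized rows using only the Python standard library. It is a hand-rolled check, not the MLlib implementation.

```python
# Plain-Python check of the standardization shown above (no Spark required).
from statistics import mean, stdev

rows = [[1.0, 0.1, -1.0],
        [2.0, 1.1, 1.0],
        [3.0, 10.1, 3.0]]

columns = list(zip(*rows))            # one tuple per feature
means = [mean(c) for c in columns]
stds = [stdev(c) for c in columns]    # sample standard deviation (n - 1)

# Subtract the mean and divide by the standard deviation, feature by feature.
standardized = [[(x - m) / s for x, m, s in zip(row, means, stds)]
                for row in rows]

print(means)
print(stds)
print(standardized[0])
```

The second component of the first row should come out close to -0.66575, matching the first row of the standardized column in the Spark output.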

4.2 Elementwise product

This transformer multiplies each input vector by a provided vector using element-wise multiplication (Hadamard product). In other words, it scales each column of the dataset by the corresponding entry of the scaling vector.

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import ElementwiseProduct

session = SparkSession.builder.appName('Example').master('local').getOrCreate()

a = [[Vectors.dense([2, 3, 1])],
     [Vectors.dense([0, 8, -2])]]

b = Vectors.dense([3, 1, 4])
print('b =',b)

df_a = session.createDataFrame(a, ['features'])
df_a.show()

ewp = ElementwiseProduct(inputCol='features', outputCol='product', scalingVec=b)

a_b = ewp.transform(df_a)
a_b.show()
b = [3.0,1.0,4.0]

+--------------+
| features|
+--------------+
| [2.0,3.0,1.0]|
|[0.0,8.0,-2.0]|
+--------------+

+--------------+--------------+
| features| product|
+--------------+--------------+
| [2.0,3.0,1.0]| [6.0,3.0,4.0]|
|[0.0,8.0,-2.0]|[0.0,8.0,-8.0]|
+--------------+--------------+
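
For reference, the same operation written in plain Python is a one-liner. This is only a sketch to show the arithmetic behind the table above.

```python
# Element-wise (Hadamard) product in plain Python.
rows = [[2.0, 3.0, 1.0],
        [0.0, 8.0, -2.0]]
scaling = [3.0, 1.0, 4.0]

# Each entry is multiplied by the scaling value in the same position.
products = [[x * s for x, s in zip(row, scaling)] for row in rows]
print(products)   # [[6.0, 3.0, 4.0], [0.0, 8.0, -8.0]]
```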

4.3 Principal Component Analysis

When dealing with many features we can come across the curse of dimensionality.

  • More than three variables are difficult to plot
  • Performance issues for a large number of features
  • Features that only add “noise” to the problem
  • Algorithms may find it difficult to converge to a solution

Principal Component Analysis (PCA) is a dimensionality reduction technique that aims to find the components that maximize the variance. These are the steps to follow:

  • Standardize the data
  • Compute eigenvectors and eigenvalues of the covariance matrix
  • Sort eigenvalues and pick the d largest values
  • Construct matrix W using the d corresponding eigenvectors
  • Transform dataset X multiplying it by W
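
The steps above can be sketched with NumPy as follows. This is an illustrative version, not the MLlib implementation, and it makes one simplifying assumption: the data is centered but not scaled to unit variance, because the second feature of this toy dataset is constant and its standard deviation is zero.

```python
# NumPy sketch of the PCA steps listed above (not the MLlib implementation).
import numpy as np

X = np.array([[1.0, 0.0, 3.0, 0.0, 7.0],
              [2.0, 0.0, 3.0, 4.0, 5.0],
              [4.0, 0.0, 0.0, 6.0, 7.0]])
d = 3  # number of components to keep

# 1. Center the data. Scaling to unit variance is skipped here because
#    the second feature is constant and its standard deviation is zero.
X_centered = X - X.mean(axis=0)

# 2. Eigen-decomposition of the covariance matrix (symmetric, so eigh).
cov = np.cov(X_centered, rowvar=False)
eigenvalues, eigenvectors = np.linalg.eigh(cov)

# 3. Sort eigenvalues in decreasing order and keep the d largest.
order = np.argsort(eigenvalues)[::-1][:d]

# 4. Build W from the corresponding eigenvectors (one per column).
W = eigenvectors[:, order]

# 5. Project the dataset onto the selected components.
X_reduced = X_centered @ W

explained = eigenvalues[order] / eigenvalues.sum()
print(explained)
print(X_reduced.shape)   # (3, 3)
```

With only three observations, at most two components can carry variance, so the explained-variance ratios should come out close to 0.84375, 0.15625, and 0.
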

In MLlib there is a PCA transformer that implements these steps. By applying PCA we can obtain a reduced version of the original dataset that maintains most of the relevant information carried by the features.

In the example below, we compute the PCA for a dataset of 5 features that we wish to convert into a new dataset of 3 features.

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import PCA

session = SparkSession.builder.appName("Example").master("local").getOrCreate()

data = [[Vectors.dense([1.0, 0.0, 3.0, 0.0, 7.0])],
        [Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0])],
        [Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0])]]

dataset = session.createDataFrame(data, ['features'])
dataset.show()
+--------------------+
| features|
+--------------------+
|[1.0,0.0,3.0,0.0,...|
|[2.0,0.0,3.0,4.0,...|
|[4.0,0.0,0.0,6.0,...|
+--------------------+
# Fit PCA
pca = PCA(inputCol='features', outputCol='pcaFeatures', k=3)
pcaModel = pca.fit(dataset)

print("The variance for every new feature %s" % pcaModel.explainedVariance)
The variance for every new feature [0.84375,0.15625000000000008,4.509331675237028e-17]
# Transform the original dataset
pcaDataset = pcaModel.transform(dataset)
pcaDataset.show(truncate=False)
+---------------------+---------------------------------------------------------+
|features |pcaFeatures |
+---------------------+---------------------------------------------------------+
|[1.0,0.0,3.0,0.0,7.0]|[0.8164965809277265,3.65148371670111,-2.5144734900027204]|
|[2.0,0.0,3.0,4.0,5.0]|[-2.857738033247042,0.9128709291752779,-2.51447349000272]|
|[4.0,0.0,0.0,6.0,7.0]|[-6.53197264742181,3.6514837167011094,-2.514473490002719]|
+---------------------+---------------------------------------------------------+

Observe that the rows of the transformed dataset no longer correspond to any real observation. Any model prediction generated using this transformed data for training has to be reconstructed, that is, mapped back to the original feature space. Otherwise, the output will not make any sense.

4.4 StringIndexer

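This is a label indexer that assigns a numeric index to every distinct string in a column. By default, the indices are ordered by frequency, so the most frequent string gets index 0. If the value is numeric, it is first cast to a string and then indexed.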

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer

session = SparkSession.builder.appName('Example').master('local').getOrCreate()
data = session.createDataFrame([['blue'], ['red'],
                                ['red'], ['white'],
                                ['yellow'], ['red']], ['feature'])
data.show()
si = StringIndexer(inputCol='feature', outputCol='index')
model = si.fit(data)
print('Found labels: ', model.labels)
model.transform(data).show()
session.stop()
+-------+
|feature|
+-------+
| blue|
| red|
| red|
| white|
| yellow|
| red|
+-------+

Found labels: ['red', 'blue', 'white', 'yellow']
+-------+-----+
|feature|index|
+-------+-----+
| blue| 1.0|
| red| 0.0|
| red| 0.0|
| white| 2.0|
| yellow| 3.0|
| red| 0.0|
+-------+-----+
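
The indexing logic can be sketched in a few lines of plain Python. The tie-breaking rule used here (alphabetical order among equally frequent strings) is an assumption of this sketch that happens to reproduce the labels printed above.

```python
# Plain-Python sketch of frequency-based string indexing.
from collections import Counter

values = ['blue', 'red', 'red', 'white', 'yellow', 'red']

counts = Counter(values)
# Most frequent first; ties are broken alphabetically in this sketch.
labels = sorted(counts, key=lambda s: (-counts[s], s))
index_of = {label: float(i) for i, label in enumerate(labels)}

print(labels)                          # ['red', 'blue', 'white', 'yellow']
print([index_of[v] for v in values])   # [1.0, 0.0, 0.0, 2.0, 3.0, 0.0]
```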

4.4 One hot encoder

This encoder maps each index in a column to a binary vector with at most a single one-value. If we have 4 labels and keep all of them, index 3 is encoded as [0,0,0,1]. The output is a SparseVector.

Observe that the param dropLast is True by default. In that case the label with index n-1 gets no position of its own and is encoded as a vector of all zeros.

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer, OneHotEncoder

session = SparkSession.builder.appName('Example').master('local').getOrCreate()

data = session.createDataFrame([['blue'], ['red'],
                                ['red'], ['white'],
                                ['yellow'], ['red']], ['feature'])
data.show()

# First we need an input column with indices instead of strings.
si = StringIndexer(inputCol='feature', outputCol='indexed')
si_model = si.fit(data)
indexed = si_model.transform(data)
indexed.show()

# With the default dropLast=True, yellow (the last index) would get no position of its own
ohe = OneHotEncoder(inputCol='indexed', outputCol='encoded', dropLast=False)
ohe_model = ohe.fit(indexed)
ohe_transformed = ohe_model.transform(indexed)

'''
You can check that, when setting dropLast=True, the row for yellow will be

+-------+-------+-------------+
|feature|indexed| encoded|
+-------+-------+-------------+
| yellow| 3.0|(3,[],[]) |
+-------+-------+-------------+
'''
ohe_transformed.show()
+-------+
|feature|
+-------+
| blue|
| red|
| red|
| white|
| yellow|
| red|
+-------+

+-------+-------+
|feature|indexed|
+-------+-------+
| blue| 1.0|
| red| 0.0|
| red| 0.0|
| white| 2.0|
| yellow| 3.0|
| red| 0.0|
+-------+-------+

+-------+-------+-------------+
|feature|indexed| encoded|
+-------+-------+-------------+
| blue| 1.0|(4,[1],[1.0])|
| red| 0.0|(4,[0],[1.0])|
| red| 0.0|(4,[0],[1.0])|
| white| 2.0|(4,[2],[1.0])|
| yellow| 3.0|(4,[3],[1.0])|
| red| 0.0|(4,[0],[1.0])|
+-------+-------+-------------+
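
The effect of dropLast is easier to see with dense lists. The following plain-Python sketch uses a made-up helper, one_hot, that is not part of MLlib and returns a dense list instead of a SparseVector.

```python
# Plain-Python sketch of one-hot encoding with and without dropLast.
# one_hot is an illustrative helper, not an MLlib function.

def one_hot(index, num_labels, drop_last=True):
    """Encode a category index as a dense list of 0.0/1.0 values."""
    size = num_labels - 1 if drop_last else num_labels
    vector = [0.0] * size
    if index < size:          # the dropped last category stays all zeros
        vector[index] = 1.0
    return vector

print(one_hot(3, 4, drop_last=False))   # [0.0, 0.0, 0.0, 1.0]
print(one_hot(3, 4, drop_last=True))    # [0.0, 0.0, 0.0]
print(one_hot(1, 4, drop_last=True))    # [0.0, 1.0, 0.0]
```

Dropping the last category loses no information, since an all-zeros vector still identifies it, and it avoids having columns that always sum to one.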

4.5 Tokenization

Tokenization is the process of splitting a document into a sequence of individual tokens. The sentence "The quick brown fox jumps over the lazy dog" will be split into tokens such as ["The", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog"]. Different approaches may split the document using white spaces, commas, regular expressions, or any other character.

In MLlib there is a Tokenizer transformer for this purpose. It converts the text to lowercase and splits it by white spaces, so punctuation stays attached to the tokens, as the third row of the output shows.

from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer

session = SparkSession.builder.appName("Example").master("local").getOrCreate()
sentenceDataFrame = session.createDataFrame([(0, "Hi I heard about Spark"),
                                             (1, "I wish Java could use case classes"),
                                             (2, "Logistic, regression, models, are, neat")], ['id', 'sentence'])

tokenizer = Tokenizer(inputCol='sentence', outputCol='words')
tokenizer.transform(sentenceDataFrame).show(truncate=False)
+---+---------------------------------------+---------------------------------------------+
|id |sentence |words |
+---+---------------------------------------+---------------------------------------------+
|0 |Hi I heard about Spark |[hi, i, heard, about, spark] |
|1 |I wish Java could use case classes |[i, wish, java, could, use, case, classes] |
|2 |Logistic, regression, models, are, neat|[logistic,, regression,, models,, are,, neat]|
+---+---------------------------------------+---------------------------------------------+
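
Notice how the commas remain attached to the tokens in the last row. The plain-Python sketch below contrasts whitespace splitting with splitting on a regular expression. In MLlib, pattern-based splitting is available through the RegexTokenizer class in pyspark.ml.feature. The code here only illustrates the idea with the standard re module.

```python
# Plain-Python sketch: whitespace tokenization vs. regex tokenization.
import re

sentence = 'Logistic, regression, models, are, neat'

# Lowercase and split on whitespace: punctuation stays attached.
whitespace_tokens = sentence.lower().split()

# Split on runs of non-word characters and drop empty strings.
regex_tokens = [t for t in re.split(r'\W+', sentence.lower()) if t]

print(whitespace_tokens)  # ['logistic,', 'regression,', 'models,', 'are,', 'neat']
print(regex_tokens)       # ['logistic', 'regression', 'models', 'are', 'neat']
```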

4.6 Stop words

Natural language is redundant and not every term provides the same amount of information. By stop words we refer to the most common words in a given language. These words are so common that they carry little relevant information, and they are usually removed prior to any analysis. There is no single list of stop words, and the list changes with every language.

MLlib implements the StopWordsRemover that filters out stop words using a dictionary.

from pyspark.sql import SparkSession
from pyspark.ml.feature import StopWordsRemover

session = SparkSession.builder.appName("Example").master("local").getOrCreate()

text = session.createDataFrame([(0, ["I", "saw", "the", "red", " balloon "]),
                                (1, ["Mary", "had", "a", " little ", "lamb"])
                                ], ["id", "raw"])
text.show(truncate=False)

remover = StopWordsRemover(inputCol='raw', outputCol='filtered')
remover.transform(text).show(truncate=False)
+---+------------------------------+
|id |raw |
+---+------------------------------+
|0 |[I, saw, the, red, balloon ] |
|1 |[Mary, had, a, little , lamb]|
+---+------------------------------+

+---+------------------------------+----------------------+
|id |raw |filtered |
+---+------------------------------+----------------------+
|0 |[I, saw, the, red, balloon ] |[saw, red, balloon ] |
|1 |[Mary, had, a, little , lamb]|[Mary, little , lamb]|
+---+------------------------------+----------------------+
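
Conceptually, the filtering is a dictionary lookup per token. The sketch below uses a tiny hypothetical stop-word set (real lists contain many more entries) and compares tokens in lowercase, which is an assumption chosen here to match the output above, where both "I" and "the" are removed.

```python
# Plain-Python sketch of stop-word filtering.
# STOP_WORDS is a tiny hypothetical list, far shorter than a real one.
STOP_WORDS = {'i', 'the', 'had', 'a'}

def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Keep tokens whose lowercase form is not in the stop-word set."""
    return [t for t in tokens if t.lower() not in stop_words]

print(remove_stop_words(['I', 'saw', 'the', 'red', 'balloon']))
# ['saw', 'red', 'balloon']
print(remove_stop_words(['Mary', 'had', 'a', 'little', 'lamb']))
# ['Mary', 'little', 'lamb']
```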

4.7 Count Vectorizer

This estimator builds a vocabulary from the input and counts the number of occurrences of each vocabulary item in every document, representing the counts as a sparse vector. This is particularly useful to represent a document in terms of the frequency of its elements, and it is normally used in probabilistic models.

from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer

session = SparkSession.builder.appName("Example").master("local").getOrCreate()

text = session.createDataFrame([(0, 'yellow red blue'.split()),
                                (1, 'red'.split()),
                                (2, 'blue white blue'.split()),
                                ], ["id", "raw"])

text.show()

cv = CountVectorizer(inputCol='raw', outputCol='frequencies')
cv_model = cv.fit(text)
print('The vocabulary: ',cv_model.vocabulary)
frequencies = cv_model.transform(text)
frequencies.show(truncate=False)
+---+-------------------+
| id| raw|
+---+-------------------+
| 0|[yellow, red, blue]|
| 1| [red]|
| 2|[blue, white, blue]|
+---+-------------------+
The vocabulary: ['blue', 'red', 'white', 'yellow']
+---+-------------------+-------------------------+
|id |raw |frequencies |
+---+-------------------+-------------------------+
|0 |[yellow, red, blue]|(4,[0,1,3],[1.0,1.0,1.0])|
|1 |[red] |(4,[1],[1.0]) |
|2 |[blue, white, blue]|(4,[0,2],[2.0,1.0]) |
+---+-------------------+-------------------------+
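
To see where those sparse vectors come from, here is a plain-Python sketch of the same computation. The vocabulary is ordered by how often each term appears in the whole corpus. Breaking ties alphabetically is an assumption of this sketch that reproduces the vocabulary printed above.

```python
# Plain-Python sketch of a count vectorizer.
from collections import Counter

docs = [['yellow', 'red', 'blue'],
        ['red'],
        ['blue', 'white', 'blue']]

# Vocabulary ordered by corpus frequency; ties broken alphabetically here.
totals = Counter(token for doc in docs for token in doc)
vocabulary = sorted(totals, key=lambda t: (-totals[t], t))
position = {term: i for i, term in enumerate(vocabulary)}

def vectorize(doc):
    """Return (size, indices, counts), mirroring a sparse vector."""
    counts = Counter(position[t] for t in doc)
    indices = sorted(counts)
    return len(vocabulary), indices, [float(counts[i]) for i in indices]

print(vocabulary)            # ['blue', 'red', 'white', 'yellow']
for doc in docs:
    print(vectorize(doc))
# (4, [0, 1, 3], [1.0, 1.0, 1.0])
# (4, [1], [1.0])
# (4, [0, 2], [2.0, 1.0])
```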

4.8 N-grams

N-grams are sequences of n consecutive tokens. They are a common input for many algorithms that estimate the probability of n words occurring together. The NGram transformer outputs a collection of these N-grams.

from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, NGram

session = SparkSession.builder.appName("Example").master("local").getOrCreate()

text = session.createDataFrame([(0, "Hi I heard about Spark"),
                                (0, "I wish Java could use case classes "),
                                (0, "Logistic regression models are neat")
                                ], ["label", "sentence"])

# First we tokenize our dataset
tokenizer = Tokenizer(inputCol='sentence', outputCol='words')
words = tokenizer.transform(text)
words.show(truncate=False)

# Compute 2-grams
ngram = NGram(inputCol='words', outputCol='ngrams', n=2)
ngrams = ngram.transform(words)
ngrams.show(truncate=False)
+-----+-----------------------------------+------------------------------------------+
|label|sentence |words |
+-----+-----------------------------------+------------------------------------------+
|0 |Hi I heard about Spark |[hi, i, heard, about, spark] |
|0 |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|
|0 |Logistic regression models are neat|[logistic, regression, models, are, neat] |
+-----+-----------------------------------+------------------------------------------+

+-----+-----------------------------------+------------------------------------------+------------------------------------------------------------------+
|label|sentence |words |ngrams |
+-----+-----------------------------------+------------------------------------------+------------------------------------------------------------------+
|0 |Hi I heard about Spark |[hi, i, heard, about, spark] |[hi i, i heard, heard about, about spark] |
|0 |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|[i wish, wish java, java could, could use, use case, case classes]|
|0 |Logistic regression models are neat|[logistic, regression, models, are, neat] |[logistic regression, regression models, models are, are neat] |
+-----+-----------------------------------+------------------------------------------+------------------------------------------------------------------+
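
The sliding window behind this transformer fits in one line of plain Python. The function ngrams below is a made-up helper for illustration, not the MLlib implementation.

```python
# Plain-Python sketch of N-gram generation with a sliding window.

def ngrams(tokens, n):
    """Return every run of n consecutive tokens joined by a space."""
    return [' '.join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

tokens = ['hi', 'i', 'heard', 'about', 'spark']
print(ngrams(tokens, 2))  # ['hi i', 'i heard', 'heard about', 'about spark']
print(ngrams(tokens, 3))  # ['hi i heard', 'i heard about', 'heard about spark']
print(ngrams(tokens, 6))  # []
```

A sentence with fewer than n tokens yields an empty list, as the last call shows.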

4.9 Word2Vec

Word2Vec represents every word of a document as a vector, and a whole document as the average of the vectors of its words. This makes it possible to operate with documents as vectors, which in turn makes it easy to compute distances and enables other algorithms, especially in NLP. Take a look at the original Google code here.

from pyspark.sql import SparkSession
from pyspark.ml.feature import Word2Vec

session = SparkSession.builder.appName('Example').master('local').getOrCreate()

corpus = session.createDataFrame([
    ('Spark is quite useful'.split(),),
    ('I can use Spark with Python'.split(),),
    ('Spark is not so difficult after all'.split(),),
], ['words'])

corpus.show(truncate=False)

w2v = Word2Vec(inputCol='words', outputCol='result', vectorSize=3, minCount=0)
w2v_model = w2v.fit(corpus)
vectors = w2v_model.transform(corpus)
vectors.show(truncate=False)
+-------------------------------------------+
|words |
+-------------------------------------------+
|[Spark, is, quite, useful] |
|[I, can, use, Spark, with, Python] |
|[Spark, is, not, so, difficult, after, all]|
+-------------------------------------------+

+-------------------------------------------+------------------------------------------------------------------+
|words |result |
+-------------------------------------------+------------------------------------------------------------------+
|[Spark, is, quite, useful] |[0.08182145655155182,-0.07318692095577717,-0.0631803400174249] |
|[I, can, use, Spark, with, Python] |[0.016474373017748196,-1.7273581276337305E-4,-0.04478610997709135]|
|[Spark, is, not, so, difficult, after, all]|[0.019738022836723497,0.029656097292900085,-0.033315843919159045] |
+-------------------------------------------+------------------------------------------------------------------+
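
Once documents are vectors, comparing them is plain arithmetic. The sketch below averages word vectors into a document vector and computes the cosine similarity between two documents. The embeddings are made-up numbers chosen for readability, not the output of a trained model, and the helper names are hypothetical.

```python
# Plain-Python sketch: document vector as the average of its word vectors.
# The embeddings below are made-up numbers, not the output of a trained model.
import math

embeddings = {
    'spark':  [1.0, 0.0, 1.0],
    'is':     [0.0, 1.0, 0.0],
    'useful': [1.0, 1.0, 0.0],
    'python': [1.0, 0.0, 0.0],
}

def document_vector(words):
    """Average the vectors of the known words (assumes at least one)."""
    vectors = [embeddings[w] for w in words if w in embeddings]
    return [sum(component) / len(vectors) for component in zip(*vectors)]

def cosine_similarity(u, v):
    """Cosine of the angle between two vectors of the same length."""
    dot = sum(a * b for a, b in zip(u, v))
    norm = math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v))
    return dot / norm

doc_a = document_vector(['spark', 'is', 'useful'])
doc_b = document_vector(['spark', 'python'])
print(doc_a)
print(doc_b)                             # [1.0, 0.0, 0.5]
print(cosine_similarity(doc_a, doc_b))
```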

5. Pipelines

Most models are computed as a concatenation of operations, each operation transforming the original dataset. For example, normalization -> component analysis -> regression. MLlib uses the concept of pipelines (similar to the one used in scikit-learn) to unify the execution of a sequence of steps into a single object.

The pipeline is defined as a sequence of stages connecting transformers and estimators:

  • Transformer: receives an input DataFrame and returns a transformed version (e.g. a tokenizer or a fitted standardizer)
  • Estimator: receives an input DataFrame and, after fitting, returns a transformer (linear regression, logistic regression, etc.)

Creating a pipeline amounts to setting the sequence of stages to be executed.

from pyspark.ml.pipeline import Pipeline
pipeline = Pipeline(stages=[standardizer, pca, lr])

Then we fit the model and transform the dataset to get the corresponding results:

model = pipeline.fit(dataset)
model.transform(dataset).show()

from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.pipeline import Pipeline

session = SparkSession.builder.appName('Example').master('local').getOrCreate()

corpus = session.createDataFrame([
    ('Spark is quite useful',),
    ('I can use Spark with Python',),
    ('Spark is not so difficult after all',),
], ['docs'])

corpus.show(truncate=False)

tokenizer = Tokenizer(inputCol='docs', outputCol='tokens')
stop_remover = StopWordsRemover(inputCol='tokens', outputCol='filtered')
cv = CountVectorizer(inputCol='filtered', outputCol='frequencies')

pipeline = Pipeline(stages=[tokenizer, stop_remover, cv])

fitted = pipeline.fit(corpus)
result = fitted.transform(corpus)
result.show(truncate=False)

for m in fitted.stages:
    print('-->',m.uid)
    print(m.params)
+-----------------------------------+
|docs |
+-----------------------------------+
|Spark is quite useful |
|I can use Spark with Python |
|Spark is not so difficult after all|
+-----------------------------------+

+-----------------------------------+-------------------------------------------+----------------------+-------------------------+
|docs |tokens |filtered |frequencies |
+-----------------------------------+-------------------------------------------+----------------------+-------------------------+
|Spark is quite useful |[spark, is, quite, useful] |[spark, quite, useful]|(6,[0,2,3],[1.0,1.0,1.0])|
|I can use Spark with Python |[i, can, use, spark, with, python] |[use, spark, python] |(6,[0,1,5],[1.0,1.0,1.0])|
|Spark is not so difficult after all|[spark, is, not, so, difficult, after, all]|[spark, difficult] |(6,[0,4],[1.0,1.0]) |
+-----------------------------------+-------------------------------------------+----------------------+-------------------------+

--> Tokenizer_346e29794e54
[Param(parent='Tokenizer_346e29794e54', name='inputCol', doc='input column name.'), Param(parent='Tokenizer_346e29794e54', name='outputCol', doc='output column name.')]
--> StopWordsRemover_cdda6836267e
[Param(parent='StopWordsRemover_cdda6836267e', name='caseSensitive', doc='whether to do a case sensitive comparison over the stop words'), Param(parent='StopWordsRemover_cdda6836267e', name='inputCol', doc='input column name.'), Param(parent='StopWordsRemover_cdda6836267e', name='inputCols', doc='input column names.'), Param(parent='StopWordsRemover_cdda6836267e', name='locale', doc='locale of the input. ignored when case sensitive is true'), Param(parent='StopWordsRemover_cdda6836267e', name='outputCol', doc='output column name.'), Param(parent='StopWordsRemover_cdda6836267e', name='outputCols', doc='output column names.'), Param(parent='StopWordsRemover_cdda6836267e', name='stopWords', doc='The words to be filtered out')]
--> CountVectorizer_983c07eb2c8f
[Param(parent='CountVectorizer_983c07eb2c8f', name='binary', doc='Binary toggle to control the output vector values. If True, all nonzero counts (after minTF filter applied) are set to 1. This is useful for discrete probabilistic models that model binary events rather than integer counts. Default False'), Param(parent='CountVectorizer_983c07eb2c8f', name='inputCol', doc='input column name.'), Param(parent='CountVectorizer_983c07eb2c8f', name='maxDF', doc='Specifies the maximum number of different documents a term could appear in to be included in the vocabulary. A term that appears more than the threshold will be ignored. If this is an integer >= 1, this specifies the maximum number of documents the term could appear in; if this is a double in [0,1), then this specifies the maximum fraction of documents the term could appear in. Default (2^63) - 1'), Param(parent='CountVectorizer_983c07eb2c8f', name='minDF', doc='Specifies the minimum number of different documents a term must appear in to be included in the vocabulary. If this is an integer >= 1, this specifies the number of documents the term must appear in; if this is a double in [0,1), then this specifies the fraction of documents. Default 1.0'), Param(parent='CountVectorizer_983c07eb2c8f', name='minTF', doc="Filter to ignore rare words in a document. For each document, terms with frequency/count less than the given threshold are ignored. If this is an integer >= 1, then this specifies a count (of times the term must appear in the document); if this is a double in [0,1), then this specifies a fraction (out of the document's token count). Note that the parameter is only used in transform of CountVectorizerModel and does not affect fitting. Default 1.0"), Param(parent='CountVectorizer_983c07eb2c8f', name='outputCol', doc='output column name.'), Param(parent='CountVectorizer_983c07eb2c8f', name='vocabSize', doc='max size of the vocabulary. Default 1 << 18.')]
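
To make the fit/transform mechanics of a pipeline more tangible, here is a plain-Python sketch of the pattern. All class and function names are hypothetical and only mimic the roles described in this section. The real MLlib classes work on DataFrames and carry many more details.

```python
# Plain-Python sketch of the estimator/transformer/pipeline mechanics.
# All class names here are hypothetical and only mimic the MLlib pattern.

class Lowercase:
    """Transformer: no fitting needed."""
    def transform(self, rows):
        return [row.lower() for row in rows]

class VocabularyEstimator:
    """Estimator: fit() learns a vocabulary and returns a transformer."""
    def fit(self, rows):
        vocabulary = sorted({word for row in rows for word in row.split()})
        return VocabularyModel(vocabulary)

class VocabularyModel:
    """Transformer produced by VocabularyEstimator.fit()."""
    def __init__(self, vocabulary):
        self.vocabulary = vocabulary

    def transform(self, rows):
        return [[row.split().count(word) for word in self.vocabulary]
                for row in rows]

class SimplePipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted first; transformers are used as they are.
            model = stage.fit(rows) if hasattr(stage, 'fit') else stage
            rows = model.transform(rows)   # feed the next stage
            fitted.append(model)
        return fitted

def run(fitted_stages, rows):
    """Apply every fitted stage in order."""
    for model in fitted_stages:
        rows = model.transform(rows)
    return rows

corpus = ['Spark is useful', 'I use Spark']
fitted = SimplePipeline([Lowercase(), VocabularyEstimator()]).fit(corpus)
print(fitted[1].vocabulary)   # ['i', 'is', 'spark', 'use', 'useful']
print(run(fitted, corpus))    # [[0, 1, 1, 0, 1], [1, 0, 1, 1, 0]]
```

The key point is that fitting walks through the stages in order, and each stage sees the data as transformed by the previous ones.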

The second part will show how to use common ML algorithms.

The original Jupyter notebook is available in this repo.

If you are interested in this content, you can check my blog.

Thanks for reading.
