Scale ML Using PySpark (Part 1)
A collection of examples of how to use MLlib with PySpark for those interested in running large ML problems.
This is the first part of a collection of examples of how to use the MLlib Spark library with Python. The content in this post is a conversion of this Jupyter notebook.
1. Introduction
This notebook is a collection of examples that illustrate how to use PySpark with MLlib. The whole collection is intended to be a gentle introduction for those who are interested in the topic, want additional examples, or are simply curious about how to start with this library.
The following skills are expected in order to follow these examples:
- Python
- Understanding Spark structures (Dataframes, RDD)
- Basic ML notions. This is not intended to be an ML course, although you can find some theoretical explanations.
The examples are designed to work with a simple local environment using the MLlib DataFrame-based API. The MLlib RDD-based API is now in maintenance mode (see here). This is why you will see that the main import statements come from pyspark.ml, not pyspark.mllib.
You will need a Spark environment available on your local path. Refer to the official guide here for more details.
You will need Java to be available on your local path. Check it by running java --version. If nothing is displayed, check out how to install Java on your machine (here). Next, you can easily set up a local environment and install the dependencies used in this notebook.
virtualenv env
source env/bin/activate
pip install urllib3 numpy matplotlib pyspark
Wait until all the dependencies are satisfied. If you are not reading these lines from a Jupyter notebook :), install and run it:
pip install jupyter
jupyter-notebook
2. Data types
Spark DataFrames support a collection of rows containing elements with different data types. In the context of ML, data types such as boolean, string, or even integer are not the expected input for most algorithms. For this reason, MLlib supports data types such as vectors or matrices.
2.1 Dense Vectors
A dense vector is an array. PySpark uses numpy to run algebraic operations.
from pyspark.ml.linalg import DenseVector
a = DenseVector([0,1,2,3,4])
b = DenseVector([10,10,10,10,10])
print('Sum: ', a + b)
print('Difference: ', a - b)
print('Multiplication: ', a * 2)
print('Division: ', b / 2)
print('Non-zeros: ', a.nonzero())
print('Squared distance: ', a.squared_distance(b))
Sum: [10.0,11.0,12.0,13.0,14.0]
Difference: [-10.0,-9.0,-8.0,-7.0,-6.0]
Multiplication: [0.0,2.0,4.0,6.0,8.0]
Division: [5.0,5.0,5.0,5.0,5.0]
Non-zeros: (array([1, 2, 3, 4]),)
Squared distance: 330.0
2.2 Sparse Vectors
Sparse vectors are designed to represent vectors where a large number of elements are expected to be zero. These vectors are defined by specifying which positions of the array are different from zero and the values assigned to them. In the following vector:
SparseVector(5, [0, 2, 4], [1, 3, 5])
we have five elements, where entries 0, 2, and 4 take the values 1, 3, and 5.
from pyspark.ml.linalg import SparseVector
sparse_vector = SparseVector(5, [0, 2, 4], [1, 3, 5])
print('Sparse vector: ', sparse_vector.toArray())
print('Indices: ', sparse_vector.indices)
print('Non zeros: ', sparse_vector.numNonzeros())
Sparse vector: [1. 0. 3. 0. 5.]
Indices: [0 2 4]
Non zeros: 3
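MLlib also provides matrix types in pyspark.ml.linalg. Below is a minimal sketch (not part of the original notebook) of a dense and a sparse matrix; the values are arbitrary. Sparse matrices are stored in compressed sparse column (CSC) format, hence the column pointers and row indices.
from pyspark.ml.linalg import DenseMatrix, SparseMatrix

# A 2x3 dense matrix; values are provided in column-major order
dm = DenseMatrix(2, 3, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0])
print(dm.toArray())

# A 3x3 diagonal sparse matrix: column pointers, row indices, values
sm = SparseMatrix(3, 3, [0, 1, 2, 3], [0, 1, 2], [9.0, 8.0, 7.0])
print(sm.toArray())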
3. Input/Output
We can expect datasets to be available from different storage sources:
- Hard disks
- HDFS
- Databases
- Others
The SparkSession object facilitates loading data from these sources in different formats (CSV, JSON, text, Parquet, databases, etc.). We will show examples for CSV, libSVM, and images.
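Other formats follow the same pattern. As a minimal sketch, this is how a JSON or a Parquet file could be loaded (the paths are placeholders, not files used elsewhere in this notebook):
from pyspark.sql import SparkSession

session = SparkSession.builder.appName('Example').master('local').getOrCreate()
# Placeholder paths: point them at your own files
json_df = session.read.format('json').load('/tmp/dataset.json')
parquet_df = session.read.format('parquet').load('/tmp/dataset.parquet')
session.stop()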
3.1 CSV
Let’s assume the following dataset in a CSV format:
label,f1,f2,f3,f4
0,0,"one",2.0,true
1,4,"five",6.0,false
We instantiate a SparkSession object and load the dataset, indicating that we have a header and the separator character.
'''
For this example we need the dataset.csv file to be available.
Copy and paste the following lines:
echo "\
label,f1,f2,f3,f4
0,0,"one",2.0,true
1,4,"five",6.0,false" > /tmp/dataset.csv
'''
from pyspark.sql import SparkSession
# Get a session object for our current Spark master
session = SparkSession.builder.appName("Example").master("local").getOrCreate()
dataset = session.read.format('csv')\
.option('header', 'true')\
.option('sep', ',')\
.load('/tmp/dataset.csv')
dataset.show()
# we stop the session
session.stop()
+-----+---+----+---+-----+
|label| f1| f2| f3| f4|
+-----+---+----+---+-----+
| 0| 0| one|2.0| true|
| 1| 4|five|6.0|false|
+-----+---+----+---+-----+
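Note that, loaded this way, every column is read as a string. If you want Spark to guess numeric and boolean types from the data, you can enable schema inference. A short sketch of the same load with inferSchema:
from pyspark.sql import SparkSession

session = SparkSession.builder.appName('Example').master('local').getOrCreate()
# Same file as before, but letting Spark infer the column types
dataset = session.read.format('csv')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .load('/tmp/dataset.csv')
dataset.printSchema()
session.stop()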
3.2 libSVM
LibSVM is a popular format to represent numeric sparse data.
The following dataset:
0 128:51 129:159
1 130:253 131:159 132:50
1 155:48 156:238
where the first row, 0 128:51 129:159, indicates an observation with label 0 and features 128 and 129 equal to 51 and 159, respectively. Note that libSVM feature indices are one-based; Spark converts them to zero-based indices when loading, which is why the output below shows indices 127 and 128. We can load this dataset using the SparkSession object as we did for the CSV format.
'''
For this example we need the dataset.libsvm file to be available.
Copy and paste the following lines:
echo "\
0 128:51 129:159
1 130:253 131:159 132:50
1 155:48 156:238" > /tmp/dataset.libsvm
'''
from pyspark.sql import SparkSession
session = SparkSession.builder.appName("Example").master("local").getOrCreate()
dataset = session.read.format('libsvm').option('numFeatures',157).load('/tmp/dataset.libsvm')
dataset.show()
# we stop the session
session.stop()
+-----+--------------------+
|label| features|
+-----+--------------------+
| 0.0|(157,[127,128],[5...|
| 1.0|(157,[129,130,131...|
| 1.0|(157,[154,155],[4...|
+-----+--------------------+
3.3 Images
MLlib can load images in a variety of formats (jpeg, png, etc.). It also supports compressed formats. The resulting DataFrame has an image column containing the image data and its metadata, following the image schema.
More details in the docs.
# We download a cat image for later
import urllib3
import tempfile
from IPython.display import Image
import sys
url='https://unsplash.com/photos/ECfPmkOVZPA/download?ixid=M3wxMjA3fDB8MXxhbGx8fHx8fHx8fHwxNzA2MDA3NDAyfA&force=true&w=640'
cat_image = tempfile.gettempdir() + '/kitty.png'
http = urllib3.PoolManager()
r = http.request('GET', url, preload_content=False)
with open(cat_image, 'wb') as f:
    while True:
        data = r.read()
        if not data:
            break
        f.write(data)
r.release_conn()
Image(filename=cat_image)
from pyspark.sql import SparkSession
session = SparkSession.builder.appName('Example').master('local').getOrCreate()
df = session.read.format('image').option('dropInvalid', True).load(cat_image)
df.select('image.origin', 'image.width', 'image.height', 'image.nChannels', 'image.mode').show(truncate=False)
# The image data is stored in the image.data column, one image per row.
img_data = df.select('image.data').collect()[0]
# Do something with img_data...
session.stop()
+---------------------+-----+------+---------+----+
|origin |width|height|nChannels|mode|
+---------------------+-----+------+---------+----+
|file:///tmp/kitty.png|640 |960 |3 |16 |
+---------------------+-----+------+---------+----+
4. Features
One of the main tasks for any data engineer is data preparation, for two reasons:
- Raw data is not ready to be consumed by algorithms
- Preprocessing data is required to improve algorithm performance
Incoming data has to be processed in several steps until we reach a representation that can be successfully consumed by the algorithms. MLlib offers a collection of feature-related operations. We can distinguish:
- Extraction: extract features from raw data
- Transformation: modifying/converting features
- Selection: select features based on certain criteria
- Locality Sensitive Hashing (LSH): algorithms that combine feature transformation with other algorithms
In general, feature processing in MLlib follows these steps:
- Instantiate the operator, indicating the names of the input and output columns and any additional params.
- Fit the model by invoking the .fit(...) method. Some operators do not require this step if they are not associated with a model.
- Transform the input data using the model.
Needless to say, these steps, the input params, and the input format vary from operator to operator.
The following sections present succinct examples of different operators.
4.1 Normal standardization
Scale each feature to have mean 0 and unit variance: every value x is replaced by (x - mean) / std, computed per feature.
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StandardScaler
session = SparkSession.builder.appName('Example').master('local').getOrCreate()
values = [[0, Vectors.dense([1.0, 0.1, -1.0])],
          [1, Vectors.dense([2.0, 1.1, 1.0])],
          [2, Vectors.dense([3.0, 10.1, 3.0])]]
dataset = session.createDataFrame(values, ['id', 'features'])
dataset.show()
+---+--------------+
| id| features|
+---+--------------+
| 0|[1.0,0.1,-1.0]|
| 1| [2.0,1.1,1.0]|
| 2|[3.0,10.1,3.0]|
+---+--------------+
# Fit
scaler = StandardScaler(inputCol = 'features', outputCol='standardized', withMean=True, withStd=True)
scalerModel = scaler.fit(dataset)
# Print some statistics
print("Mean is: %s with sd: %s" % (scalerModel.mean, scalerModel.std))
# Transform
standardized = scalerModel.transform(dataset)
standardized.show()
Mean is: [2.0,3.7666666666666666,1.0] with sd: [1.0,5.507570547286102,2.0]
+---+--------------+--------------------+
| id| features| standardized|
+---+--------------+--------------------+
| 0|[1.0,0.1,-1.0]|[-1.0,-0.66575028...|
| 1| [2.0,1.1,1.0]|[0.0,-0.484182026...|
| 2|[3.0,10.1,3.0]|[1.0,1.1499323120...|
+---+--------------+--------------------+
We can check if the transformed data has the desired distribution using a Summarizer (docs here). For every feature we have the mean set to 0 and the standard deviation equal to 1.
# Let's see what are the mean and the std now
from pyspark.ml.stat import Summarizer
summarizer = Summarizer.metrics("mean", "std")
standardized.select(summarizer.summary(standardized.standardized)).show(truncate=False)
session.stop()
+---------------------------------------------+
|aggregate_metrics(standardized, 1.0) |
+---------------------------------------------+
|{[0.0,0.0,0.0], [1.0,0.9999999999999999,1.0]}|
+---------------------------------------------+
4.2 Elementwise product
This transformer multiplies each input vector by a provided vector, element by element (the Hadamard product). In effect, each column of the dataset is scaled by the corresponding entry of the provided vector.
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import ElementwiseProduct
session = SparkSession.builder.appName('Example').master('local').getOrCreate()
a = [[Vectors.dense([2, 3, 1])],
     [Vectors.dense([0, 8, -2])]]
b = Vectors.dense([3, 1, 4])
print('b =',b)
df_a = session.createDataFrame(a, ['features'])
df_a.show()
ewp = ElementwiseProduct(inputCol='features', outputCol='product', scalingVec=b)
a_b = ewp.transform(df_a)
a_b.show()
b = [3.0,1.0,4.0]
+--------------+
| features|
+--------------+
| [2.0,3.0,1.0]|
|[0.0,8.0,-2.0]|
+--------------+
+--------------+--------------+
| features| product|
+--------------+--------------+
| [2.0,3.0,1.0]| [6.0,3.0,4.0]|
|[0.0,8.0,-2.0]|[0.0,8.0,-8.0]|
+--------------+--------------+
4.3 Principal Component Analysis
When dealing with many features we can come across the curse of dimensionality.
- More than three variables are difficult to plot
- Performance issues for a large number of features
- Features that only add “noise” to the problem
- Algorithms may find it difficult to converge to a solution
Principal Component Analysis (PCA) is a dimensionality reduction technique that aims to find the components that maximize the variance. These are the steps to follow:
- Standardize the data
- Compute eigenvectors and eigenvalues of the covariance matrix
- Sort eigenvalues and pick the d largest values
- Construct matrix W using the d corresponding eigenvectors
- Transform dataset X multiplying it by W
In MLlib there is a PCA transformer that implements all these steps. By applying PCA we can obtain a reduced version of the original dataset that retains most of the relevant information carried by the features.
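For intuition, the steps above can be reproduced with plain numpy. This is only an illustrative sketch (it is not how MLlib computes PCA internally, and the projection may differ from MLlib's output in sign and centering):
import numpy as np

# Toy dataset: 3 observations, 5 features (same values as the MLlib example below)
X = np.array([[1.0, 0.0, 3.0, 0.0, 7.0],
              [2.0, 0.0, 3.0, 4.0, 5.0],
              [4.0, 0.0, 0.0, 6.0, 7.0]])

# Center the data and compute the covariance matrix
Xc = X - X.mean(axis=0)
cov = np.cov(Xc, rowvar=False)

# Eigen-decomposition of the (symmetric) covariance matrix
eig_values, eig_vectors = np.linalg.eigh(cov)

# Sort the eigenvalues in decreasing order and keep the d largest
d = 3
order = np.argsort(eig_values)[::-1][:d]
W = eig_vectors[:, order]

# Project the centered dataset onto the d principal components
X_reduced = Xc @ W
print(X_reduced)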
In the example below, we compute the PCA for a dataset with 5 features that we wish to convert into a new dataset with 3 features.
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import PCA
session = SparkSession.builder.appName("Example").master("local").getOrCreate()
data = [[Vectors.dense([1.0, 0.0, 3.0, 0.0, 7.0])],
        [Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0])],
        [Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0])]]
dataset = session.createDataFrame(data, ['features'])
dataset.show()
+--------------------+
| features|
+--------------------+
|[1.0,0.0,3.0,0.0,...|
|[2.0,0.0,3.0,4.0,...|
|[4.0,0.0,0.0,6.0,...|
+--------------------+
# Fit PCA
pca = PCA(inputCol='features', outputCol='pcaFeatures', k=3)
pcaModel = pca.fit(dataset)
print("The variance for every new feature %s" % pcaModel.explainedVariance)The variance for every new feature [0.84375,0.15625000000000008,4.509331675237028e-17]
The variance for every new feature [0.84375,0.15625000000000008,4.509331675237028e-17
# Transform the original dataset
pcaDataset = pcaModel.transform(dataset)
pcaDataset.show(truncate=False)
+---------------------+---------------------------------------------------------+
|features |pcaFeatures |
+---------------------+---------------------------------------------------------+
|[1.0,0.0,3.0,0.0,7.0]|[0.8164965809277265,3.65148371670111,-2.5144734900027204]|
|[2.0,0.0,3.0,4.0,5.0]|[-2.857738033247042,0.9128709291752779,-2.51447349000272]|
|[4.0,0.0,0.0,6.0,7.0]|[-6.53197264742181,3.6514837167011094,-2.514473490002719]|
+---------------------+---------------------------------------------------------+
Observe that the transformed dataset no longer corresponds to any real observation. Any model predictions generated using this transformed data for training have to be mapped back to the original feature space; otherwise, the output will not make any sense.
4.4 StringIndexer
This is a label indexer that assigns a numeric index to every distinct string in a column. By default, labels are ordered by frequency, so the most frequent label gets index 0. If a value is numeric, it is first cast to string and then indexed.
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
session = SparkSession.builder.appName('Example').master('local').getOrCreate()
data = session.createDataFrame([['blue'], ['red'],
['red'], ['white'],
['yellow'], ['red']], ['feature'])
data.show()
si = StringIndexer(inputCol='feature', outputCol='index')
model = si.fit(data)
print('Found labels: ', model.labels)
model.transform(data).show()
session.stop()
+-------+
|feature|
+-------+
| blue|
| red|
| red|
| white|
| yellow|
| red|
+-------+
Found labels: ['red', 'blue', 'white', 'yellow']
+-------+-----+
|feature|index|
+-------+-----+
| blue| 1.0|
| red| 0.0|
| red| 0.0|
| white| 2.0|
| yellow| 3.0|
| red| 0.0|
+-------+-----+
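The mapping can be reversed with the IndexToString transformer, which turns indices back into the original labels. A minimal sketch, assuming it is run inside the previous cell before session.stop():
from pyspark.ml.feature import IndexToString

# Turn the numeric indices back into the original string labels
indexed = model.transform(data)
its = IndexToString(inputCol='index', outputCol='original', labels=model.labels)
its.transform(indexed).show()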
4.5 One hot encoder
This encoder maps a column of label indices into a binary vector. For example, with 4 labels and dropLast=False, index 3 is encoded as [0,0,0,1]. The output is a SparseVector.
Observe that the param dropLast is True by default, which drops the label with index n-1.
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer, OneHotEncoder
session = SparkSession.builder.appName('Example').master('local').getOrCreate()
data = session.createDataFrame([['blue'], ['red'],
['red'], ['white'],
['yellow'], ['red']], ['feature'])
data.show()
# First we need an input column with indices instead of strings.
si = StringIndexer(inputCol='feature', outputCol='indexed')
si_model = si.fit(data)
indexed = si_model.transform(data)
indexed.show()
# If we let dropLast=True, the index for yellow will be dropped
ohe = OneHotEncoder(inputCol='indexed', outputCol='encoded', dropLast=False)
ohe_model = ohe.fit(indexed)
ohe_transformed = ohe_model.transform(indexed)
'''
You can check that, when dropLast=True, the row for yellow will be
+-------+-------+-------------+
|feature|indexed| encoded|
+-------+-------+-------------+
| yellow| 3.0|(3,[],[]) |
+-------+-------+-------------+
'''
ohe_transformed.show()
+-------+
|feature|
+-------+
| blue|
| red|
| red|
| white|
| yellow|
| red|
+-------+
+-------+-------+
|feature|indexed|
+-------+-------+
| blue| 1.0|
| red| 0.0|
| red| 0.0|
| white| 2.0|
| yellow| 3.0|
| red| 0.0|
+-------+-------+
+-------+-------+-------------+
|feature|indexed| encoded|
+-------+-------+-------------+
| blue| 1.0|(4,[1],[1.0])|
| red| 0.0|(4,[0],[1.0])|
| red| 0.0|(4,[0],[1.0])|
| white| 2.0|(4,[2],[1.0])|
| yellow| 3.0|(4,[3],[1.0])|
| red| 0.0|(4,[0],[1.0])|
+-------+-------+-------------+
4.6 Tokenization
Tokenization is the process of splitting a document into a vector of differentiated tokens. The sentence "The quick brown fox jumps over the lazy dog" will be split into tokens like ["The", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog"]. Different approaches may split the document using white spaces, commas, regular expressions, or any other character.
In MLlib there is a Tokenizer transformer for this purpose.
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer
session = SparkSession.builder.appName("Example").master("local").getOrCreate()
sentenceDataFrame = session.createDataFrame([(0, "Hi I heard about Spark"),
                                             (1, "I wish Java could use case classes"),
                                             (2, "Logistic, regression, models, are, neat")], ['id', 'sentence'])
tokenizer = Tokenizer(inputCol='sentence', outputCol='words')
tokenizer.transform(sentenceDataFrame).show(truncate=False)
+---+---------------------------------------+---------------------------------------------+
|id |sentence |words |
+---+---------------------------------------+---------------------------------------------+
|0 |Hi I heard about Spark |[hi, i, heard, about, spark] |
|1 |I wish Java could use case classes |[i, wish, java, could, use, case, classes] |
|2 |Logistic, regression, models, are, neat|[logistic,, regression,, models,, are,, neat]|
+---+---------------------------------------+---------------------------------------------+
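Notice in the last row how punctuation stays attached to the tokens. MLlib also provides RegexTokenizer to split on a regular expression instead of whitespace; a short sketch reusing the sentenceDataFrame from above (the pattern \W splits on any non-word character):
from pyspark.ml.feature import RegexTokenizer

# Split on any non-word character, so commas and spaces are both treated as separators
regex_tokenizer = RegexTokenizer(inputCol='sentence', outputCol='words', pattern='\\W')
regex_tokenizer.transform(sentenceDataFrame).show(truncate=False)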
4.7 Stop words
Natural language is redundant and not every term provides the same amount of information. By stop words we refer to the most common words in a given language. These words are so common that they carry little relevant information, so they are removed prior to any analysis. There is no single list of stop words, and the list changes with every language.
MLlib implements the StopWordsRemover transformer, which filters out stop words using a dictionary.
from pyspark.sql import SparkSession
from pyspark.ml.feature import StopWordsRemover
session = SparkSession.builder.appName("Example").master("local").getOrCreate()
text = session.createDataFrame([(0, ["I", "saw", "the", "red", " balloon "]),
                                (1, ["Mary", "had", "a", " little ", "lamb"])
                                ], ["id", "raw"])
text.show(truncate=False)
remover = StopWordsRemover(inputCol='raw', outputCol='filtered')
remover.transform(text).show(truncate=False)
+---+------------------------------+
|id |raw |
+---+------------------------------+
|0 |[I, saw, the, red, balloon ] |
|1 |[Mary, had, a, little , lamb]|
+---+------------------------------+
+---+------------------------------+----------------------+
|id |raw |filtered |
+---+------------------------------+----------------------+
|0 |[I, saw, the, red, balloon ] |[saw, red, balloon ] |
|1 |[Mary, had, a, little , lamb]|[Mary, little , lamb]|
+---+------------------------------+----------------------+
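By default the English stop word list is used. Word lists for other languages are available through StopWordsRemover.loadDefaultStopWords; a minimal sketch (the choice of Spanish here is just illustrative):
from pyspark.ml.feature import StopWordsRemover

# Load the built-in Spanish stop word list and pass it explicitly to the remover
spanish_stop_words = StopWordsRemover.loadDefaultStopWords('spanish')
print(spanish_stop_words[:10])
remover_es = StopWordsRemover(inputCol='raw', outputCol='filtered', stopWords=spanish_stop_words)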
4.8 Count Vectorizer
This estimator counts the number of occurrences of the items of a vocabulary and represents them as a sparse vector. This is particularly useful for representing a document in terms of the frequency of its elements, and it is normally used in probabilistic models.
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer
session = SparkSession.builder.appName("Example").master("local").getOrCreate()
text = session.createDataFrame([(0, 'yellow red blue'.split()),
(1, 'red'.split()),
(2, 'blue white blue'.split()),
], ["id", "raw"])
text.show()
cv = CountVectorizer(inputCol='raw', outputCol='frequencies')
cv_model = cv.fit(text)
print('The vocabulary: ',cv_model.vocabulary)
frequencies = cv_model.transform(text)
frequencies.show(truncate=False)
+---+-------------------+
| id| raw|
+---+-------------------+
| 0|[yellow, red, blue]|
| 1| [red]|
| 2|[blue, white, blue]|
+---+-------------------+
The vocabulary: ['blue', 'red', 'white', 'yellow']
+---+-------------------+-------------------------+
|id |raw |frequencies |
+---+-------------------+-------------------------+
|0 |[yellow, red, blue]|(4,[0,1,3],[1.0,1.0,1.0])|
|1 |[red] |(4,[1],[1.0]) |
|2 |[blue, white, blue]|(4,[0,2],[2.0,1.0]) |
+---+-------------------+-------------------------+
4.9 N-grams
N-grams are a common input for many algorithms that model the probability of n words occurring together. The NGram transformer outputs a collection of these n-grams.
from pyspark.sql import SparkSession
from pyspark.ml.feature import IDF, Tokenizer, NGram
session = SparkSession.builder.appName("Example").master("local").getOrCreate()
text = session.createDataFrame([(0, "Hi I heard about Spark"),
(0, "I wish Java could use case classes "),
(0, "Logistic regression models are neat")
], ["label", "sentence"])
# First we tokenize our dataset
tokenizer = Tokenizer(inputCol='sentence', outputCol='words')
words = tokenizer.transform(text)
words.show(truncate=False)
# Compute 2-grams
ngram = NGram(inputCol='words', outputCol='ngrams', n=2)
ngrams = ngram.transform(words)
ngrams.show(truncate=False)
+-----+-----------------------------------+------------------------------------------+
|label|sentence |words |
+-----+-----------------------------------+------------------------------------------+
|0 |Hi I heard about Spark |[hi, i, heard, about, spark] |
|0 |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|
|0 |Logistic regression models are neat|[logistic, regression, models, are, neat] |
+-----+-----------------------------------+------------------------------------------+
+-----+-----------------------------------+------------------------------------------+------------------------------------------------------------------+
|label|sentence |words |ngrams |
+-----+-----------------------------------+------------------------------------------+------------------------------------------------------------------+
|0 |Hi I heard about Spark |[hi, i, heard, about, spark] |[hi i, i heard, heard about, about spark] |
|0 |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|[i wish, wish java, java could, could use, use case, case classes]|
|0 |Logistic regression models are neat|[logistic, regression, models, are, neat] |[logistic regression, regression models, models are, are neat] |
+-----+-----------------------------------+------------------------------------------+------------------------------------------------------------------+
4.10 Word2Vec
Word2Vec represents the words of a document as vectors. This makes it possible to operate on documents as vectors, which in turn makes it easy to compute distances and enables other algorithms, especially in NLP. Take a look at the original Google code here.
from pyspark.sql import SparkSession
from pyspark.ml.feature import Word2Vec
session = SparkSession.builder.appName('Example').master('local').getOrCreate()
corpus = session.createDataFrame([
('Spark is quite useful'.split(),),
('I can use Spark with Python'.split(),),
('Spark is not so difficult after all'.split(),),
], ['words'])
corpus.show(truncate=False)
w2v = Word2Vec(inputCol='words', outputCol='result', vectorSize=3, minCount=0)
w2v_model = w2v.fit(corpus)
vectors = w2v_model.transform(corpus)
vectors.show(truncate=False)
+-------------------------------------------+
|words |
+-------------------------------------------+
|[Spark, is, quite, useful] |
|[I, can, use, Spark, with, Python] |
|[Spark, is, not, so, difficult, after, all]|
+-------------------------------------------+
+-------------------------------------------+------------------------------------------------------------------+
|words |result |
+-------------------------------------------+------------------------------------------------------------------+
|[Spark, is, quite, useful] |[0.08182145655155182,-0.07318692095577717,-0.0631803400174249] |
|[I, can, use, Spark, with, Python] |[0.016474373017748196,-1.7273581276337305E-4,-0.04478610997709135]|
|[Spark, is, not, so, difficult, after, all]|[0.019738022836723497,0.029656097292900085,-0.033315843919159045] |
+-------------------------------------------+------------------------------------------------------------------+
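Once fitted, the model can also be queried for the words closest to a given word in the learned vector space. A short sketch using findSynonyms (with such a tiny corpus the neighbours are not meaningful; this only illustrates the call):
# Show the 2 words whose vectors are closest to the vector learned for 'Spark'
w2v_model.findSynonyms('Spark', 2).show()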
5. Pipelines
Most models are computed as a concatenation of operations, each operation transforming the original dataset; for example, normalization -> component analysis -> regression. MLlib uses the concept of pipelines (similar to the one used in scikit-learn) to unify the execution of a sequence of steps into a single object.
The pipeline is defined as a sequence of stages connecting transformers and estimators:
- Transformer: receives an input dataframe and returns a transformed version (standardizers)
- Estimator: receives an input dataframe and after fitting returns a transformer (linear regression, logistic regression, etc.)
Creating a pipeline is equivalent to setting the sequence of stages to be executed:
from pyspark.ml.pipeline import Pipeline
pipeline = Pipeline(stages=[standardizer, pca, lr])
Then we fit the model and transform the dataset to get the corresponding results:
model = pipeline.fit(dataset)
model.transform(dataset).show()
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.pipeline import Pipeline
session = SparkSession.builder.appName('Example').master('local').getOrCreate()
corpus = session.createDataFrame([
('Spark is quite useful',),
('I can use Spark with Python',),
('Spark is not so difficult after all',),
], ['docs'])
corpus.show(truncate=False)
tokenizer = Tokenizer(inputCol='docs', outputCol='tokens')
stop_remover = StopWordsRemover(inputCol='tokens', outputCol='filtered')
cv = CountVectorizer(inputCol='filtered', outputCol='frequencies')
pipeline = Pipeline(stages=[tokenizer, stop_remover, cv])
fitted = pipeline.fit(corpus)
result = fitted.transform(corpus)
result.show(truncate=False)
for m in fitted.stages:
print('-->',m.uid)
print(m.params)
+-----------------------------------+
|docs |
+-----------------------------------+
|Spark is quite useful |
|I can use Spark with Python |
|Spark is not so difficult after all|
+-----------------------------------+
+-----------------------------------+-------------------------------------------+----------------------+-------------------------+
|docs |tokens |filtered |frequencies |
+-----------------------------------+-------------------------------------------+----------------------+-------------------------+
|Spark is quite useful |[spark, is, quite, useful] |[spark, quite, useful]|(6,[0,2,3],[1.0,1.0,1.0])|
|I can use Spark with Python |[i, can, use, spark, with, python] |[use, spark, python] |(6,[0,1,5],[1.0,1.0,1.0])|
|Spark is not so difficult after all|[spark, is, not, so, difficult, after, all]|[spark, difficult] |(6,[0,4],[1.0,1.0]) |
+-----------------------------------+-------------------------------------------+----------------------+-------------------------+
--> Tokenizer_346e29794e54
[Param(parent='Tokenizer_346e29794e54', name='inputCol', doc='input column name.'), Param(parent='Tokenizer_346e29794e54', name='outputCol', doc='output column name.')]
--> StopWordsRemover_cdda6836267e
[Param(parent='StopWordsRemover_cdda6836267e', name='caseSensitive', doc='whether to do a case sensitive comparison over the stop words'), Param(parent='StopWordsRemover_cdda6836267e', name='inputCol', doc='input column name.'), Param(parent='StopWordsRemover_cdda6836267e', name='inputCols', doc='input column names.'), Param(parent='StopWordsRemover_cdda6836267e', name='locale', doc='locale of the input. ignored when case sensitive is true'), Param(parent='StopWordsRemover_cdda6836267e', name='outputCol', doc='output column name.'), Param(parent='StopWordsRemover_cdda6836267e', name='outputCols', doc='output column names.'), Param(parent='StopWordsRemover_cdda6836267e', name='stopWords', doc='The words to be filtered out')]
--> CountVectorizer_983c07eb2c8f
[Param(parent='CountVectorizer_983c07eb2c8f', name='binary', doc='Binary toggle to control the output vector values. If True, all nonzero counts (after minTF filter applied) are set to 1. This is useful for discrete probabilistic models that model binary events rather than integer counts. Default False'), Param(parent='CountVectorizer_983c07eb2c8f', name='inputCol', doc='input column name.'), Param(parent='CountVectorizer_983c07eb2c8f', name='maxDF', doc='Specifies the maximum number of different documents a term could appear in to be included in the vocabulary. A term that appears more than the threshold will be ignored. If this is an integer >= 1, this specifies the maximum number of documents the term could appear in; if this is a double in [0,1), then this specifies the maximum fraction of documents the term could appear in. Default (2^63) - 1'), Param(parent='CountVectorizer_983c07eb2c8f', name='minDF', doc='Specifies the minimum number of different documents a term must appear in to be included in the vocabulary. If this is an integer >= 1, this specifies the number of documents the term must appear in; if this is a double in [0,1), then this specifies the fraction of documents. Default 1.0'), Param(parent='CountVectorizer_983c07eb2c8f', name='minTF', doc="Filter to ignore rare words in a document. For each document, terms with frequency/count less than the given threshold are ignored. If this is an integer >= 1, then this specifies a count (of times the term must appear in the document); if this is a double in [0,1), then this specifies a fraction (out of the document's token count). Note that the parameter is only used in transform of CountVectorizerModel and does not affect fitting. Default 1.0"), Param(parent='CountVectorizer_983c07eb2c8f', name='outputCol', doc='output column name.'), Param(parent='CountVectorizer_983c07eb2c8f', name='vocabSize', doc='max size of the vocabulary. Default 1 << 18.')]
The second part will show how to use common ML algorithms.
The original Jupyter notebook is available in this repo.
If you are interested in this content, you can check my blog.
Thanks for reading.