Introduction to PySpark and PySpark ML Libraries
Introduction
This tutorial explains and illustrates some of the key differences between two data processing libraries, Pandas and PySpark. While both can be used to similar effect, PySpark has features that set it apart from Pandas, namely its ability to run operations in parallel and in distributed environments. A Pandas DataFrame is limited in size by the memory of the machine the application runs on, which caps the amount of data that can be processed. A PySpark DataFrame, on the other hand, can also act as a distributed, SQL-like query engine, so large amounts of data can be stored in and read from a single DataFrame, even in a distributed environment. This is especially useful for big-data applications. Although many of PySpark's benefits center on distributed computing and big data, this tutorial covers the basics of using the library, including reading in data, creating user-defined functions, and using the PySpark machine learning modules.
Tutorial Content
This tutorial shows how to set up PySpark in a Jupyter Notebook and how PySpark can enable big-data processing.
We will run our operations on Amazon product data collected by Julian McAuley at UCSD. While PySpark is best suited to distributed execution, this tutorial focuses on execution on a single node. The operations shown, however, apply equally to local and distributed Spark use cases.
- Installing Libraries
- Installing Java, Spark, etc
- SparkSession vs SparkContext
- Reading in Data with PySpark
- Reading in Data with Pandas
- Predicting Amazon Ratings from Reviews on Big Data Sets
- Summary and References
Installing the Libraries
Before starting, you will need to install PySpark using pip:
pip3 install --upgrade pyspark
or using conda:
conda install -c conda-forge pyspark
Additionally, if a Python version mismatch error occurs when executing any of the code, you may need to upgrade your Python version to match.
In [1]:
import math
import re
import pandas as pd
from pyspark import SparkFiles
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, CountVectorizer, StopWordsRemover, IDF
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
Since pyspark relies on Apache Spark, we need to install Apache Spark on our system.
Installing Java, Spark (and winutils for windows)
The following instructions are for Windows only. See Note for Mac.
Java
In addition to pyspark, we will need Java 8 or newer, available from https://www.java.com/en/download/. To check whether Java is already installed, type java -version in your command prompt. This tutorial walks through the installation on Windows. The process may differ slightly for macOS or other operating systems.
Spark
Go to https://spark.apache.org/downloads.html and download the latest version of Apache Spark. Extract it directly under your C: drive, in a file path with no spaces. We will refer to this Spark folder as SPARK_HOME. After extracting, navigate to bin\pyspark.cmd and execute it. There should be numerous error messages, with one stating that the winutils binary could not be located.
Winutils
Navigate to https://github.com/steveloughran/winutils and download the bin folder for the Hadoop version you selected when downloading Spark. Move the bin folder into a new folder called hadoop within the SPARK_HOME folder. For this to take effect, we must set the SPARK_HOME and HADOOP_HOME environment variables. Type environment variables in the Windows search bar and select the tile Edit the system environment variables. Click the Environment Variables... button at the bottom of the Advanced tab. Under System variables, click New... Create a SPARK_HOME environment variable with the path to the Spark folder and a HADOOP_HOME environment variable with the path to the hadoop folder.
Rerun the pyspark launcher. There may still be a message stating Unable to load native-hadoop library for your platform... using builtin-java classes where applicable. This occurs if you are running on a 64-bit machine and the Hadoop libraries are compiled for 32 bits. This warning can be ignored.
Fixing log level for Spark
1) Save a copy of the log4j.properties.template file in the \conf folder under SPARK_HOME and rename the copy to log4j.properties.
2) Set the log4j.rootCategory value in the file to WARN, console and save.
Now rerunning pyspark should only display warnings and errors to the console.
Note: For MacOS
To install Apache Spark on macOS, you only need Homebrew: brew install apache-spark
Spark Session vs Spark Context
Prior to Spark 2.0, a SparkContext was the entry point to a Spark application and was created using sc = SparkContext("local", "SparkFile App"). Other functionality, such as SQL or streaming, required its own specific context (for example, a SQLContext). Now, a SparkSession serves as the single entry point: it covers all of the above and contains the SparkContext. To use the Spark context, create the Spark session and access the context through its .sparkContext attribute (it is an attribute, not a method call). An example is provided below.
In [2]:
# Create the spark session
sc = SparkSession.builder.appName("AmazonData").getOrCreate()
PySpark Dataframe Structure
PySpark Dataframes are based on the PySpark Resilient Distributed Dataset (RDD). This is a fault-tolerant collection of elements that can be operated on in parallel. RDDs can be created using the parallelize() function from existing data, or read in from files.
In [3]:
# Create an RDD of 100,000 numbers, starting from 0, using the spark context.
rdd1 = sc.sparkContext.parallelize(range(100000))
# Get the number of partitions.
rdd1.getNumPartitions()
Out[3]:
12
Here we see that the number of partitions depends on the number of cores in your CPU. The computer this code was executed on has 6 physical cores and 12 logical cores, so there are 12 partitions. Each partition is a separate portion of the data that can be processed in parallel. Next, we will define a function, apply it to the RDD, and compare its execution time with that of the same function applied to a Python list.
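The idea of a partition can be sketched in plain Python. The helper below, split_into_partitions, is a hypothetical name used only for illustration. It cuts a sequence into contiguous, nearly equal chunks, which is roughly what happens to the range above. It is a sketch of the concept, not Spark's actual slicing code.

```python
def split_into_partitions(data, num_partitions):
    """Cut data into num_partitions contiguous, nearly equal chunks."""
    n = len(data)
    # Boundary i sits at i/num_partitions of the way through the data.
    bounds = [i * n // num_partitions for i in range(num_partitions + 1)]
    return [data[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]

# Same size and partition count as the RDD above.
partitions = split_into_partitions(range(100000), 12)
print(len(partitions))               # number of chunks
print([len(p) for p in partitions])  # size of each chunk
```

Each chunk can then be handed to a different core, which is where the parallel speedup comes from.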
In [4]:
# Create an arbitrary function that computes the tangent of 10,000 numbers and stores the results in a list.
# This is simply meant to be computationally intensive.
from math import tan
def time_function(x):
    t = [tan(j) for j in range(10000)]
    return tan(x)
In [5]:
%%time
time_function(5)
Wall time: 1.5 ms
Out[5]:
-3.380515006246586
In [6]:
%%time
map_test = rdd1.map(lambda x: time_function(x))
Wall time: 0 ns
Notice that the operation executes instantly. This is because PySpark uses lazy evaluation, meaning nothing is actually computed, only a plan of what to execute. The execution only takes place once the data is actually accessed.
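Python's own map and filter are lazy in a similar way, so the behavior can be sketched without Spark. In the sketch below, slow_square and call_log are hypothetical names; the log records how many times the function has actually run. This only illustrates the concept and says nothing about how Spark builds its execution plan.

```python
call_log = []

def slow_square(x, log=call_log):
    """Stand-in for an expensive function; records every call it receives."""
    log.append(x)
    return x * x

# Building the map object does no work yet, much like rdd1.map(...).
plan = map(slow_square, range(5))
calls_before = len(call_log)

# Consuming the result forces the computation, like an action such as count().
result = len(list(filter(lambda v: v > 3, plan)))
calls_after = len(call_log)

print(calls_before, calls_after, result)  # prints: 0 5 3
```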
In [7]:
%%time
# Count the number of entries where the value is greater than 0.
print(map_test.filter(lambda x:x>0).count())
50005
Wall time: 18.6 s
In [8]:
%%time
# Perform the equivalent operations on a python list. All of the setup is included, since this is all included in the
# computation time above.
large_list = [i for i in range(100000)]
large_list = list(map(lambda x: time_function(x), large_list))
print(len(list(filter(lambda x: x>0, large_list))))
50005
Wall time: 1min 45s
Evaluating Performance
Notice that even though the same operations take place, the PySpark RDD finishes significantly faster (18.6 s versus 1 min 45 s). With smaller lists, plain Python will win because of the overhead of parallelism, but as the data grows, Spark's parallel computation takes less time. The execution time can be lowered further in a distributed environment.
Reading in Data with PySpark
Since we are reading data from a URL, we use SparkFiles: addFile downloads the file through the Spark context, and we then read the downloaded file into a Spark DataFrame. Rather than hard-coding them, we extract the names of the CSV and JSON files from their URLs, since SparkFiles.get needs the file name to return the absolute path of the downloaded file. In this example, we are loading 1,297,156 ratings and 151,254 reviews.
Check out http://jmcauley.ucsd.edu/data/amazon/ for a list of all available datasets. The data we are using is only a small subset.
In [9]:
# Save the file. This may take some time to download.
url1 = "http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/ratings_Grocery_and_Gourmet_Food.csv"
sc.sparkContext.addFile(url1)
In [10]:
# extract filename
filename1 = url1.split(r'/')[-1]
In [11]:
# Define a schema. This csv does not include column names, therefore we must define them. We do not want to infer the
# schema, therefore we will use pyspark types to define the types.
schema = T.StructType([
    T.StructField("user", T.StringType(), True),
    T.StructField("item", T.StringType(), True),
    T.StructField("rating", T.StringType(), True),
    T.StructField("timestamp", T.IntegerType(), True)])
# Read the file from the Spark context. No header is present in the csv, therefore header=False.
df_spark_csv = sc.read.csv(SparkFiles.get(filename1), header=False, schema=schema)
Viewing the Dataframe
In order to get an idea of what the PySpark dataframe looks like, we use the .show() command, similar to .head(n) in Pandas.
In [12]:
df_spark_csv.show(5)
+--------------+----------+------+----------+
|          user|      item|rating| timestamp|
+--------------+----------+------+----------+
|A1ZQZ8RJS1XVTX|0657745316|   5.0|1381449600|
|A31W38VGZAUUM4|0700026444|   5.0|1354752000|
|A3I0AV0UJX5OH0|1403796890|   1.0|1385942400|
|A3QAAOLIXKV383|1403796890|   3.0|1307836800|
| AB1A5EGHHVA9M|141278509X|   5.0|1332547200|
+--------------+----------+------+----------+
only showing top 5 rows
Reading in Data from Pandas
One benefit of using PySpark to read data from a URL is that the file is saved through the Spark context and can be accessed easily after the download. Reading a URL directly into a Pandas DataFrame downloads the file every time, unless another library, such as requests, is used to download and save it first. Though this is fairly simple, it is an extra step. A benefit of Pandas, however, is that column types are inferred easily, and no StructType schema needs to be defined just to name the columns, as it does in Spark.
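The extra step mentioned above can be sketched with the standard library alone. The function name cached_download and its fetch parameter are hypothetical, introduced here for illustration; urllib.request.urlretrieve is used as the default downloader in place of the requests library.

```python
import os
import urllib.request

def cached_download(url, cache_dir=".", fetch=urllib.request.urlretrieve):
    """Return a local path for url, downloading only if the file is missing."""
    os.makedirs(cache_dir, exist_ok=True)
    # Same file-name extraction as used for the Spark download.
    local_path = os.path.join(cache_dir, url.split("/")[-1])
    if not os.path.exists(local_path):
        fetch(url, local_path)
    return local_path

# Usage (commented out so the sketch itself does not touch the network):
# local_csv = cached_download(url1)
# df_pandas_csv = pd.read_csv(local_csv, names=['user', 'item', 'rating', 'timestamp'])
```

After the first call, later calls return the saved file immediately, so Pandas reads from disk instead of downloading again.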
In [13]:
# notice this must be run each time you want to read in the file, unless you save the csv.
df_pandas_csv = pd.read_csv(url1, names=['user', 'item', 'rating', 'timestamp'])
Viewing the Dataframe
In order to get an idea of what the Pandas dataframe looks like, we use the .head() method.
In [14]:
df_pandas_csv.head(5)
Out[14]:
             user        item  rating   timestamp
0  A1ZQZ8RJS1XVTX  0657745316     5.0  1381449600
1  A31W38VGZAUUM4  0700026444     5.0  1354752000
2  A3I0AV0UJX5OH0  1403796890     1.0  1385942400
3  A3QAAOLIXKV383  1403796890     3.0  1307836800
4   AB1A5EGHHVA9M  141278509X     5.0  1332547200
Predicting Amazon Ratings from Reviews on Big Data Sets
A distinguishing feature of PySpark is its built-in Spark machine learning library, which plays a role similar to Scikit-Learn. In this example, we will show how to predict a user’s rating from the review text using PySpark’s machine learning library. We will use logistic regression to classify each review's text into a star rating.
Loading the Data
In [15]:
url = "http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Grocery_and_Gourmet_Food_5.json.gz"
sc.sparkContext.addFile(url)
# extract filename
filename_json_reviews = url.split(r'/')[-1]
In [16]:
# We can just as easily read compressed JSON files using .read.json()
df_json_reviews = sc.read.json(SparkFiles.get(filename_json_reviews))
Viewing the Data
Let’s get a better idea of the data we are working with and which columns are important.
In [17]:
df_json_reviews.show(5)
+----------+-------+-------+--------------------+-----------+--------------+---------------+--------------------+--------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|   reviewerName|             summary|unixReviewTime|
+----------+-------+-------+--------------------+-----------+--------------+---------------+--------------------+--------------+
|616719923X| [0, 0]|    4.0|Just another flav...| 06 1, 2013|A1VEELTKS8NLZB|Amazon Customer|          Good Taste|    1370044800|
|616719923X| [0, 1]|    3.0|I bought this on ...|05 19, 2014|A14R9XMZVJ6INB|        amf0001|3.5 stars,  sadly...|    1400457600|
|616719923X| [3, 4]|    4.0|Really good. Grea...| 10 8, 2013|A27IQHDZFQFNGG|        Caitlin|                Yum!|    1381190400|
|616719923X| [0, 0]|    5.0|I had never had i...|05 20, 2013|A31QY5TASILE89|   DebraDownSth|Unexpected flavor...|    1369008000|
|616719923X| [1, 2]|    4.0|I've been looking...|05 26, 2013|A2LWK003FFMCI5|       Diana X.|Not a very strong...|    1369526400|
+----------+-------+-------+--------------------+-----------+--------------+---------------+--------------------+--------------+
only showing top 5 rows
Removing unwanted columns
Similar to pandas, PySpark dataframes can be manipulated using SQL-like operations. In this case, we will select just the overall and reviewText columns to keep.
In [18]:
keep_columns = ["overall", "reviewText"]
# Select returns a new PySpark Dataframe
df_json_reviews = df_json_reviews.select([column for column in keep_columns])
Now df_json_reviews has only the overall and reviewText columns.
In [19]:
df_json_reviews.show(5)
+-------+--------------------+
|overall|          reviewText|
+-------+--------------------+
|    4.0|Just another flav...|
|    3.0|I bought this on ...|
|    4.0|Really good. Grea...|
|    5.0|I had never had i...|
|    4.0|I've been looking...|
+-------+--------------------+
only showing top 5 rows
Developing the Model Pipeline
The pipeline will be broken down into several steps listed below:
1) Remove any html tags and then tokenize
2) Remove stopwords using the built-in StopWordsRemover and the default stop words
3) Compute HashingTF which maps a sequence of terms to their term frequencies using a hashing trick (or optionally Count Vectorizer)
4) Compute TF-IDF from HashingTF (or Count Vectorizer)
5) Logistic Regression
6) Use PySpark ml pipeline to execute the above stages
Spark Dataframes are immutable, so preprocessing strings does not work exactly as it does in pandas. Here, we are going to create a user-defined function (UDF) to remove URLs and perform other processing.
What is a UDF?
In order to preprocess the data, we use a user-defined function. A user-defined function applies custom Python logic to the values in a PySpark Dataframe. PySpark dataframes are immutable, so any operation performed on a dataframe creates a new dataframe. Here we use a UDF to add a new column containing a filtered version of the text, with links, apostrophes, and non-alphanumeric characters removed:
In [21]:
def replace(text):
    text = text.lower()
    # remove http links (note: this pattern only matches t.co short links)
    text = re.sub(r'http[s]?:\/\/t.co\/[\w]+', '', text)
    # remove possessive 's first, then any remaining apostrophes
    text = re.sub(r"'s", '', text)
    text = re.sub(r"'", '', text)
    # replace every run of non-alphanumeric characters with a single space
    text = re.sub(r"[^a-zA-Z0-9]+", ' ', text)
    return text
replaceUDF = F.udf(lambda z: replace(z), T.StringType())
df_cleaned = df_json_reviews.withColumn("cleanedText", replaceUDF(F.col("reviewText")))
In [22]:
df_cleaned.show()
+-------+--------------------+--------------------+
|overall|          reviewText|         cleanedText|
+-------+--------------------+--------------------+
|    4.0|Just another flav...|just another flav...|
|    3.0|I bought this on ...|i bought this on ...|
|    4.0|Really good. Grea...|really good great...|
|    5.0|I had never had i...|i had never had i...|
|    4.0|I've been looking...|ive been looking ...|
|    4.0|These Kit-kats ar...|these kit kats ar...|
|    3.0|I found these in ...|i found these in ...|
|    5.0|Creamy white choc...|creamy white choc...|
|    5.0|After hearing mix...|after hearing mix...|
|    1.0|I love green tea,...|i love green tea ...|
|    5.0|I ordered these i...|i ordered these i...|
|    5.0|These are definit...|these are definit...|
|    5.0|Yes - this is one...|yes this is one o...|
|    5.0|I love the green ...|i love the green ...|
|    3.0|I love Kit Kat & ...|i love kit kat gr...|
|    4.0|I tried this for ...|i tried this for ...|
|    5.0|This curry paste ...|this curry paste ...|
|    5.0|I've purchased di...|ive purchased dif...|
|    5.0|I love ethnic foo...|i love ethnic foo...|
|    4.0|I started a new d...|i started a new d...|
+-------+--------------------+--------------------+
only showing top 20 rows
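Because the cleaning logic is ordinary Python, it can be checked on a single string without starting Spark. The sketch below restates the replace function from the cell above so that it runs on its own. The sample sentence is made up for illustration.

```python
import re

def replace(text):
    """Plain-Python copy of the cleaning function wrapped by the UDF."""
    text = text.lower()
    # This pattern only matches t.co short links.
    text = re.sub(r'http[s]?:\/\/t.co\/[\w]+', '', text)
    # Drop possessive 's, then any remaining apostrophes.
    text = re.sub(r"'s", '', text)
    text = re.sub(r"'", '', text)
    # Replace each run of non-alphanumeric characters with one space.
    text = re.sub(r"[^a-zA-Z0-9]+", ' ', text)
    return text

sample = "I've LOVED Kit-Kat's flavor! See http://t.co/Abc123"
cleaned = replace(sample)
print(repr(cleaned))  # 'ive loved kit kat flavor see '
```

Note the trailing space: punctuation at the end of a review becomes a space and is not stripped.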
Splitting Data into Training and Test
PySpark also offers an easy way to split data into training and testing. By using the .randomSplit() method, we can randomly separate the data into training and test dataframes.
In [23]:
# 70% training, 30% testing.
trainingData, testData = df_cleaned.randomSplit([0.7, 0.3], seed=100)
At this point, we have added a column called cleanedText, which holds the preprocessed text with URLs, apostrophes, and non-alphanumeric characters removed, and split the data into training and test sets.
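A random split of this kind can be sketched in plain Python. The function random_split below is a hypothetical helper, not Spark's implementation. It assumes each row is assigned by an independent random draw, which is why the resulting sizes are only approximately 70% and 30%.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split using an independent random draw."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative, normalized boundaries, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(1000)), [0.7, 0.3], seed=100)
print(len(train), len(test))  # close to 700 and 300, but not exactly
```

Fixing the seed makes the split reproducible, which matters when comparing models across runs.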
Tokenization
Now let’s tokenize the text and output it to a new column called “words”.
In [24]:
tokenizer = Tokenizer(inputCol="cleanedText", outputCol="words")
Stop Word Removal
Next, we use the StopWordsRemover to output the text with common stop words removed. To see the default stop words, we can run .getStopWords().
In [25]:
stop_words = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="StopRemoved")
In [26]:
stop_words.getStopWords()
Out[26]:
['i',
'me',
'my',
'myself',
'we',
'our',
'ours',
'ourselves',
'you',
'your',
'yours',
'yourself',
'yourselves',
'he',
'him',
'his',
'himself',
'she',
'her',
'hers',
'herself',
'it',
'its',
'itself',
'they',
'them',
'their',
'theirs',
'themselves',
'what',
'which',
'who',
'whom',
'this',
'that',
'these',
'those',
'am',
'is',
'are',
'was',
'were',
'be',
'been',
'being',
'have',
'has',
'had',
'having',
'do',
'does',
'did',
'doing',
'a',
'an',
'the',
'and',
'but',
'if',
'or',
'because',
'as',
'until',
'while',
'of',
'at',
'by',
'for',
'with',
'about',
'against',
'between',
'into',
'through',
'during',
'before',
'after',
'above',
'below',
'to',
'from',
'up',
'down',
'in',
'out',
'on',
'off',
'over',
'under',
'again',
'further',
'then',
'once',
'here',
'there',
'when',
'where',
'why',
'how',
'all',
'any',
'both',
'each',
'few',
'more',
'most',
'other',
'some',
'such',
'no',
'nor',
'not',
'only',
'own',
'same',
'so',
'than',
'too',
'very',
's',
't',
'can',
'will',
'just',
'don',
'should',
'now',
"i'll",
"you'll",
"he'll",
"she'll",
"we'll",
"they'll",
"i'd",
"you'd",
"he'd",
"she'd",
"we'd",
"they'd",
"i'm",
"you're",
"he's",
"she's",
"it's",
"we're",
"they're",
"i've",
"we've",
"you've",
"they've",
"isn't",
"aren't",
"wasn't",
"weren't",
"haven't",
"hasn't",
"hadn't",
"don't",
"doesn't",
"didn't",
"won't",
"wouldn't",
"shan't",
"shouldn't",
"mustn't",
"can't",
"couldn't",
'cannot',
'could',
"here's",
"how's",
"let's",
'ought',
"that's",
"there's",
"what's",
"when's",
"where's",
"who's",
"why's",
'would']
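The effect of the tokenizer and stop word stages can be sketched in plain Python. The functions tokenize and remove_stop_words and the set stop_subset are hypothetical names; the set holds only a few words taken from the default list above. This is a rough imitation of the two stages, not the Spark implementation.

```python
# A small subset of the default stop words listed above.
stop_subset = {"i", "been", "for", "this", "a", "the", "it", "is"}

def tokenize(text):
    """Lowercase and split on whitespace, similar in spirit to Tokenizer."""
    return text.lower().split()

def remove_stop_words(tokens, stop_words):
    """Keep only the tokens that are not stop words."""
    return [t for t in tokens if t not in stop_words]

tokens = tokenize("ive been looking for this flavor")
filtered = remove_stop_words(tokens, stop_subset)
print(filtered)  # ['ive', 'looking', 'flavor']
```

One consequence is worth noticing: because the cleaning step strips apostrophes, "i've" becomes "ive", which is not in the default list and therefore survives the filter.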
Compute Term Frequency
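Now let’s compute the term frequency and output it to a column called “rawFeatures”. The IDF stage then rescales these values into the “features” column used by the model. The term frequency can be computed using CountVectorizer or HashingTF. Feel free to try it both ways.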
In [27]:
tf = HashingTF(inputCol=stop_words.getOutputCol(), outputCol="rawFeatures")
#tf = CountVectorizer(inputCol=stop_words.getOutputCol(), outputCol="rawFeatures")
idf = IDF(inputCol=tf.getOutputCol(), outputCol="features", minDocFreq=5)
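The hashing trick and the IDF weighting can be sketched in plain Python. The helpers hashing_tf and idf_weights are hypothetical names. The sketch uses zlib.crc32 as a stand-in hash (Spark uses a different hash function), applies the smoothed formula log((m + 1) / (df + 1)) with m documents, and ignores the minDocFreq setting. The three token lists are made up.

```python
import math
import zlib

def hashing_tf(tokens, num_features):
    """Count tokens into a fixed-length vector at index hash(token) % num_features."""
    vec = [0] * num_features
    for tok in tokens:
        idx = zlib.crc32(tok.encode("utf-8")) % num_features
        vec[idx] += 1
    return vec

def idf_weights(tf_vectors):
    """Per-bucket IDF: log((m + 1) / (df + 1)), where m is the number of documents."""
    m = len(tf_vectors)
    num_features = len(tf_vectors[0])
    weights = []
    for j in range(num_features):
        df = sum(1 for vec in tf_vectors if vec[j] > 0)
        weights.append(math.log((m + 1) / (df + 1)))
    return weights

docs = [["green", "tea", "kit", "kat"], ["green", "tea"], ["curry", "paste"]]
tf_vectors = [hashing_tf(doc, 16) for doc in docs]
idf_vec = idf_weights(tf_vectors)
tfidf = [[tf * w for tf, w in zip(vec, idf_vec)] for vec in tf_vectors]
print(tf_vectors[0])
print([round(v, 3) for v in tfidf[0]])
```

With only 16 buckets, different words can land in the same index. That collision risk is the price of not storing a vocabulary, and it is why numFeatures is worth tuning.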
Logistic Regression
Now let’s set up logistic regression. Feel free to experiment with maxIter. The regularization parameters are tested below during K-fold cross validation.
In [28]:
lr = LogisticRegression(maxIter=10000).setLabelCol("overall").setFeaturesCol("features")
Using the built in ml Pipeline
PySpark’s ml library allows us to set up steps in our pipeline and adjust parameters before executing. This enables us to string together multiple steps and then create a pipeline to fit the training data to.
In [29]:
pipeline = Pipeline(stages=[tokenizer, stop_words, tf, idf, lr])
Built in K-fold cross validation
Another convenient feature of PySpark is the ability to perform cross validation with ease, simply by forming a parameter grid and using the CrossValidator constructor. You can specify the estimator, the parameter grid, the evaluator, and the number of folds. In this case, we use a MulticlassClassificationEvaluator with accuracy as the metric and set its label and prediction columns to the matching column names in the dataframe.
In [30]:
paramGrid = ParamGridBuilder() \
    .addGrid(tf.numFeatures, [1000, 5000, 10000]) \
    .addGrid(lr.regParam, [1, 0.5, 0.1, 0.01]) \
    .build()
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="overall", predictionCol="prediction", metricName="accuracy"),
                          numFolds=2)
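To get a feel for how much work the cross validation involves, the grid above can be enumerated in plain Python. This sketch only counts combinations, and the dictionary keys are labels chosen for illustration.

```python
from itertools import product

num_features_values = [1000, 5000, 10000]
reg_param_values = [1, 0.5, 0.1, 0.01]
num_folds = 2

# Every pairing of numFeatures and regParam is one candidate configuration.
grid = [{"numFeatures": n, "regParam": r}
        for n, r in product(num_features_values, reg_param_values)]

# Each candidate is fitted once per fold during the search.
fits_during_search = len(grid) * num_folds
print(len(grid), fits_during_search)  # prints: 12 24
```

That is 24 pipeline fits during the search, followed by a final fit of the best configuration on the full training data.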
Note that this next cell will take a few minutes to run.
In [31]:
cvModel = crossval.fit(trainingData)
In [32]:
predictions = cvModel.transform(testData)
In [33]:
predictions = predictions.selectExpr("overall as label", "prediction", "cleanedText")
In [34]:
predictions.select("label", "prediction").show(5)
+-----+----------+
|label|prediction|
+-----+----------+
|  1.0|       5.0|
|  1.0|       5.0|
|  1.0|       4.0|
|  1.0|       5.0|
|  1.0|       5.0|
+-----+----------+
only showing top 5 rows
Evaluating Model Accuracy
In the following cells, we use our MulticlassClassificationEvaluator to determine the accuracy of the model we constructed above. Keep in mind that we are trying to classify into 5 different categories based solely on text input. The evaluator supports other metrics as well. By default, the metric is the F1 score. Here we use accuracy.
In [35]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
print("Accuracy: " + str(evaluator.setMetricName("accuracy").evaluate(predictions)))
Accuracy: 0.6425628483927116
Interpreting the Result
While predicting the exact star rating is interesting, we will also calculate the average difference between the actual and predicted ratings to get a sense of what this accuracy means. We obtain an average difference of -0.447 with a standard deviation of 0.99. Since the difference is computed as actual minus predicted, the negative mean indicates that the model tends to predict ratings somewhat higher than the actual ones. While these results are not outstanding, this example shows how easily a multiclass classifier can be implemented in PySpark using built-in machine learning functions. Keep in mind that this is not a full interpretation of the result. For that, we would need to look at a confusion matrix, which can be implemented but proved difficult to use on a single machine, since one of the workers failed after running out of memory. In reality, there are far more 5-star reviews than any other rating, so a model that predicts 5 stars more frequently may achieve higher accuracy. Once again, this tutorial is just a baseline for using the library.
In [38]:
sim = predictions.withColumn("diff", F.col("label") - F.col("prediction"))
sim.select("diff").show(5)
+----+
|diff|
+----+
|-4.0|
|-4.0|
|-3.0|
|-4.0|
|-4.0|
+----+
only showing top 5 rows
In [39]:
sim.agg(F.mean('diff')).show()
+-------------------+
|          avg(diff)|
+-------------------+
|-0.4469341441381893|
+-------------------+
In [40]:
sim.agg(F.stddev('diff')).show()
+------------------+
| stddev_samp(diff)|
+------------------+
|0.9936002514173153|
+------------------+
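The quantities discussed in this section can be sketched in plain Python on the five (label, prediction) pairs displayed earlier. These five rows are only the top of the test set, so the numbers below describe that tiny sample and not the model as a whole.

```python
from collections import Counter
from statistics import mean

# The five (label, prediction) pairs shown above; not the full test set.
pairs = [(1.0, 5.0), (1.0, 5.0), (1.0, 4.0), (1.0, 5.0), (1.0, 5.0)]

# A confusion matrix is just a count of each (actual, predicted) combination.
confusion = Counter(pairs)

# Same definition as the diff column: actual minus predicted.
diffs = [label - pred for label, pred in pairs]
mean_diff = mean(diffs)

accuracy = sum(1 for label, pred in pairs if label == pred) / len(pairs)

print(confusion[(1.0, 5.0)], confusion[(1.0, 4.0)])  # prints: 4 1
print(mean_diff, accuracy)
```

On the full dataset, one option that may avoid pulling every row onto a single machine is to let Spark do the counting, for example with predictions.groupBy("label", "prediction").count(). This is offered as a suggestion and was not run as part of this tutorial.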
Summary and References
This tutorial highlighted some key features of PySpark and showed how Spark can be useful for data processing applications. PySpark’s all-in-one approach can make developing machine learning applications more scalable, especially for larger datasets.
By initially developing in a local environment, bugs can be worked out before deploying to a cloud-hosted cluster, such as one on Amazon Web Services or Google Cloud. While Spark’s potential is limited in this local setting, understanding some of its fundamentals is essential to a successful cluster deployment.
- PySpark Documentation: https://spark.apache.org/docs/latest/api/python/index.html
- Amazon data: http://jmcauley.ucsd.edu/data/amazon/
- Spark Download: https://spark.apache.org/downloads.html
- Winutils: https://github.com/steveloughran/winutils