Friedrich Ewald My Personal Website

Counting a large number of words with an Apache Spark DataFrame

One of the basic examples in Apache Spark and other big data platforms is counting words. This example reads a file into an Apache Spark DataFrame, counts how often each word occurs, and then calculates the minimum, maximum, mean and standard deviation of those counts. The CSV file I am going to read is taken from Kaggle and contains news articles that are either fake or not. For a basic analysis, I will extract some characteristics from the file. The CSV file looks as follows:

"uuid","ord_in_thread","author","published","title","text","language","crawled","site_url","country","domain_rank","thread_title","spam_score","main_img_url","replies_count","participants_count","likes","comments","shares","type"
"6a175f46bcd24d39b3e962ad0f29936721db70db",0,"Barracuda Brigade","2016-10-26T21:41:00.000+03:00","Muslims BUSTED: They Stole Millions In Gov’t Benefits","Print They should pay all the back all the money plus interest. The entire family and everyone who came in with them need to be deported asap. Why did it take two years to bust them? Here we go again …another group stealing from the government and taxpayers! A group of Somalis stole over four million in government benefits over just 10 months! We’ve reported on numerous cases like this one where the Muslim refugees/immigrants commit fraud by scamming our system…It’s way out of control! More Related","english","2016-10-27T01:49:27.168+03:00","100percentfedup.com","US",25689,"Muslims BUSTED: They Stole Millions In Gov’t Benefits",0,"http://bb4sp.com/wp-content/uploads/2016/10/Fullscreen-capture-10262016-83501-AM.bmp.jpg",0,1,0,0,0,"bias"
First, I import all necessary methods and classes and then establish the connection to Spark via SparkSession:
from pyspark.sql.functions import mean as _mean, stddev as _std, min as _min, max as _max, col, explode
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer

session = SparkSession.builder.appName('fake').getOrCreate()
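In the next step, I read the CSV file into a DataFrame. Several options are required because the CSV file doesn’t follow a simple one-record-per-line format. For example, some fields contain line breaks, so every field is enclosed in double quotes (") and the multiline option has to be enabled. The escape character is set to the double quote as well, so that a doubled quote ("") inside a field is read as a literal quote.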
df = session.read\
    .format('csv')\
    .option('header', True)\
    .option('multiline', True)\
    .option('mode', 'DROPMALFORMED')\
    .option('quote', '"')\
    .option('escape', '"')\
    .load('fake.csv')
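To see why the quoting options matter, here is a minimal sketch that uses Python's standard csv module instead of Spark. The two-column sample string is made up for illustration. It contains a line break and a doubled quote inside one quoted field, which is the situation the multiline, quote and escape options are meant to handle.

```python
import csv
import io

# Hypothetical sample: one header row and one record whose text field
# contains a line break and a doubled double quote.
raw = '"uuid","text"\n"a1","First line\nsecond line with ""quotes"""\n'

# newline="" passes the embedded line break to the csv module unchanged.
rows = list(csv.reader(io.StringIO(raw, newline="")))

header, record = rows
print(len(rows))   # the record spans two physical lines but is a single row
print(record[1])   # the text field with its line break and literal quotes
```

A parser that treats every physical line as one record would split this record in two, which is why reading the file without these options tends to produce malformed rows.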
Then, drop all columns except the one named text:
all_columns = df.columns
all_columns.remove('text')
df = df.drop(*all_columns)
Filter out all rows where the text is null because they provide no value:
df = df.where(col('text').isNotNull())
Tokenize each text with the default Tokenizer. It converts the text to lowercase and splits it on whitespace into an array of words. Punctuation is kept, so ‘Hello World.’ becomes [hello, world.].
tokenizer = Tokenizer(inputCol='text', outputCol='words')
wordsData = tokenizer.transform(df)
The output looks as follows:
wordsData.show()
+--------------------+--------------------+
|                text|               words|
+--------------------+--------------------+
|Print They should...|[print, they, sho...|
|Why Did Attorney ...|[why, did, attorn...|
|Red State : 
Fox ...|[red, state, :, ,...|
|Email Kayla Muell...|[email, kayla, mu...|
|Email HEALTHCARE ...|[email, healthcar...|
|Print Hillary goe...|[print, hillary, ...|
|BREAKING! NYPD Re...|[breaking!, nypd,...|
|BREAKING! NYPD Re...|[breaking!, nypd,...|
|
Limbaugh said th...|[, limbaugh, said...|
|Email 
These peop...|[email, , these, ...|
|                    |                  []|
|
Who? Comedian. 
|[, who?, comedian...|
|Students expresse...|[students, expres...|
|Email For Republi...|[email, for, repu...|
|Copyright © 2016 ...|[copyright, ©, 20...|
|Go to Article A T...|[go, to, article,...|
|Copyright © 2016 ...|[copyright, ©, 20...|
|Go to Article Don...|[go, to, article,...|
|John McNaughton i...|[john, mcnaughton...|
|Go to Article Dea...|[go, to, article,...|
+--------------------+--------------------+
only showing top 20 rows
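The empty strings in the words column come from how the text is split. As far as I know, the default Tokenizer lowercases the text and splits on every single whitespace character, so two whitespace characters in a row produce an empty token. The following plain-Python sketch approximates that behaviour. Note that simple_tokenize is a hypothetical helper and not part of Spark, and edge cases may differ from the real implementation.

```python
import re


def simple_tokenize(text):
    """Approximate Spark's default Tokenizer in plain Python (assumption)."""
    # Lowercase, then split on each single whitespace character.
    tokens = re.split(r"\s", text.lower())
    # Drop trailing empty strings, as Java's String.split does.
    while tokens and tokens[-1] == "":
        tokens.pop()
    return tokens


print(simple_tokenize("Hello World."))
print(simple_tokenize("Email \nThese people"))
```

The second call yields an empty token between "email" and "these" because a space is directly followed by a line break. If such tokens are unwanted, RegexTokenizer from pyspark.ml.feature, which splits on runs of whitespace by default, is one possible alternative.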
In the next step, we need to explode() the words column. This function takes an array and creates one new row for each of its items. For example, a row containing [hello, world] becomes two rows, hello and world. This prepares the data for counting the words.
singleWords = wordsData.select(explode(wordsData.words).alias('word'))
Showing the result again:
singleWords.show()
+---------+
|     word|
+---------+
|    print|
|     they|
|   should|
|      pay|
|      all|
|      the|
|     back|
|      all|
|      the|
|    money|
|     plus|
|interest.|
|      the|
|   entire|
|   family|
|      and|
| everyone|
|      who|
|     came|
|       in|
+---------+
only showing top 20 rows
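Conceptually, explode() flattens a column of arrays into one long column of single values. A minimal plain-Python sketch of the same idea, with made-up input rows, could look like this:

```python
# Each inner list stands for the words array of one row (hypothetical data).
rows = [["hello", "world"], ["print", "they", "should"], []]

# One output item per array element; a row with an empty array contributes nothing.
single_words = [word for words in rows for word in words]

print(single_words)
```

The difference is that Spark performs this flattening distributed across partitions, while the list comprehension runs on a single machine.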
In the last step, the words need to be grouped and counted.
counts = singleWords.groupBy('word').count().orderBy('count', ascending=False)
Calling counts.show() results in the following:
+----+------+
|word| count|
+----+------+
| the|468400|
|  of|222781|
|  to|222298|
| and|202780|
|   a|159362|
|    |153173|
|  in|147685|
|that|104812|
|  is| 98624|
| for| 71160|
|  on| 58344|
|  as| 52508|
|with| 50037|
|  it| 48248|
| are| 47200|
|this| 44396|
|  by| 44333|
|  be| 42129|
| was| 41164|
|have| 39288|
+----+------+
only showing top 20 rows
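The grouping and ordering can be illustrated on a small scale with collections.Counter from the Python standard library. This is only a sketch with a made-up word list. It also shows one way to ignore the empty token, which appears in the result above with 153173 occurrences.

```python
from collections import Counter

# Hypothetical, already tokenized and exploded words, including empty tokens.
words = ["the", "of", "the", "", "to", "the", "of", "", "the", ""]

# Counting corresponds to groupBy('word').count().
counts = Counter(words)

# most_common() sorts by count in descending order,
# similar to orderBy('count', ascending=False).
top_three = counts.most_common(3)
print(top_three)

# Optional: ignore empty tokens before counting.
non_empty_counts = Counter(word for word in words if word != "")
top_three_non_empty = non_empty_counts.most_common(3)
print(top_three_non_empty)
```

In Spark, a comparable filter could be applied to singleWords before grouping, for example with a where() condition on the word column.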
From there, it’s only a small step to get the basic statistics of the word counts:
counts.select(_mean('count').alias('mean'),
              _std('count').alias('stddev'),
              _min('count').alias('min'),
              _max('count').alias('max')).show()

+------------------+------------------+---+------+
|              mean|            stddev|min|   max|
+------------------+------------------+---+------+
|24.613804751125944|1217.3337287760087|  1|468400|
+------------------+------------------+---+------+
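The mean is small compared to the maximum because most words occur only a few times, while a handful of words such as "the" occur extremely often. The same four statistics can be sketched in plain Python with the statistics module. The counts below are made up, and the sketch assumes that Spark's stddev is the sample standard deviation, which is what statistics.stdev computes.

```python
import statistics

# Hypothetical word counts: a few frequent words and many rare ones.
counts = [5, 3, 1, 1, 1, 1]

summary = {
    "mean": statistics.mean(counts),
    "stddev": statistics.stdev(counts),  # sample standard deviation (n - 1)
    "min": min(counts),
    "max": max(counts),
}

print(summary)
```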


About the author

Friedrich Ewald is an experienced Software Engineer with a Master's degree in Computer Science. He started this website in late 2015, mostly as a digital business card. He is interested in Go, Python, Ruby, SQL and NoSQL databases, machine learning and AI, and is experienced in building scalable, distributed systems and microservices at multiple larger and smaller companies.