Count large amount of words with Apache Spark Dataframe
One of the basic examples in Apache Spark and other big data platforms is counting words. This example reads a file into a Apache Spark DataFrame
and then performs basic tasks: Minimum, Maximum, Mean and Standard Deviation are calculated.
The CSV file I am going to read is taken from Kaggle and contains news which are either fake or not. For basic analysis, I will get some characteristics from the file. The csv file looks as follows:
"uuid","ord_in_thread","author","published","title","text","language","crawled","site_url","country","domain_rank","thread_title","spam_score","main_img_url","replies_count","participants_count","likes","comments","shares","type"
"6a175f46bcd24d39b3e962ad0f29936721db70db",0,"Barracuda Brigade","2016-10-26T21:41:00.000+03:00","Muslims BUSTED: They Stole Millions In Gov’t Benefits","Print They should pay all the back all the money plus interest. The entire family and everyone who came in with them need to be deported asap. Why did it take two years to bust them? Here we go again …another group stealing from the government and taxpayers! A group of Somalis stole over four million in government benefits over just 10 months! We’ve reported on numerous cases like this one where the Muslim refugees/immigrants commit fraud by scamming our system…It’s way out of control! More Related","english","2016-10-27T01:49:27.168+03:00","100percentfedup.com","US",25689,"Muslims BUSTED: They Stole Millions In Gov’t Benefits",0,"http://bb4sp.com/wp-content/uploads/2016/10/Fullscreen-capture-10262016-83501-AM.bmp.jpg",0,1,0,0,0,"bias"
SparkSession
:
from pyspark.sql.functions import mean as _mean, stddev as _std, min as _min, max as _max, col, explode
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer
session = SparkSession.builder.appName('fake').getOrCreate()
DataFrame
. The several options are required because the CSV file doesn’t follow a standard format. For example it has line breaks within each field which requires encapsulation with double quotes ("
) for each field.
df = session.read\
.format('csv')\
.option('header', True)\
.option('multiline', True)\
.option('mode', 'DROPMALFORMED')\
.option('quote', '"')\
.option('escape', '"')\
.load('fake.csv')
text
:
all_columns = df.columns
all_columns.remove('text')
df = df.drop(*all_columns)
None
because they provide no value:
df = df.where(col('text').isNotNull())
Tokenizer
. This splits the sentence into an array of words. For example, ‘Hello World.’ becomes [hello, world]
.
tokenizer = Tokenizer(inputCol='text', outputCol='words')
wordsData = tokenizer.transform(df)
wordsData.show()
+--------------------+--------------------+
| text| words|
+--------------------+--------------------+
|Print They should...|[print, they, sho...|
|Why Did Attorney ...|[why, did, attorn...|
|Red State :
Fox ...|[red, state, :, ,...|
|Email Kayla Muell...|[email, kayla, mu...|
|Email HEALTHCARE ...|[email, healthcar...|
|Print Hillary goe...|[print, hillary, ...|
|BREAKING! NYPD Re...|[breaking!, nypd,...|
|BREAKING! NYPD Re...|[breaking!, nypd,...|
|
Limbaugh said th...|[, limbaugh, said...|
|Email
These peop...|[email, , these, ...|
| | []|
|
Who? Comedian.
|[, who?, comedian...|
|Students expresse...|[students, expres...|
|Email For Republi...|[email, for, repu...|
|Copyright © 2016 ...|[copyright, ©, 20...|
|Go to Article A T...|[go, to, article,...|
|Copyright © 2016 ...|[copyright, ©, 20...|
|Go to Article Don...|[go, to, article,...|
|John McNaughton i...|[john, mcnaughton...|
|Go to Article Dea...|[go, to, article,...|
+--------------------+--------------------+
only showing top 20 rows
explode()
each line. This method takes items of an array and inserts one new row for each item of the array. In the example above, [hello, world]
, would become hello
and world
. This is needed as a preparation for counting the words.
singleWords = wordsData.select(explode(wordsData.words).alias('word'))
singleWords.show()
+---------+
| word|
+---------+
| print|
| they|
| should|
| pay|
| all|
| the|
| back|
| all|
| the|
| money|
| plus|
|interest.|
| the|
| entire|
| family|
| and|
| everyone|
| who|
| came|
| in|
+---------+
only showing top 20 rows
counts = singleWords.groupBy('word').count().orderBy('count', ascending=False)
+----+------+
|word| count|
+----+------+
| the|468400|
| of|222781|
| to|222298|
| and|202780|
| a|159362|
| |153173|
| in|147685|
|that|104812|
| is| 98624|
| for| 71160|
| on| 58344|
| as| 52508|
|with| 50037|
| it| 48248|
| are| 47200|
|this| 44396|
| by| 44333|
| be| 42129|
| was| 41164|
|have| 39288|
+----+------+
only showing top 20 rows
counts.select(_mean('count').alias('mean'),
_std('count').alias('stddev'),
_min('count').alias('min'),
_max('count').alias('max')).show()
+------------------+------------------+---+------+
| mean| stddev|min| max|
+------------------+------------------+---+------+
|24.613804751125944|1217.3337287760087| 1|468400|
+------------------+------------------+---+------+