Real-Time Analytics Lectures for SGH students

Laboratory 2 - Data in Apache Spark

Batch processing in Apache Spark

# SparkContext
from pyspark import SparkContext
sc = SparkContext(appName="myAppName")
sc

In newer versions of Spark, the entry point is the SparkSession:

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .appName("new")\
        .getOrCreate()
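getOrCreate() reuses an already-running session if there is one. When working locally you can also pin the master and other settings in the builder; a minimal sketch (the master value here is an assumption for local runs, on a cluster it is set by the deployment):

spark = SparkSession.builder \
        .appName("new") \
        .master("local[*]") \
        .getOrCreate()   # local[*] uses all local cores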

You can get the SparkContext from the SparkSession object:

sc = spark.sparkContext

RDD Transformations and Actions

In this lecture we will begin to delve deeper into using Spark and Python. Please view the video lecture for a full explanation.

Important Terms

Let’s quickly go over some important terms:

Term            Definition
----            ----------
RDD             Resilient Distributed Dataset
Transformation  Spark operation that produces an RDD
Action          Spark operation that produces a local object
Spark Job       Sequence of transformations on data with a final action
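To see the split between transformations and actions in practice, here is a minimal sketch (assuming a running SparkContext named sc): the map call returns immediately because nothing is computed until the action runs.

nums = sc.parallelize(range(5))       # create an RDD
doubled = nums.map(lambda x: x * 2)   # transformation: lazy, just records the step
doubled.collect()                     # action: triggers computation -> [0, 2, 4, 6, 8]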

Creating an RDD

There are two common ways to create an RDD:

Method                     Result
------                     ------
sc.parallelize(array)      Create an RDD from the elements of an array (or list)
sc.textFile(path/to/file)  Create an RDD of lines from a file
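For example (the file path below is only a placeholder):

rdd_from_list = sc.parallelize([1, 2, 3, 4])     # RDD from a Python list
rdd_from_file = sc.textFile('path/to/file.txt')  # RDD of lines, one element per line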

RDD Transformations

We can use transformations to create a set of instructions we want to perform on the RDD (before we call an action and actually execute them).

Transformation Example                Result
----------------------                ------
filter(lambda x: x % 2 == 0)          Discard non-even elements
map(lambda x: x * 2)                  Multiply each RDD element by 2
map(lambda x: x.split())              Split each string into words
flatMap(lambda x: x.split())          Split each string into words and flatten the sequence
sample(True, 0.25)                    Create a sample of 25% of the elements, with replacement
union(rdd)                            Append rdd to the existing RDD
distinct()                            Remove duplicates from the RDD
sortBy(lambda x: x, ascending=False)  Sort elements in descending order
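Several of these (union, distinct, sortBy, sample) do not appear in the examples below, so here is a small sketch of them; remember that nothing runs until collect():

a = sc.parallelize([1, 2, 2, 3])
b = sc.parallelize([3, 4])
a.union(b).distinct().sortBy(lambda x: x, ascending=False).collect()  # [4, 3, 2, 1]
a.sample(True, 0.25).collect()  # random subset with replacement; varies run to run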

Generating an RDD

rdd = sc.parallelize(range(10))
rdd

RDD Actions

Once you have your ‘recipe’ of transformations ready, what you will do next is execute them by calling an action. Here are some common actions:

Action               Result
------               ------
collect()            Convert the RDD to an in-memory list
take(3)              First 3 elements of the RDD
top(3)               Top 3 elements of the RDD
takeSample(True, 3)  Sample of 3 elements, with replacement
sum()                Sum of the elements (assumes numeric elements)
mean()               Mean of the elements (assumes numeric elements)
stdev()              Standard deviation of the elements (assumes numeric elements)

Examples of actions

rdd.first()                # first element
rdd.take(2)                # first 2 elements
rdd.takeSample(True, 3)    # 3 random elements, with replacement
rdd.takeSample(False, 3)   # 3 random elements, without replacement
rdd.count()                # number of elements
rdd.mean()                 # mean of the elements
rdd2 = rdd.map(lambda x: x*x).collect()        # squares: [0, 1, 4, ..., 81]
rdd3 = rdd.map(lambda x: [x,x]).collect()      # list of pairs: [[0, 0], [1, 1], ...]
rdd4 = rdd.flatMap(lambda x: [x,x]).collect()  # flattened: [0, 0, 1, 1, ...]

# double every element, drop the value 2, then sum the rest -> 378
sc.parallelize(range(20)) \
.map(lambda x: x * 2) \
.filter(lambda x: x != 2) \
.reduce(lambda x,y: x + y)
rdd11a = sc.parallelize(('aa','bb','cc','dd','aa','cc','ee','ff','dd','dd','aa'))
rdd11b = rdd11a.map(lambda k: (k,1))   # turn each element into a (key, 1) pair
rdd11b.countByKey().items()            # action: count occurrences of each key on the driver
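countByKey() is an action that brings the counts back to the driver as a dict. To keep the counts distributed as an RDD instead (which scales better when there are many keys), reduceByKey is the usual transformation; a sketch:

rdd11b.reduceByKey(lambda x, y: x + y).collect()
# e.g. [('aa', 3), ('bb', 1), ('cc', 2), ('dd', 3), ('ee', 1), ('ff', 1)] (order may vary)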


rdda1 = sc.parallelize(('aa','bb','cc','dd','ee','ff','gg','aa')).map(lambda k: (k,1))
rdda2 = sc.parallelize(('aa','cc','mm','rr','tt')).map(lambda k: (k,1))
rdda1.join(rdda2).collect()            # inner join: only keys present in both RDDs

rdda1.leftOuterJoin(rdda2).collect()   # all keys from rdda1; missing right values become None

rdda1.rightOuterJoin(rdda2).collect()  # all keys from rdda2; missing left values become None
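For completeness, fullOuterJoin keeps keys from both sides; this call is not in the original notebook but is standard RDD API:

rdda1.fullOuterJoin(rdda2).collect()   # keys from either side; the missing side becomes None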
Working with text files

%%file example.txt
first
second line
the third line
then a fourth line
text_rdd = sc.textFile('example.txt')
text_rdd.first()
text_rdd.take(3)
text_rdd.takeSample(True,2)
text_rdd.count()
text_rdd.map(lambda line: line.split()).collect()
# nasza_fun ("our function" in Polish) does the same split as the lambda above
def nasza_fun(line):
    return line.split()

text_rdd.map(nasza_fun).collect()
# Collect everything as a single flat list of words
text_rdd.flatMap(lambda line: line.split()).collect()

Spark DataFrames

rdd = sc.parallelize([(1, 2, 3, 'a b c'),
             (4, 5, 6, 'd e f'),
             (7, 8, 9, 'g h i')])
df = rdd.toDF(['col1', 'col2', 'col3','col4'])
df.show()  
df.printSchema()

dfe = spark.createDataFrame([
                        ('1', 'Joe',   '70000', '1'),
                        ('2', 'Henry', '80000', '2'),
                        ('3', 'Sam',   '60000', '2'),
                        ('4', 'Max',   '90000', '1')],
['Id', 'Name', 'Salary','DepartmentId']
                       )
dfe.show()
dfe.printSchema()
from pyspark.sql.functions import udf, array
from pyspark.sql.types import StringType
from datetime import datetime as Date
data = [
[10,'Direct Sales',Date(2019,1,1)],
[12,'Direct Sales',Date(2019,1,2)],
[20,'Online Sales',Date(2019,1,1)],
[25,'Online Sales',Date(2019,1,2)],
]
df = spark.createDataFrame(data, ['Revenue','Department','Date'])
df.show()
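The udf and StringType imports above are not used yet; here is a minimal sketch of a user-defined function on this DataFrame (the 'high'/'low' threshold of 20 is an arbitrary value chosen for illustration):

revenue_label = udf(lambda r: 'high' if r >= 20 else 'low', StringType())
df.withColumn('RevenueLabel', revenue_label(df['Revenue'])).show()
df.groupBy('Department').sum('Revenue').show()  # built-in aggregation: total revenue per department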
# adult.data has no header row, so Spark assigns default column names (_c0, _c1, ...)
adultDF = spark.read.csv("adult.data", inferSchema=True, ignoreLeadingWhiteSpace=True)

adultDF.take(1)
col_names = ["age", "workclass", "fnlwgt", "education", "education-num","marital-status", "occupation",
             "relationship", "race", "sex", "capital-gain", "capital-loss", "hours-per-week",
             "native-country", "earnings"]

adultDF = adultDF.toDF(*col_names)

adultDF.show(2, vertical=True)

adultDF.printSchema()

Filtering with drop and dropna

adultDF = adultDF.drop("fnlwgt").dropna("any")
adultDF.show(1, vertical=True)
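A caveat: in adult.data the missing values are encoded as the string "?" rather than as nulls, so dropna by itself does not remove them. One way to handle that, assuming the "?" encoding:

adultDF_clean = adultDF.na.replace({'?': None}).dropna('any')  # map '?' to null, then drop those rows
adultDF_clean.count()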

From Spark DataFrame to Pandas DataFrame

df = adultDF.toPandas()
df.describe()

The same summary in pure Spark:

adultDF.select(['age', 'education-num']).describe().show()
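Other summaries can be computed without leaving Spark; for example, grouped statistics (a sketch using standard DataFrame aggregation):

from pyspark.sql import functions as F
adultDF.groupBy('earnings').agg(F.avg('age').alias('avg_age'), F.count('age').alias('n')).show()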

SQL with Spark

Save a DataFrame as a SQL table

adultDF.write.saveAsTable("adult")

and run a SQL query:

newAdult = spark.sql("select age, education, sex from adult where age > 50")
newAdult.show(3)
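saveAsTable persists the table via the session's warehouse directory. If you only need SQL access within the current session, a temporary view avoids writing anything to disk (a standard alternative, not used in the original code):

adultDF.createOrReplaceTempView("adult_view")
spark.sql("select count(*) from adult_view where age > 50").show()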