# SparkContext
from pyspark import SparkContext
sc = SparkContext(appName="myAppName")
sc
Newer versions of Spark use a SparkSession as the entry point:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("new") \
    .getOrCreate()
You can get the SparkContext from the SparkSession object:
sc = spark.sparkContext
In this lecture we will begin to delve deeper into using Spark and Python. Please view the video lecture for a full explanation.
Let’s quickly go over some important terms:
Term | Definition |
---|---|
RDD | Resilient Distributed Dataset |
Transformation | Spark operation that produces an RDD |
Action | Spark operation that produces a local object |
Spark Job | Sequence of transformations on data with a final action |
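For example (a minimal sketch using the sc created above; the variable names are just for illustration), transformations are lazy and nothing runs until an action is called:
nums = sc.parallelize([1, 2, 3, 4])        # create an RDD
evens = nums.filter(lambda x: x % 2 == 0)  # transformation: builds a new RDD, nothing runs yet
evens.collect()                            # action: triggers the job and returns [2, 4]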
There are two common ways to create an RDD:
Method | Result |
---|---|
sc.parallelize(array) | Create RDD of elements of array (or list) |
sc.textFile(path/to/file) | Create RDD of lines from file |
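Both methods appear later in this notebook; as a quick sketch (example.txt is the file written further below):
nums_rdd = sc.parallelize([10, 20, 30])   # RDD from a Python list
lines_rdd = sc.textFile("example.txt")    # RDD of lines from a text file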
We can use transformations to create a set of instructions we want to perform on the RDD (before we call an action and actually execute them).
Transformation Example | Result |
---|---|
filter(lambda x: x % 2 == 0) | Discard non-even elements |
map(lambda x: x * 2) | Multiply each RDD element by 2 |
map(lambda x: x.split()) | Split each string into words |
flatMap(lambda x: x.split()) | Split each string into words and flatten sequence |
sample(withReplacement=True, fraction=0.25) | Create sample of 25% of elements with replacement |
union(rdd) | Append rdd to existing RDD |
distinct() | Remove duplicates in RDD |
sortBy(lambda x: x, ascending=False) | Sort elements in descending order |
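A short sketch chaining several of these transformations (the name chained is illustrative; nothing runs until the final collect):
chained = sc.parallelize(["a b", "b c", "a a"]) \
    .flatMap(lambda s: s.split()) \
    .distinct() \
    .sortBy(lambda w: w, ascending=False)
chained.collect()   # ['c', 'b', 'a']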
Creating an RDD
rdd = sc.parallelize(range(10))
rdd
Once you have your ‘recipe’ of transformations ready, you execute it by calling an action. Here are some common actions:
Action | Result |
---|---|
collect() | Convert RDD to in-memory list |
take(3) | First 3 elements of RDD |
top(3) | Top 3 elements of RDD |
takeSample(withReplacement=True, num=3) | Create sample of 3 elements with replacement |
sum() | Find element sum (assumes numeric elements) |
mean() | Find element mean (assumes numeric elements) |
stdev() | Find element standard deviation (assumes numeric elements) |
Action
rdd.first()
rdd.take(2)
rdd.takeSample(True,3)
rdd.takeSample(False,3)
rdd.count()
rdd.mean()
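The remaining numeric actions from the table work the same way on this rdd (a quick sketch; rdd still holds range(10)):
rdd.sum()     # 45
rdd.stdev()   # population standard deviation of 0..9
rdd.top(3)    # [9, 8, 7]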
rdd2 = rdd.map(lambda x: x*x).collect()
rdd3 = rdd.map(lambda x: [x,x]).collect()
rdd4 = rdd.flatMap(lambda x: [x,x]).collect()
sc.parallelize(range(20)) \
.map(lambda x: x * 2) \
.filter(lambda x: x != 2) \
.reduce(lambda x,y: x + y)
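For reference, the same computation in plain Python (double 0–19, drop the value 2, sum the rest):
sum(x * 2 for x in range(20) if x * 2 != 2)   # 378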
rdd11a = sc.parallelize(('aa','bb','cc','dd','aa','cc','ee','ff','dd','dd','aa'))
rdd11b = rdd11a.map(lambda k: (k,1))
rdd11b.countByKey().items()
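countByKey() is an action that brings the counts back as a local dict; a reduceByKey sketch keeps the same result distributed as an RDD:
rdd11b.reduceByKey(lambda a, b: a + b).collect()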
rdda1 = sc.parallelize(('aa','bb','cc','dd','ee','ff','gg','aa')).map(lambda k: (k,1))
rdda2 = sc.parallelize(('aa','cc','mm','rr','tt')).map(lambda k: (k,1))
rdda1.join(rdda2).collect()
rdda1.leftOuterJoin(rdda2).collect()
rdda1.rightOuterJoin(rdda2).collect()
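A fullOuterJoin keeps keys from both sides, pairing missing matches with None:
rdda1.fullOuterJoin(rdda2).collect()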
%%file example.txt
first
second line
the third line
then a fourth line
text_rdd = sc.textFile('example.txt')
text_rdd.first()
text_rdd.take(3)
text_rdd.takeSample(True,2)
text_rdd.count()
text_rdd.map(lambda line: line.split()).collect()
def nasza_fun(line):
    return line.split()
text_rdd.map(nasza_fun).collect()
# flatMap splits each line and flattens the words into a single sequence
text_rdd.flatMap(lambda line: line.split()).collect()
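Combining flatMap with the pair-RDD pattern from earlier gives a minimal word-count sketch (word_counts is an illustrative name):
word_counts = text_rdd.flatMap(lambda line: line.split()) \
    .map(lambda w: (w, 1)) \
    .reduceByKey(lambda a, b: a + b)
word_counts.collect()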
rdd = sc.parallelize([(1, 2, 3, 'a b c'),
(4, 5, 6, 'd e f'),
(7, 8, 9, 'g h i')])
df = rdd.toDF(['col1', 'col2', 'col3','col4'])
df.show()
df.printSchema()
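toDF lets Spark infer the column types; an explicit schema can be passed to createDataFrame instead (a sketch reusing the same rdd and column names):
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", IntegerType(), True),
    StructField("col3", IntegerType(), True),
    StructField("col4", StringType(), True),
])
spark.createDataFrame(rdd, schema).printSchema()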
dfe = spark.createDataFrame([
('1', 'Joe', '70000', '1'),
('2', 'Henry', '80000', '2'),
('3', 'Sam', '60000', '2'),
('4', 'Max', '90000', '1')],
['Id', 'Name', 'Salary', 'DepartmentId']
)
dfe.show()
dfe.printSchema()
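Because every value above was passed as a string, all four columns come out as strings; a quick sketch of casting one of them:
from pyspark.sql.functions import col
dfe.withColumn("Salary", col("Salary").cast("int")).printSchema()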
from pyspark.sql.functions import udf, array
from pyspark.sql.types import StringType
from datetime import datetime as Date
data = [
[10,'Direct Sales',Date(2019,1,1)],
[12,'Direct Sales',Date(2019,1,2)],
[20,'Online Sales',Date(2019,1,1)],
[25,'Online Sales',Date(2019,1,2)],
]
df = spark.createDataFrame(data , ['Revenue','Department','Date'])
df.show()
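The udf and StringType imports above are not used in the cells shown here; a minimal sketch of how a udf could add a derived column to this frame (the DeptCode name is made up):
dept_code = udf(lambda name: name.split()[0].upper(), StringType())
df.withColumn("DeptCode", dept_code(df["Department"])).show()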
adultDF = spark.read.csv("adult.data", inferSchema=True, ignoreLeadingWhiteSpace=True)
adultDF.take(1)
col_names = ["age", "workclass", "fnlwgt", "education", "education-num","marital-status", "occupation",
"relationship", "race", "sex", "capital-gain", "capital-loss", "hours-per-week",
"native-country", "earnings"]
adultDF = adultDF.toDF(*col_names)
adultDF.show(2, vertical=True)
adultDF.printSchema()
Filtering with drop and dropna
adultDF = adultDF.drop("fnlwgt").dropna("any")
adultDF.show(1, vertical=True)
From Spark DataFrame to Pandas DataFrame
df = adultDF.toPandas()
df.describe()
The same summary in pure Spark:
adultDF.select(['age', 'education-num']).describe().show()
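Grouped summaries work the same way; for example, counting rows per education level (a sketch using the column names defined above):
adultDF.groupBy("education").count().orderBy("count", ascending=False).show(5)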
Save DataFrame As SQL Table
adultDF.write.saveAsTable("adult")
and run SQL queries:
newAdult = spark.sql("select age, education, sex from adult where age > 50")
newAdult.show(3)
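If you do not need a persisted table, a temporary view gives the same SQL access (a sketch; the view name adult_view is arbitrary):
adultDF.createOrReplaceTempView("adult_view")
spark.sql("select count(*) from adult_view where age > 50").show()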