from pyspark import SparkContext
sc = SparkContext(appName="myAppName")
Apache Spark on RDD data
RDD
- Resilient Distributed Dataset
- Main abstraction of Spark Core
- Two main kinds of operations:
  - Actions: take an RDD as input and return a local value (a number, a list, ...) as output
  - Transformations: lazy operations that take an RDD as input and return a new RDD as output
- Properties (see the sketch below):
  - In-Memory
  - Immutable
  - Lazily evaluated
  - Parallel
  - Partitioned
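A short sketch of these properties, assuming the SparkContext sc created above: an RDD is split into partitions, and a transformation never modifies an RDD in place but returns a new one.
rdd = sc.parallelize([1, 2, 3, 4], numSlices=2)   # partitioned: data spread over 2 slices
print(rdd.getNumPartitions())                     # 2
doubled = rdd.map(lambda x: x * 2)                # immutable: rdd itself stays unchanged
print(rdd.collect(), doubled.collect())           # [1, 2, 3, 4] [2, 4, 6, 8]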
Important information
Term | Definition |
---|---|
RDD | Resilient Distributed Dataset |
Transformation | Spark operation that produces an RDD |
Action | Spark operation that produces a local object |
Spark Job | Sequence of transformations on data with a final action |
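Put together, a minimal Spark job looks as follows (a sketch, assuming an active SparkContext sc): the transformations only build a lineage, and the final action triggers the computation and returns a local object.
rdd = sc.parallelize(range(10))               # create an RDD
doubled = rdd.map(lambda x: x * 2)            # transformation: lazy, returns an RDD
evens = doubled.filter(lambda x: x % 4 == 0)  # another lazy transformation
print(evens.collect())                        # action: runs the job -> [0, 4, 8, 12, 16]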
Creating an RDD:
Method | Result |
---|---|
sc.parallelize(array) | Create RDD of elements of array (or list) |
sc.textFile(path/to/file) | Create RDD of lines from file |
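A minimal sketch of both creation methods, assuming an active SparkContext sc and that MobyDick.txt (used later in these notes) sits in the working directory:
nums = sc.parallelize([1, 2, 3, 4, 5])   # RDD from a Python list
print(nums.count())                      # 5
lines = sc.textFile("MobyDick.txt")      # RDD with one element per line of the file
print(lines.take(2))                     # first two lines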
Transformations
Transformation Example | Result |
---|---|
filter(lambda x: x % 2 == 0) | Discard non-even elements |
map(lambda x: x * 2) | Multiply each RDD element by 2 |
map(lambda x: x.split()) | Split each string into words |
flatMap(lambda x: x.split()) | Split each string into words and flatten sequence |
sample(withReplacement=True, fraction=0.25) | Create sample of 25% of elements with replacement |
union(rdd) | Append rdd to existing RDD |
distinct() | Remove duplicates in RDD |
sortBy(lambda x: x, ascending=False) | Sort elements in descending order |
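A sketch chaining several of these transformations (assuming an active SparkContext sc); nothing is computed until the final collect():
rdd = sc.parallelize([3, 1, 2, 2, 4, 5, 6])
result = (rdd
          .filter(lambda x: x % 2 == 0)            # keep even elements: 2, 2, 4, 6
          .map(lambda x: x * 2)                    # double them: 4, 4, 8, 12
          .distinct()                              # drop duplicates
          .sortBy(lambda x: x, ascending=False))   # descending order
print(result.collect())                            # [12, 8, 4]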
Actions
Action | Result |
---|---|
collect() | Convert RDD to in-memory list |
take(3) | First 3 elements of RDD |
top(3) | Top 3 elements of RDD |
takeSample(withReplacement=True, num=3) | Create sample of 3 elements with replacement |
sum() | Find element sum (assumes numeric elements) |
mean() | Find element mean (assumes numeric elements) |
stdev() | Find element standard deviation (assumes numeric elements) |
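A quick numeric sketch of these actions (again assuming an active SparkContext sc):
nums = sc.parallelize([1, 2, 3, 4, 5])
print(nums.take(3))     # [1, 2, 3]
print(nums.top(3))      # [5, 4, 3]
print(nums.sum())       # 15
print(nums.mean())      # 3.0
print(nums.stdev())     # ~1.41 (population standard deviation)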
keywords = ['Books', 'DVD', 'CD', 'PenDrive']

key_rdd = sc.parallelize(keywords)
key_rdd.collect()

key_small = key_rdd.map(lambda x: x.lower())
key_small.collect()
sc.stop()
Map reduce
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("new").getOrCreate()
sc = spark.sparkContext

tekst = sc.textFile("MobyDick.txt")
tekst.take(5)
import re
# Word Count on RDD
"MobyDick.txt")\
sc.textFile(map(lambda x: re.findall(r"[a-z']+", x.lower())) \
.lambda x: [(y, 1) for y in x]).reduceByKey(lambda x,y: x + y)\
.flatMap(5) .take(
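take(5) returns an arbitrary five pairs; to get the five most frequent words instead, the same pipeline can end with the standard takeOrdered action (a sketch):
(sc.textFile("MobyDick.txt")
   .flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))
   .map(lambda w: (w, 1))
   .reduceByKey(lambda x, y: x + y)
   .takeOrdered(5, key=lambda kv: -kv[1]))   # 5 most frequent words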
sc.stop()
SPARK STREAMING - OLD Version
Spark Streaming works on a discretized stream, the DStream (a sequence of RDDs).
Spark Streaming needs at least 2 cores: one to receive the data and one to process it.
- StreamingContext(sparkContext, batchDuration) - represents the connection with the cluster; batchDuration sets the interval at which incoming data is split into batches
- socketTextStream(hostname, port) - create the stream from a TCP socket
- flatMap(f), map(f), reduceByKey(f) - transformations on a DStream (applied to each RDD in the stream)
- pprint(n) - print the first n elements of every batch
- StreamingContext.start() - start the stream
- StreamingContext.awaitTermination(timeout) - wait for termination of the stream
- StreamingContext.stop(stopSparkContext, stopGraceFully) - end the stream
You can create a StreamingContext from an existing SparkContext object.
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working threads
# and a batch interval of 2 seconds
sc = SparkContext("local[2]", "NetworkWordCount2")
ssc = StreamingContext(sc, 2)

# DStream
lines = ssc.socketTextStream("localhost", 9998)
words = lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
wordCounts.pprint()

# before starting, run a stream data source:
!nc -lk 9998

# Start the computation
ssc.start()
ssc.awaitTermination()

ssc.stop()
sc.stop()
Stream from a socket
nc -lk 9998
%%file start_stream.py
from socket import *
import time
rdd = list()
with open("MobyDick.txt", 'r') as ad:
    for line in ad:
        rdd.append(line)

HOST = 'localhost'
PORT = 9998
ADDR = (HOST, PORT)

tcpSock = socket(AF_INET, SOCK_STREAM)
tcpSock.bind(ADDR)
tcpSock.listen(5)

while True:
    c, addr = tcpSock.accept()
    print('got connection')
    for line in rdd:
        try:
            c.send(line.encode())
            time.sleep(1)
        except:
            break
    c.close()
    print('disconnected')
%%file streamWordCount.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
if __name__ == "__main__":
    spark = SparkSession.builder.appName("Stream_DF").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    lines = (spark
             .readStream
             .format("socket")
             .option("host", "localhost")
             .option("port", 9998)
             .load())

    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    word_counts = words.groupBy("word").count()

    streamingQuery = (word_counts
                      .writeStream
                      .format("console")
                      .outputMode("complete")
                      .trigger(processingTime="5 second")
                      .start())

    streamingQuery.awaitTermination()
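One way to run this pair, assuming both files were written as above: start the socket source first, then submit the streaming job from a second terminal.
python start_stream.py
spark-submit streamWordCount.py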
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
batch_counter = {"count": 0}

def process_batch(df, batch_id):
    batch_counter["count"] += 1
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)

spark = SparkSession.builder.appName("Stream_DF").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

lines = (spark
         .readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9998)
         .load())

words = lines.select(explode(split(lines.value, " ")).alias("word"))
word_counts = words.groupBy("word").count()

streamingQuery = (word_counts.writeStream
                  .format("console")
                  .outputMode("complete")
                  .foreachBatch(process_batch)
                  .trigger(processingTime="5 second")
                  .start())
Apache Kafka + Apache Spark stream
- Check whether there are any topics in your Kafka environment (a Python alternative is sketched after this list):
cd ~
kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
- Create a new topic streamXX:
cd ~
kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic streamXX
- Check the topics list again; streamXX should now be listed.
- Run a new terminal with a producer:
cd ~
kafka/bin/kafka-console-producer.sh --bootstrap-server broker:9092 --topic stream
- Check if you can send and receive messages:
cd ~
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic streamXX
- Close the producer terminal.
- Run the stream code.
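The same topic check can also be done from Python with the kafka-python package that the producer below uses (a sketch; assumes the package is installed and the broker address matches):
from kafka import KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers="broker:9092")
print(consumer.topics())   # set of topic names, same as kafka-topics.sh --list
consumer.close()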
%%file stream.py
import json
import random
import sys
from datetime import datetime
from time import sleep
from kafka import KafkaProducer
= "broker:9092"
KAFKA_SERVER = 'stream'
TOPIC = 2
LAG
def create_producer(server):
return KafkaProducer(
=[server],
bootstrap_servers=lambda x: json.dumps(x).encode("utf-8"),
value_serializer=(3, 7, 0),
api_version
)
if __name__ == "__main__":
= create_producer(KAFKA_SERVER)
producer try:
while True:
= {
message "time" : str(datetime.now() ) ,
"id" : random.choice(["a", "b", "c", "d", "e"]) ,
"temperatura" : random.randint(-100,100) ,
"cisnienie" : random.randint(0,50) ,
}
=message)
producer.send(TOPIC, value
sleep(LAG)except KeyboardInterrupt:
producer.close()
Overwriting stream.py
- Run the stream.py file in a terminal:
python stream.py
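Before wiring Spark in, the produced messages can be verified with a plain kafka-python consumer (a sketch; the topic name and broker address mirror stream.py):
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "stream",
    bootstrap_servers=["broker:9092"],
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
for i, msg in enumerate(consumer):   # print a few messages, then stop
    print(msg.value)
    if i >= 4:
        break
consumer.close()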
APACHE SPARK
Let's create a Spark script for the data analysis.
%%file app.py
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 app.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
= "broker:9092"
SERVER = 'stream'
TOPIC
= StructType(
schema
["time", TimestampType()),
StructField("id", StringType()),
StructField("temperatura", IntegerType()),
StructField("cisnienie", IntegerType()),
StructField(
]
)
= """time Timestamp, id String, temperatura Int, cisnienie Int """ # DDL string
SCHEMA
if __name__ == "__main__":
= SparkSession.builder.getOrCreate()
spark "WARN")
spark.sparkContext.setLogLevel(
= (
raw
spark.readStreamformat("kafka")
."kafka.bootstrap.servers", SERVER)
.option("subscribe", TOPIC)
.option(
.load()
)# query = (
# raw.writeStream
# .outputMode("append")
# .format("console")
# .option("truncate", False)
# .start()
# )
= (raw.select("timestamp", from_json(decode(col("value"), "utf-8"), SCHEMA).alias("moje_dane"))
parsed "timestamp", "moje_dane.*")
.select(
)# query = (
# parsed.writeStream
# .outputMode("append")
# .format("console")
# .option("truncate", False)
# .start()
# )
# gr = parsed.agg(avg("temperatura"), avg("cisnienie"))
# query = (gr.writeStream
# .outputMode("update")
# .format("console")
# .option("truncate", False)
# .start()
# )
= (parsed.withWatermark("timestamp", "5 seconds")
gr "timestamp", "10 seconds", "7 seconds"))
.groupBy(window("temperatura"), avg("cisnienie"))
.agg(avg(
)= (gr.writeStream
query "complete")
.outputMode(format("console")
."truncate", False)
.option(
.start()
) query.awaitTermination()
Overwriting app.py