Apache Spark: working with RDD data

from pyspark import SparkContext

sc = SparkContext(appName="myAppName")

RDD

  • Resilient Distributed Dataset
  • The main abstraction of Spark Core
  • Two kinds of operations (see the sketch after this list):
    • Actions:
      • take an RDD as input and return a local value to the driver
    • Transformations:
      • lazy operations
      • take an RDD as input and produce a new RDD as output
  • In-memory
  • Immutable
  • Lazily evaluated
  • Parallel
  • Partitioned
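
A minimal sketch of the transformation/action split and of lazy evaluation, assuming the sc created above:

nums = sc.parallelize(range(10))             # create an RDD from a Python range
evens = nums.filter(lambda x: x % 2 == 0)    # transformation: only lineage is recorded, nothing runs
doubled = evens.map(lambda x: x * 2)         # still lazy
print(doubled.collect())                     # action: triggers the computation -> [0, 4, 8, 12, 16]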

Important information

Term            Definition
----            ----------
RDD             Resilient Distributed Dataset
Transformation  Spark operation that produces an RDD
Action          Spark operation that produces a local object
Spark Job       Sequence of transformations on data with a final action

Creating an RDD:

Method                     Result
------                     ------
sc.parallelize(array)      Create an RDD from the elements of an array (or list)
sc.textFile(path/to/file)  Create an RDD of lines from a file
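
Both constructors in one quick sketch, assuming the sc from above and the MobyDick.txt file used later in these notes:

nums = sc.parallelize([1, 2, 3, 4, 5])   # RDD from a Python list
print(nums.getNumPartitions())           # how many partitions the data was split into
lines = sc.textFile("MobyDick.txt")      # RDD with one element per line of the file
print(lines.count())                     # count() is an action: the number of lines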

Transformations

Transformation (example)                     Result
------------------------                     ------
filter(lambda x: x % 2 == 0)                 Discard non-even elements
map(lambda x: x * 2)                         Multiply each RDD element by 2
map(lambda x: x.split())                     Split each string into words
flatMap(lambda x: x.split())                 Split each string into words and flatten the sequence
sample(withReplacement=True, fraction=0.25)  Sample 25% of the elements, with replacement
union(rdd)                                   Append rdd to the existing RDD
distinct()                                   Remove duplicates from the RDD
sortBy(lambda x: x, ascending=False)         Sort elements in descending order
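
A short chain combining several of the transformations above; nothing runs until the final collect():

words = sc.parallelize(["spark", "rdd", "spark", "kafka"])
result = (words.distinct()                             # remove the duplicate "spark"
               .map(lambda x: x.upper())               # upper-case every element
               .sortBy(lambda x: x, ascending=False))  # sort in descending order
print(result.collect())                                # ['SPARK', 'RDD', 'KAFKA']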

Actions

Action                                   Result
------                                   ------
collect()                                Convert the RDD to an in-memory list
take(3)                                  First 3 elements of the RDD
top(3)                                   Top 3 elements of the RDD
takeSample(withReplacement=True, num=3)  Sample of 3 elements, with replacement
sum()                                    Sum of the elements (assumes numeric elements)
mean()                                   Mean of the elements (assumes numeric elements)
stdev()                                  Standard deviation of the elements (assumes numeric elements)
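
The numeric actions in one sketch, assuming an RDD of integers:

nums = sc.parallelize([1, 2, 3, 4, 5])
print(nums.take(3))    # [1, 2, 3]
print(nums.top(2))     # [5, 4]
print(nums.sum())      # 15
print(nums.mean())     # 3.0
print(nums.stdev())    # population standard deviation, about 1.41
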
A small end-to-end example:

keywords = ['Books', 'DVD', 'CD', 'PenDrive']

key_rdd = sc.parallelize(keywords)               # create an RDD from the list

key_rdd.collect()                                # action: materialize as a list
key_small = key_rdd.map(lambda x: x.lower())     # transformation: lower-case every element

key_small.collect()                              # ['books', 'dvd', 'cd', 'pendrive']
sc.stop()                                        # release the SparkContext

MapReduce word count

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("new").getOrCreate()

sc = spark.sparkContext
tekst = sc.textFile("MobyDick.txt")   # RDD of lines ("tekst" is Polish for "text")
tekst.take(5)                         # first five lines
import re
# Word count on an RDD
(sc.textFile("MobyDick.txt")
   .map(lambda x: re.findall(r"[a-z']+", x.lower()))   # list of words per line
   .flatMap(lambda x: [(y, 1) for y in x])             # flatten into (word, 1) pairs
   .reduceByKey(lambda x, y: x + y)                    # sum the counts per word
   .take(5))
sc.stop()
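
To get the five most frequent words rather than an arbitrary five, sort by count before taking - a sketch, assuming an active SparkContext sc and the re import from above:

counts = (sc.textFile("MobyDick.txt")
            .flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))  # one record per word
            .map(lambda w: (w, 1))
            .reduceByKey(lambda x, y: x + y))
print(counts.sortBy(lambda kv: kv[1], ascending=False).take(5))    # five most frequent words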

SPARK STREAMING - OLD version

Spark Streaming works on a discretized stream, a DStream (a sequence of RDDs).

Spark Streaming needs at least 2 cores: one to receive the data and one to process it.


  • StreamingContext(sparkContext, batchDuration) - represents the connection to the cluster; batchDuration sets the batch interval (in seconds)
  • socketTextStream(hostname, port) - create a DStream from a TCP source
  • flatMap(f), map(f), reduceByKey(f) - transformations for DStreams
  • pprint(num) - print the first num elements of every batch
  • StreamingContext.start() - start the stream
  • StreamingContext.awaitTermination(timeout) - wait for the stream to terminate
  • StreamingContext.stop(stopSparkContext, stopGraceFully) - end the stream

You can create a StreamingContext from an existing SparkContext object.

import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two worker threads
# and a batch interval of 2 seconds

sc = SparkContext("local[2]", "NetworkWordCount2")
ssc = StreamingContext(sc, 2)

# DStream
lines = ssc.socketTextStream("localhost", 9998)

words = lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))

wordCounts = words.map(lambda word: (word,1)).reduceByKey(lambda x,y: x+y)

wordCounts.pprint()

# Before starting the computation, run a data source in a separate terminal:
#   nc -lk 9998
ssc.start()             # Start the computation
ssc.awaitTermination()
ssc.stop()
sc.stop()
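
DStreams also support windowed aggregations. A sketch of a sliding word count, assuming the same ssc and words as above (window and slide lengths must be multiples of the 2-second batch interval):

# count words over the last 30 seconds, recomputed every 10 seconds
windowedCounts = (words.map(lambda word: (word, 1))
                       .reduceByKeyAndWindow(lambda x, y: x + y, None, 30, 10))
windowedCounts.pprint()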

Stream from a socket

Instead of typing lines into nc -lk 9998 by hand, the script below serves MobyDick.txt over a socket, one line per second:
%%file start_stream.py

from socket import socket, AF_INET, SOCK_STREAM
import time

# read the whole file into memory as a list of lines
lines = list()
with open("MobyDick.txt", 'r') as ad:
    for line in ad:
        lines.append(line)

HOST = 'localhost'
PORT = 9998
ADDR = (HOST, PORT)
tcpSock = socket(AF_INET, SOCK_STREAM)
tcpSock.bind(ADDR)
tcpSock.listen(5)


while True:
    c, addr = tcpSock.accept()
    print('got connection')
    for line in lines:
        try:
            c.send(line.encode())
            time.sleep(1)          # one line per second
        except OSError:            # client disconnected
            break
    c.close()
    print('disconnected')
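
To verify that the server works before pointing Spark at it, a minimal test client (standard library only):

from socket import socket, AF_INET, SOCK_STREAM

client = socket(AF_INET, SOCK_STREAM)
client.connect(("localhost", 9998))
print(client.recv(4096).decode())   # should print the opening line(s) of MobyDick.txt
client.close()
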
%%file streamWordCount.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    spark = SparkSession.builder.appName("Stream_DF").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    
    lines = (spark
         .readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9998)
         .load())

    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    word_counts = words.groupBy("word").count()

    streamingQuery = (word_counts
         .writeStream
         .format("console")
         .outputMode("complete")
         .trigger(processingTime="5 seconds")
         .start())

    streamingQuery.awaitTermination()
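
With the socket server from start_stream.py running in one terminal, the job can be launched in another:

python start_stream.py
spark-submit streamWordCount.py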
         
A variant that handles each micro-batch with a custom callback via foreachBatch:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

batch_counter = {"count": 0}   # mutable holder so the callback can update it

def process_batch(df, batch_id):
    # called once per micro-batch with an ordinary (static) DataFrame
    batch_counter["count"] += 1
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)


spark = SparkSession.builder.appName("Stream_DF").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
    
lines = (spark
         .readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9998)
         .load())

words = lines.select(explode(split(lines.value, " ")).alias("word"))
word_counts = words.groupBy("word").count()

streamingQuery = (word_counts.writeStream
         .outputMode("complete")
         .foreachBatch(process_batch)   # foreachBatch replaces the sink, so format("console") is not needed
         .trigger(processingTime="5 seconds")
         .start())

streamingQuery.awaitTermination()
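
foreachBatch is also the usual hook for writing micro-batches to sinks without native streaming support, using the regular batch writer. A sketch (the output path is hypothetical):

def write_batch(df, batch_id):
    # append the updated counts of this micro-batch to a parquet dataset
    df.write.mode("append").parquet("/tmp/word_counts")   # hypothetical path

query = (word_counts.writeStream
         .outputMode("update")
         .foreachBatch(write_batch)
         .start())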

Apache Kafka + Apache Spark stream

  1. Check which topics already exist in your Kafka environment:

cd ~
kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092

  2. Create a new topic streamXX:

cd ~
kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic streamXX

  3. Check the topics list again; streamXX should now appear.

  4. Run a console producer in a new terminal:

cd ~
kafka/bin/kafka-console-producer.sh --bootstrap-server broker:9092 --topic streamXX

  5. Check that you can send and receive messages with a console consumer:

cd ~
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic streamXX

Close the producer terminal.

Run the stream code:

%%file stream.py

import json
import random
from datetime import datetime
from time import sleep
from kafka import KafkaProducer


KAFKA_SERVER = "broker:9092"
TOPIC = 'stream'   # set this to your topic name (e.g. streamXX)
LAG = 2

def create_producer(server):
    return KafkaProducer(
        bootstrap_servers=[server],
        value_serializer=lambda x: json.dumps(x).encode("utf-8"),
        api_version=(3, 7, 0),
    )

if __name__ == "__main__":
    
    producer = create_producer(KAFKA_SERVER)
    try:
        while True:

            message = {
                "time": str(datetime.now()),
                "id": random.choice(["a", "b", "c", "d", "e"]),
                "temperatura": random.randint(-100, 100),   # temperature (Polish field names, kept to match the schema below)
                "cisnienie": random.randint(0, 50),         # pressure
            }
  
            producer.send(TOPIC, value=message)
            sleep(LAG)
    except KeyboardInterrupt:
        producer.close()
  6. Run stream.py in a terminal:

python stream.py

APACHE SPARK

Let's create a Spark script for the data analysis.

%%file app.py

# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 app.py

from pyspark.sql import SparkSession 
from pyspark.sql.functions import *
from pyspark.sql.types import *


SERVER = "broker:9092"
TOPIC = 'stream'   # must match the producer's topic

# Schema of the JSON messages, in two equivalent forms:
schema = StructType(
        [
            StructField("time", TimestampType()),
            StructField("id", StringType()),
            StructField("temperatura", IntegerType()),   # temperature
            StructField("cisnienie", IntegerType()),     # pressure
        ]
    )

SCHEMA = """time Timestamp, id String, temperatura Int, cisnienie Int"""  # the same schema as a DDL string


if __name__ == "__main__":
    
    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
     
    raw = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", SERVER)
        .option("subscribe", TOPIC)
        .load()
    )
    # Stage 1 (debugging): print the raw Kafka records
    # query =  (
    #     raw.writeStream
    #     .outputMode("append")
    #     .format("console")
    #     .option("truncate", False)
    #     .start()
    # )
    # decode the Kafka value from bytes, parse the JSON ("moje_dane" = "my data") and flatten the struct
    parsed = (raw.select("timestamp", from_json(decode(col("value"), "utf-8"), SCHEMA).alias("moje_dane"))
                .select("timestamp", "moje_dane.*")
             )
    # Stage 2 (debugging): print the parsed records
    # query =  (
    #     parsed.writeStream
    #     .outputMode("append")
    #     .format("console")
    #     .option("truncate", False)
    #     .start()
    # )
    # Stage 3 (debugging): running averages over the whole stream
    # gr = parsed.agg(avg("temperatura"), avg("cisnienie"))
    # query =  (gr.writeStream
    #     .outputMode("update")
    #     .format("console")
    #     .option("truncate", False)
    #     .start()
    # )
    gr = (parsed
          .withWatermark("timestamp", "5 seconds")                  # tolerate events up to 5 s late
          .groupBy(window("timestamp", "10 seconds", "7 seconds"))  # 10 s windows sliding every 7 s
          .agg(avg("temperatura"), avg("cisnienie")))
    query = (gr.writeStream
        .outputMode("complete")   # note: in complete mode the watermark does not drop old state
        .format("console")
        .option("truncate", False)
        .start()
    )
    query.awaitTermination()
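
The commented-out query blocks show the intermediate debugging stages (raw records, parsed records, global averages). Note that in "complete" output mode the watermark never drops old window state; switching to "update" lets Spark discard windows older than the watermark. Run the script against the producer above with the Kafka package matching your Spark version, e.g.:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 app.py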