Real-Time Analytics Lectures for SGH students

Laboratory 3 - Producing stream data

WordCount: a MapReduce example in Apache Spark with RDDs

import re
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

sc.textFile("RDD_input")\
.map(lambda x: re.findall(r"[a-z']+", x.lower()))\
.flatMap(lambda x: [(y, 1) for y in x])\
.reduceByKey(lambda x, y: x+y)\
.collect()
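
To check what each stage of the pipeline above produces without starting Spark, the same three steps can be mimicked in plain Python. This is only a sketch: the helper name `word_count` and the sample sentences are made up for illustration, and a real RDD would spread the work across partitions.

```python
import re
from collections import defaultdict

def word_count(lines):
    """Mimic map -> flatMap -> reduceByKey on a plain list of strings."""
    # map: tokenize every line into lowercase words
    tokenized = [re.findall(r"[a-z']+", line.lower()) for line in lines]
    # flatMap: flatten the per-line lists into (word, 1) pairs
    pairs = [(word, 1) for words in tokenized for word in words]
    # reduceByKey: sum the ones for each distinct word
    counts = defaultdict(int)
    for word, one in pairs:
        counts[word] += one
    return dict(counts)

# Illustrative input only (not the contents of RDD_input)
sample = ["Spark is fast", "Spark streams data; data is everywhere"]
print(word_count(sample))
```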

Sending text data through a socket

On Linux you can use netcat:

nc -lk 9999

Python code for sending data through a socket

A better way is to write a short Python script:

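import socket
import time

# Example: build a list of events from a text file.
events = []
with open("RDD_input", "r") as f:
    for line in f:
        events.append(line)


HOST = "localhost"
PORT = 9999  # must match the port passed to ssc.socketTextStream
ADDR = (HOST, PORT)
tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # allow quick restarts
tcp_sock.bind(ADDR)
tcp_sock.listen(5)

# Main loop: serve one client at a time, sending one line per second.
while True:
    conn, addr = tcp_sock.accept()
    print("got connection")
    for line in events:
        try:
            conn.sendall(line.encode())
            time.sleep(1)
        except OSError:  # the client disconnected
            break
    conn.close()
    print("disconnected")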
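
Before pointing Spark at the script above, it can help to confirm that lines really arrive on the socket. Below is a minimal client sketch; the function name `read_lines` and its parameters are hypothetical, and the port to use is whichever `PORT` the sender script binds.

```python
import socket

def read_lines(host, port, max_lines=5, timeout=10.0):
    """Connect to a TCP line server and return up to max_lines decoded lines."""
    received = []
    with socket.create_connection((host, port), timeout=timeout) as conn:
        # makefile() gives a file-like view, so we can iterate line by line
        with conn.makefile("r", encoding="utf-8") as stream:
            for line in stream:
                received.append(line.rstrip("\n"))
                if len(received) >= max_lines:
                    break
    return received

# Usage, with the sender script running in another terminal:
#   print(read_lines("localhost", PORT))  # PORT as set in the sender script
```

Because the sender sleeps one second between lines, reading five lines takes about five seconds.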

Reading the generated stream in Apache Spark

import re 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

For a stream with a DStream object you need at least 2 cores: one runs the receiver, the other processes the data.

```python
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 5)  # batch interval of 5 seconds
```

Data streaming takes 3 steps:

1. Take a source

For a socket source:

```python
lines = ssc.socketTextStream("localhost", 9999)
```

2. Prepare your transformations

This is an example of a stateless transformation: the same operation is applied to each batch independently. In a stateless application, the state of previous transactions is neither stored nor referenced in subsequent ones. You can think of it as a step in an ETL process.

words = lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))
wordCounts = words.map(lambda word: (word,1)).reduceByKey(lambda x,y: x+y)
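
To make "stateless" concrete: each batch is counted from scratch and nothing carries over to the next one. The plain-Python sketch below imitates that behaviour; `count_batch` and the two sample batches are illustrative assumptions, not part of the Spark API.

```python
import re
from collections import Counter

def count_batch(batch_lines):
    """Word count for one batch only; no state from earlier batches is used."""
    words = []
    for line in batch_lines:
        words.extend(re.findall(r"[a-z']+", line.lower()))
    return dict(Counter(words))

# Two consecutive batches, as if two 5-second intervals had elapsed.
batches = [["spark spark kafka"], ["kafka"]]
for i, batch in enumerate(batches):
    # "kafka" is counted as 1 in the second batch, not 2: counts do not accumulate
    print(i, count_batch(batch))
```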

3. Print or save results

wordCounts.pprint()

Start listening to the stream in Spark

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
ssc.stop(True, True)    # Stop the SparkContext too, gracefully (finish processing received data)

Running Apache Kafka

Open a terminal and:

1. Download

wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.12-3.0.0.tgz

2. Unpack the archive

tar -xzf kafka_2.12-3.0.0.tgz

3. Go to the Kafka directory

cd kafka_2.12-3.0.0

ZooKeeper server

Open a new terminal (terminal 1) and start the ZooKeeper server:

bin/zookeeper-server-start.sh config/zookeeper.properties

Kafka server

Open a new terminal (terminal 2) and start the Kafka broker:

bin/kafka-server-start.sh config/server.properties

Create a new topic

Open a new terminal (terminal 3):

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server 127.0.0.1:9092 --partitions 3 --replication-factor 1

Kafka console producer

You can run it in two or more terminals at once:

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
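
The console producer can also be replaced by a short Python script, in the same spirit as the socket sender earlier. The sketch below assumes the third-party `kafka-python` package (not used elsewhere in this lab) for the real producer. The helper `send_lines` is hypothetical and only relies on the producer having `send(topic, value)` and `flush()` methods, which `kafka.KafkaProducer` provides.

```python
import time

def send_lines(producer, topic, lines, delay=0.0):
    """Send each non-empty line to `topic` as UTF-8 bytes; return how many were sent."""
    sent = 0
    for line in lines:
        text = line.strip()
        if not text:
            continue  # skip blank lines
        producer.send(topic, text.encode("utf-8"))
        sent += 1
        time.sleep(delay)  # throttle, like the one-second sleep in the socket sender
    producer.flush()  # block until buffered messages are delivered
    return sent

# Usage sketch (assumes `pip install kafka-python` and the broker started above):
#   from kafka import KafkaProducer
#   producer = KafkaProducer(bootstrap_servers="localhost:9092")
#   with open("RDD_input") as f:
#       send_lines(producer, "quickstart-events", f, delay=1.0)
```

Passing the producer in as an argument keeps the helper easy to try out with a stand-in object before a broker is available.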

Kafka console consumer

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092