```python
import re
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Classic word count: tokenize each line, emit (word, 1) pairs, sum per word
sc.textFile("RDD_input")\
    .map(lambda x: re.findall(r"[a-z']+", x.lower()))\
    .flatMap(lambda x: [(y, 1) for y in x])\
    .reduceByKey(lambda x, y: x + y)\
    .collect()
```
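To see what the `map`/`flatMap`/`reduceByKey` chain computes, here is a plain-Python sketch of the same word count; the sample lines are made up for illustration, no Spark cluster needed:

```python
import re
from collections import Counter

# Made-up sample input standing in for the RDD_input file
lines = ["Spark makes word count easy",
         "word count is the hello world of Spark"]

tokens = [re.findall(r"[a-z']+", line.lower()) for line in lines]  # map
flat = [word for words in tokens for word in words]                # flatMap
counts = Counter()
for word in flat:                                                  # reduceByKey
    counts[word] += 1

print(counts["word"])   # 2
print(counts["spark"])  # 2
```

Spark runs the same logic, but partitioned across executors, with the per-word sums combined locally before being shuffled.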
On Linux you can use netcat to serve text on a socket:
```shell
nc -lk 9999
```
A better way is to write a short Python script that replays events over a socket:
```python
import time
from socket import socket, AF_INET, SOCK_STREAM

# Example: build a list of events from a text file
rdd = list()
with open("RDD_input", "r") as ad:
    for line in ad:
        rdd.append(line)

HOST = "localhost"
PORT = 9998
ADDR = (HOST, PORT)

tcpSock = socket(AF_INET, SOCK_STREAM)
tcpSock.bind(ADDR)
tcpSock.listen(5)

# Main loop: for each connection, replay the events one line per second
while True:
    c, addr = tcpSock.accept()
    print("got connection")
    for line in rdd:
        try:
            c.send(line.encode())
            time.sleep(1)
        except OSError:  # client went away
            break
    c.close()
    print("disconnected")
```
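You can check the replay idea from plain Python before wiring it to Spark: connect a client socket and read what the server sends. The sketch below is self-contained, so it spins up a throwaway single-connection server thread on an OS-chosen free port instead of the real script's port 9998, and uses made-up events:

```python
import threading
from socket import socket, AF_INET, SOCK_STREAM

events = ["event one\n", "event two\n"]  # made-up sample events

def serve_once(sock):
    # Accept a single connection and replay the events, like the main loop above
    c, _ = sock.accept()
    for line in events:
        c.send(line.encode())
    c.close()

srv = socket(AF_INET, SOCK_STREAM)
srv.bind(("localhost", 0))  # port 0: let the OS pick a free port
srv.listen(1)
port = srv.getsockname()[1]
t = threading.Thread(target=serve_once, args=(srv,))
t.start()

# Client side: read everything until the server closes the connection
cli = socket(AF_INET, SOCK_STREAM)
cli.connect(("localhost", port))
data = b""
while True:
    chunk = cli.recv(1024)
    if not chunk:
        break
    data += chunk
cli.close()
t.join()
srv.close()

received = data.decode().splitlines()
print(received)  # ['event one', 'event two']
```

This is exactly what `socketTextStream` does on the Spark side: it connects, reads newline-delimited text, and turns each line into a stream record.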
```python
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
```
For streaming with a DStream object you need at least 2 cores: one to receive the data and one to process it.
```python
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 5)  # batch interval of 5 seconds
```
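The batch interval means the incoming stream is chopped into small batches, one per interval, and each batch is processed as an ordinary RDD. A plain-Python sketch of that micro-batching, with made-up timestamped events:

```python
# Each event is (timestamp_in_seconds, payload); batch interval is 5 s,
# mirroring StreamingContext(sc, 5)
events = [(0.5, "a"), (2.0, "b"), (6.1, "c"), (9.9, "d"), (12.3, "e")]
BATCH = 5

batches = {}
for ts, payload in events:
    # Events in the same 5-second interval end up in the same batch
    batches.setdefault(int(ts // BATCH), []).append(payload)

print(batches)  # {0: ['a', 'b'], 1: ['c', 'd'], 2: ['e']}
```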
For data streaming you need 3 steps:
### 1. Define a source
For a socket source:
```python
lines = ssc.socketTextStream("localhost", 9999)
```
### 2. Apply transformations
This is an example of a stateless transformation: the same computation is applied independently to each window. In a stateless architecture, the state of previous transactions is neither stored nor referenced in subsequent transactions. You can plug such a transformation into an ETL process.
```python
words = lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
wordCounts.pprint()  # print the first elements of each batch to stdout
```
### 3. Start the computation
```python
ssc.start()             # start the computation
ssc.awaitTermination()  # wait for the computation to terminate
ssc.stop(True, True)    # stop gracefully, also stopping the SparkContext
```
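Because the transformation is stateless, the counts start from zero in every batch; a stateful stream (e.g. with `updateStateByKey`) would instead carry totals across batches. A plain-Python sketch of the difference, using made-up batches:

```python
from collections import Counter

# Two made-up micro-batches of words
batches = [["spark", "kafka", "spark"], ["kafka", "flink"]]

# Stateless: each batch is counted independently, like the pipeline above
stateless = [Counter(batch) for batch in batches]

# Stateful: a running total is carried across batches
state = Counter()
stateful = []
for batch in batches:
    state.update(batch)
    stateful.append(dict(state))

print(stateless[1]["kafka"])  # 1  (this batch only)
print(stateful[1]["kafka"])   # 2  (running total)
```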
Go to a terminal and download Kafka:
```shell
wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.12-3.0.0.tgz
tar -xzf kafka_2.12-3.0.0.tgz
cd kafka_2.12-3.0.0
```
Open new terminal 1 and run the ZooKeeper server:
```shell
bin/zookeeper-server-start.sh config/zookeeper.properties
```
Open new terminal 2 and start the Kafka broker:
```shell
bin/kafka-server-start.sh config/server.properties
```
Open new terminal 3 and create a topic:
```shell
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server 127.0.0.1:9092 --partitions 3 --replication-factor 1
```
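The topic above has 3 partitions; records with the same key always land in the same partition, which preserves per-key ordering. Kafka's default partitioner hashes the key with murmur2; the sketch below uses a simplified stand-in hash for illustration only, not Kafka's actual algorithm:

```python
NUM_PARTITIONS = 3  # matches --partitions 3 above

def assign_partition(key: bytes) -> int:
    # Simplified stand-in for Kafka's murmur2-based default partitioner
    h = 0
    for b in key:
        h = (h * 31 + b) & 0x7FFFFFFF
    return h % NUM_PARTITIONS

# The same key always maps to the same partition
p = assign_partition(b"user-42")
print(p, p == assign_partition(b"user-42"))
```

Records with no key are instead spread across partitions (round-robin / sticky batching, depending on the client version).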
You can now run a producer and a consumer, each in its own terminal:
```shell
# terminal A: write events to the topic
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
# terminal B: read events back from the beginning
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
```