Prosty licznik wyrazów wygenerowany na podstawie pliku tekstowego.
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.appName("new")\
.getOrCreate()
# otrzymanie obiektu SparkContext
sc = spark.sparkContext
import re
# Word Count on RDD
sc.textFile("RDD_input") \
.map(lambda x: re.findall(r"[a-z']+", x.lower())) \
.flatMap(lambda x: [(y, 1) for y in x]) \
.reduceByKey(lambda x, y: x + y) \
.collect()
Część Sparka odpowiedzialna za przetwarzanie danych w czasie rzeczywistym.
Dane mogą pochodzić z różnych źródeł np. sokety TCP, Kafka, etc.
Korzystając z poznanych już metod map, reduce, join, oraz window
można w łatwy sposób generować przetwarzanie strumienia tak jakby był to nieskończony ciąg RDD.
Ponadto nie ma problemu aby wywołać na strumieniu operacje ML czy wykresy.
Cała procedura przedstawia się następująco:
SPARK STREAMING w tej wersji wprowadza abstrakcje zwaną discretized stream
DStream (reprezentuje sekwencję RDD).
Operacje na DStream można wykonywać w API JAVA, SCALA, Python, R (nie wszystkie możliwości są dostępne dla Pythona).
Spark Streaming potrzebuje minimum 2 rdzenie.
batchDuration
wskazuje na granularność batch’y (w sekundach)n
(domyślnie 10) elementów z każdego RDD wygenerowanego w DStream’ieObiekt StreamingContext można wygenerować za pomocą obiektu SparkContext.
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
# DStream
lines = ssc.socketTextStream("localhost", 9999)
# podziel każdą linię na wyrazy
# DStream jest mapowany na kolejny DStream
words = lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))
# zliczmy każdy wyraz w każdym batchu
wordCounts = words.map(lambda word: (word,1)).reduceByKey(lambda x,y: x+y)
# wydrukuj pierwszy element
wordCounts.pprint()
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
ssc.stop(True,True)
Po uruchomieniu powyższego kodu należy wygenerować tekst źródłowy nadawany na porcie 9999. Można posłużyć się w tym celu narzędziem netcat (na windowsie Nmap) W konsoli bash wykonaj:
nc -lk 9999
Python to język programowania używany do zastosowań ogólnych. Dlatego mamy możliwość napisania swojego kodu realizującego wypisywanie komunikatów tekstowych na porcie 9999.
Przykładowy kod może wyglądać tak:
from socket import *
import time
rdd = list()
with open("RDD_input", 'r') as ad:
for line in ad:
rdd.append(line)
HOST = 'localhost'
PORT = 9999
ADDR = (HOST, PORT)
tcpSock = socket(AF_INET, SOCK_STREAM)
tcpSock.bind(ADDR)
tcpSock.listen(5)
while True:
c, addr = tcpSock.accept()
print('got connection')
for line in rdd:
try:
c.send(line.encode())
time.sleep(1)
except:
break
c.close()
print('disconnected')
W celach ćwiczeniowych i testowania przetwarzania strumieniowego warto wykorzystać Obiekt Kolejki wygenerowany z obiektów RDD. Każdy RDD wepchany (pushed) do kolejki traktowany jest jako batch w DStream i przetwarzany jest jako strumień.
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount3")
ssc = StreamingContext(sc, 2)
rddQueue = []
for i in range(10):
rddQueue += [sc.parallelize(
[j for j in range(1, 1001)], 10)]
inputStream = ssc.queueStream(rddQueue)
mappedStream = inputStream.map(lambda x: (x % 10, 1))
reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
reducedStream.pprint()
import time
ssc.start()
time.sleep(10)
ssc.stop(stopSparkContext=True, stopGraceFully=True)
Operacja updateStateByKey
pozwala łączyć ze sobą wyniki otrzymywane na poszczególbych DStreamach. Dzięki tej operacji możesz w sposób ciągły uzupełniać informacje !
Aby Spark Streaming mógł łączyć dane z wielu batchy (stateful transformations) konieczne jest wskazanie lokalizacji gdzie zapisywane będą checkpointy.
updateFunc
def updateFunc(newValues, runningCount):
if runningCount is None:
runningCount = 0
return sum(newValues, runningCount)
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "NetworkWC3")
ssc = StreamingContext(sc, 5)
ssc.checkpoint("tmp")
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))
pairs = words.map(lambda word: (word, 1))
runningCounts = pairs.updateStateByKey(updateFunc)
runningCounts.pprint()
ssc.start()
ssc.awaitTermination()
ssc.stop(True,True)
Zadanie - Korzystając z danych kolejki rddQueue dodaj wszystkie elementy do siebie
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "NetworkWC4")
ssc = StreamingContext(sc, 2)
ssc.checkpoint("tmp")
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))
pairs = words.map(lambda word: (word,1))
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
windowedWordCounts.pprint()
ssc.start()
ssc.awaitTermination()
ssc.stop(True,True)