from pyspark import SparkContext
= SparkContext(appName="myAppName") sc
Apache Spark z plikami RDD
RDD
- Resilient Distributed Dataset
- Podstawowa abstrakcja oraz rdzeń Sparka
- Obsługiwane przez dwa rodzaje operacji:
- Akcje:
- operacje uruchamiające egzekucję transformacji na RDD
- przyjmują RDD jako input i zwracają wynik NIE będący RDD
- Transformacje:
- leniwe operacje
- przyjmują RDD i zwracają RDD
- Akcje:
- In-Memory - dane RDD przechowywane w pamięci
- Immutable
- Lazy evaluated
- Parallel - przetwarzane równolegle
- Partitioned - rozproszone
WAŻNE informacje !
Ważne do zrozumienia działania SPARKA:
Term | Definition |
---|---|
RDD | Resilient Distributed Dataset |
Transformation | Spark operation that produces an RDD |
Action | Spark operation that produces a local object |
Spark Job | Sequence of transformations on data with a final action |
Dwie podstawowe metody tworzenia RDD:
Method | Result |
---|---|
sc.parallelize(array) |
Create RDD of elements of array (or list) |
sc.textFile(path/to/file) |
Create RDD of lines from file |
Podstawowe transformacje
Transformation Example | Result |
---|---|
filter(lambda x: x % 2 == 0) |
Discard non-even elements |
map(lambda x: x * 2) |
Multiply each RDD element by 2 |
map(lambda x: x.split()) |
Split each string into words |
flatMap(lambda x: x.split()) |
Split each string into words and flatten sequence |
sample(withReplacement=True,0.25) |
Create sample of 25% of elements with replacement |
union(rdd) |
Append rdd to existing RDD |
distinct() |
Remove duplicates in RDD |
sortBy(lambda x: x, ascending=False) |
Sort elements in descending order |
Podstawowe akcje
Action | Result |
---|---|
collect() |
Convert RDD to in-memory list |
take(3) |
First 3 elements of RDD |
top(3) |
Top 3 elements of RDD |
takeSample(withReplacement=True,3) |
Create sample of 3 elements with replacement |
sum() |
Find element sum (assumes numeric elements) |
mean() |
Find element mean (assumes numeric elements) |
stdev() |
Find element deviation (assumes numeric elements) |
= ['Books', 'DVD', 'CD', 'PenDrive'] # nasze dane
keywords
= sc.parallelize(keywords) # metoda parallelize - "wczyta dane"
key_rdd
# akcja wyświetlania key_rdd.collect()
= key_rdd.map(lambda x: x.lower()) # transformacja
key_small
# akcja key_small.collect()
sc.stop()
Map reduce
from pyspark.sql import SparkSession
= SparkSession.builder.appName("new").getOrCreate()
spark
# otrzymanie obiektu SparkContext
= spark.sparkContext sc
= sc.textFile("MobyDick.txt")
tekst 5) tekst.take(
import re
# Word Count on RDD
"MobyDick.txt")\
sc.textFile(map(lambda x: re.findall(r"[a-z']+", x.lower())) \
.lambda x: [(y, 1) for y in x]).reduceByKey(lambda x,y: x + y)\
.flatMap(5) .take(
sc.stop()
SPARK STREAMING
Część Sparka odpowiedzialna za przetwarzanie danych w czasie rzeczywistym.
Dane mogą pochodzić z różnych źródeł np. sokety TCP, Kafka, etc. Korzystając z poznanych już metod map, reduce, join, oraz window
można w łatwy sposób generować przetwarzanie strumienia tak jaby był to nieskończony ciąg RDD. Ponadto nie ma problemu aby wywołać na strumieniu operacje ML czy wykresy.
Cała procedura przedstawia się następująco:
SPARK STREAMING w tej wersji wprowadza abstrakcje zwaną discretized stream
DStream (reprezentuje sekwencję RDD).
Operacje na DStream można wykonywać w API JAVA, SCALA, Python, R (nie wszystkie możliwości są dostępne dla Pythona).
Spark Streaming potrzebuje minium 2 rdzenie.
- StreamingContext(sparkContext, batchDuration) - reprezentuje połączenie z klastrem i służy do tworzenia DStreamów,
batchDuration
wskazuje na granularność batch’y (w sekundach) - socketTextStream(hostname, port) - tworzy DStream na podstawie danych napływających ze wskazanego źródła TCP
- flatMap(f), map(f), reduceByKey(f) - działają analogicznie jak w przypadku RDD z tym że tworzą nowe DStream’y
- pprint(n) - printuje pierwsze
n
(domyślnie 10) elementów z każdego RDD wygenerowanego w DStream’ie - StreamingContext.start() - rozpoczyna działania na strumieniach
- StreamingContext.awaitTermination(timeout) - oczekuje na zakończenie działań na strumieniach
- StreamingContext.stop(stopSparkContext, stopGraceFully) - kończy działania na strumieniach
Obiekt StreamingContext można wygenerować za pomocą obiektu SparkContext.
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working thread
# and batch interval of 1 second
= SparkContext("local[2]", "NetworkWordCount2")
sc = StreamingContext(sc, 2)
ssc
# DStream
= ssc.socketTextStream("localhost", 9998)
lines
# podziel każdą linię na wyrazy
# DStream jest mapowany na kolejny DStream
# words = lines.flatMap(lambda line: line.split(" "))
= lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))
words
# zliczmy każdy wyraz w każdym batchu
# DStream jest mapowany na kolejny DStream
# pairs = words.map(lambda word: (word, 1))
# DStream jest mapowany na kolejny DStream
# wordCounts = pairs.reduceByKey(lambda x, y: x + y)
= words.map(lambda word: (word,1)).reduceByKey(lambda x,y: x+y)
wordCounts # wydrukuj pierwszy element
wordCounts.pprint()
# w konsoli linuxowej netcat Nmap for windows
!nc -lk 9998
# before starting, run a stream data
# Start the computation
ssc.start()
ssc.awaitTermination()
ssc.stop() sc.stop()
przesylanie strumienia przez socket
nc -lk 9998
%%file start_stream.py
from socket import *
import time
= list()
rdd with open("MobyDick.txt", 'r') as ad:
for line in ad:
rdd.append(line)
= 'localhost'
HOST = 9998
PORT = (HOST, PORT)
ADDR = socket(AF_INET, SOCK_STREAM)
tcpSock
tcpSock.bind(ADDR)5)
tcpSock.listen(
while True:
= tcpSock.accept()
c, addr print('got connection')
for line in rdd:
try:
c.send(line.encode())1)
time.sleep(except:
break
c.close()print('disconnected')
%%file streamWordCount.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
if __name__ == "__main__":
= SparkSession.builder.appName("Stream_DF").getOrCreate()
spark "ERROR")
spark.sparkContext.setLogLevel(
= (spark
lines
.readStreamformat("socket")
."host", "localhost")
.option("port", 9998)
.option(
.load())
= lines.select(explode(split(lines.value, " ")).alias("word"))
words = words.groupBy("word").count()
word_counts
= (word_counts
streamingQuery
.writeStreamformat("console")
."complete")
.outputMode(="5 second")
.trigger(processingTime
.start())
streamingQuery.awaitTermination()
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
= {"count": 0}
batch_counter
def process_batch(df, batch_id):
"count"] += 1
batch_counter[print(f"Batch ID: {batch_id}")
=False)
df.show(truncate
= SparkSession.builder.appName("Stream_DF").getOrCreate()
spark "ERROR")
spark.sparkContext.setLogLevel(
= (spark
lines
.readStreamformat("socket")
."host", "localhost")
.option("port", 9998)
.option(
.load())
= lines.select(explode(split(lines.value, " ")).alias("word"))
words = words.groupBy("word").count()
word_counts
= (word_counts.writeStream
streamingQuery format("console")
."complete")
.outputMode(
.foreachBatch(process_batch) ="5 second")
.trigger(processingTime .start())
Przetwarzanie danych strumieniowych
- Sprawdź czy serwer Kafki posiada jakieś zdefiniowane topici:
cd ~
kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
- dodaj topic o nazwie
streamXX
Gdzie zaX
wstaw nr grupy a zaxx
nr swojego serwera
cd ~
kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic streamXX
sprawdź listę tematów ponownie upewniając się, że posiadasz temat
streamXX
Uruchom nowy terminal na swoim komputerze i utwórz producenta generującego dane do nowego topicu
cd ~
kafka/bin/kafka-console-producer.sh --bootstrap-server broker:9092 --topic stream
Aby sprawdzić czy wysyłanie wiadomości działa uruchom kolejne okno terminala i wpisz następującą komendę realizującą konsumenta:
cd ~
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic streamXX
Zweryfikuj, że przesyłanie danych działa.
Zamknij okno producenta. Okno konsumenta zostaw otwarte - przyda się do weryfikacji automatu generującego dane.
Uruchomienie kodu wysyłającego strumień
Uzupełnij skrypt tak by generował następujące dane:
- utwórz zmienną
message
która będzie słownikiem zawierającym informacje pojedynczego eventu (klucz: wartość):- “time” : aktualny czas w postaci stringu datetime.now()
- “id” : wybierane losowo z listy [“a”, “b”, “c”, “d”, “e”]
- “value: losowa wartość z zakresu 0 do 100
%%file stream.py
import json
import random
import sys
from datetime import datetime
from time import sleep
from kafka import KafkaProducer
= "broker:9092"
KAFKA_SERVER = 'stream'
TOPIC = 2
LAG
def create_producer(server):
return KafkaProducer(
=[server],
bootstrap_servers=lambda x: json.dumps(x).encode("utf-8"),
value_serializer=(3, 7, 0),
api_version
)
if __name__ == "__main__":
= create_producer(KAFKA_SERVER)
producer try:
while True:
= {
message "time" : str(datetime.now() ) ,
"id" : random.choice(["a", "b", "c", "d", "e"]) ,
"temperatura" : random.randint(-100,100) ,
"cisnienie" : random.randint(0,50) ,
}
=message)
producer.send(TOPIC, value
sleep(LAG)except KeyboardInterrupt:
producer.close()
Overwriting stream.py
- w terminalu jupyterlab uruchom plik
stream.py
python stream.py
sprawdz w oknie consumenta czy wysyłane wiadomości przychodzą do Kafki.
Za uruchomienie importu kafka odpowiedzialna jest biblioteka kafka-python
którą możesz zainstalować poleceniem pip install kafka-python
APACHE SPARK
Przygotuj kod skryptu który pobierze informacje z przesyłanego strumienia danych.
%%file app.py
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 app.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
= "broker:9092"
SERVER = 'stream'
TOPIC
= StructType(
schema
["time", TimestampType()),
StructField("id", StringType()),
StructField("temperatura", IntegerType()),
StructField("cisnienie", IntegerType()),
StructField(
]
)
= """time Timestamp, id String, temperatura Int, cisnienie Int """ # DDL string
SCHEMA
if __name__ == "__main__":
= SparkSession.builder.getOrCreate()
spark "WARN")
spark.sparkContext.setLogLevel(
= (
raw
spark.readStreamformat("kafka")
."kafka.bootstrap.servers", SERVER)
.option("subscribe", TOPIC)
.option(
.load()
)# query = (
# raw.writeStream
# .outputMode("append")
# .format("console")
# .option("truncate", False)
# .start()
# )
= (raw.select("timestamp", from_json(decode(col("value"), "utf-8"), SCHEMA).alias("moje_dane"))
parsed "timestamp", "moje_dane.*")
.select(
)# query = (
# parsed.writeStream
# .outputMode("append")
# .format("console")
# .option("truncate", False)
# .start()
# )
# gr = parsed.agg(avg("temperatura"), avg("cisnienie"))
# query = (gr.writeStream
# .outputMode("update")
# .format("console")
# .option("truncate", False)
# .start()
# )
= (parsed.withWatermark("timestamp", "5 seconds")
gr "timestamp", "10 seconds", "7 seconds"))
.groupBy(window("temperatura"), avg("cisnienie"))
.agg(avg(
)= (gr.writeStream
query "complete")
.outputMode(format("console")
."truncate", False)
.option(
.start()
) query.awaitTermination()
Overwriting app.py