Sprawdź czy serwer Kafki posiada jakieś zdefiniowane topici:
w dodatkowym oknie termianala wpisz polecenie:
cd ~
kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
dodaj topic o nazwie grupaXj1xx
Gdzie za X
wstaw nr grupy a za xx
nr swojego serwera
cd ~
kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic grupaXj1xx
sprawdź listę tematów ponownie upewniając się, że posiadasz temat grupaXj1xx
Uruchom nowy terminal na swoim komputerze i utwórz producenta generującego dane do nowego topicu
cd ~
kafka/bin/kafka-console-producer.sh --bootstrap-server broker:9092 --topic grupaXj1xx
Aby sprawdzić czy wysyłanie wiadomości działa uruchom kolejne okno terminala i wpisz następującą komendę realizującą konsumenta:
cd ~
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic grupaXj1xx
Zweryfikuj, że przesyłanie danych działa.
Zamknij okno producenta. Okno konsumenta zostaw otwarte - przyda się do weryfikacji automatu generującego dane.
Uruchomienie kodu wysyłającego strumień
Uzupełnij skrypt tak by generował następujące dane:
utwórz zmienną message
która będzie słownikiem zawierającym informacje pojedynczego eventu (klucz: wartość):
“time” : aktualny czas w postaci stringu datetime.now()
“id” : wybierane losowo z listy [“a”, “b”, “c”, “d”, “e”]
“value: losowa wartość z zakresu 0 do 100
%% file stream.py
import json
import random
import sys
from datetime import datetime
from time import sleep
from kafka import KafkaProducer
KAFKA_SERVER = "broker:9092"
TOPIC = ...
LAG = 2
def create_producer(server):
return KafkaProducer(
bootstrap_servers= [server],
value_serializer= lambda x: json.dumps(x).encode("utf-8" ),
api_version= (3 , 7 , 0 ),
)
if __name__ == "__main__" :
producer = create_producer(KAFKA_SERVER)
try :
while True :
### TWOJ KOD
message = {}
###
producer.send(TOPIC, value= message)
sleep(LAG)
except KeyboardInterrupt :
producer.close()
w terminalu jupyterlab uruchom plik stream.py
sprawdz w oknie consumenta czy wysyłane wiadomości przychodzą do Kafki.
Za uruchomienie importu kafka odpowiedzialna jest biblioteka kafka-python
którą możesz zainstalować poleceniem pip install kafka-python
APACHE SPARK
Przygotuj kod skryptu który pobierze informacje z przesyłanego strumienia danych.
%% file app.py
## LOAD SPARK SESSION object
SERVER = "broker:9092"
TOPIC = ...
if __name__ == "__main__" :
## create spark variable
#YOUR CODE HERE
##
spark.sparkContext.setLogLevel("WARN" )
raw = (
spark.readStream
.format ("kafka" )
.option("kafka.bootstrap.servers" , SERVER)
.option("subscribe" , TOPIC)
.load()
)
query = (
raw.writeStream
.outputMode("append" )
.format ("console" )
.start()
)
query.awaitTermination()
query.stop()
uruchom pierwsze przetwarzanie strumienia:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 app.py
Zmodyfikuj pragram app.py
dodając schemat:
json_schema = StructType(
[
StructField("time" , TimestampType()),
StructField("id" , StringType()),
StructField("value" , IntegerType()),
]
)
Możesz również wykorzystać schemat ddl.
Wczytaj zawartość zmiennej value wysyłanej z Kafki: nie zapomnij wczytać biblioteki pyspark.sql.functions
parsed = raw.select(
"timestamp" , f.from_json(raw.value.cast("string" ), json_schema).alias("json" )
).select(
f.col("timestamp" ).alias("proc_time" ),
f.col("json" ).getField("time" ).alias("event_time" ),
f.col("json" ).getField("id" ).alias("id" ),
f.col("json" ).getField("value" ).alias("value" ),
)
lub wykorzystując funkcję dekodowania
parsed = raw.select("timestamp" , f.from_json(f.decode(f.col("value" ), "utf-8" ), schema).alias("values" )
).select("timestamp" , "values.*" )
W wielu przykładach można znaleźć
parsed = raw.selectExpr("cast(value as string) as value" )
Pomimo, iż kod ten będzie wyświetlał nam wiersz naszych danych to jest on traktowany jako string i nie będzie łatwo przetwarzać taki napis.
uruchom kod sprawdzając czy widzisz przychodzące eventy.
zlicz ilość eventów ze względu na grupę ID
uważaj na
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
Użyj “complete” albo “update”
w tej prostej wersji możesz zmienić czas wykonywania obliczeń parametrem
writeStream.trigger(processingTime= '5 seconds' )
Przetwarzanie w oknach czasowych
Aby wygenerować obliczanie grupowania w oknie typu tumblink window (jedno po drugim) użyj funkcji window
grouped = parsed.groupBy(f.window("timestamp" , "5 seconds" ),"id" ).count()
korzystając z funkcji window
możesz wskazać zmienną czasową oraz zrealizować okno typu sliding o długości np 10 sekund z uruchomieniem następnego okna po 5 sekundach.
grouped = parsed.groupBy(f.window("timestamp" , "10 seconds" , "5 seconds" )).count()
Pamiętaj o sprawdzeniu opcji “complete” i “update”