Przetwarzanie danych strumieniowych

  1. Sprawdź czy serwer Kafki posiada jakieś zdefiniowane topici:
    • w dodatkowym oknie termianala wpisz polecenie:
    cd ~ 
    kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
  2. dodaj topic o nazwie grupaXj1xx Gdzie za X wstaw nr grupy a za xx nr swojego serwera
cd ~ 
kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic grupaXj1xx
  1. sprawdź listę tematów ponownie upewniając się, że posiadasz temat grupaXj1xx

  2. Uruchom nowy terminal na swoim komputerze i utwórz producenta generującego dane do nowego topicu

cd ~ 
kafka/bin/kafka-console-producer.sh --bootstrap-server broker:9092 --topic grupaXj1xx

Aby sprawdzić czy wysyłanie wiadomości działa uruchom kolejne okno terminala i wpisz następującą komendę realizującą konsumenta:

cd ~ 
kafka/bin/kafka-console-consumer.sh  --bootstrap-server broker:9092 --topic grupaXj1xx 

Zweryfikuj, że przesyłanie danych działa.

Zamknij okno producenta. Okno konsumenta zostaw otwarte - przyda się do weryfikacji automatu generującego dane.

Uruchomienie kodu wysyłającego strumień

Uzupełnij skrypt tak by generował następujące dane:

  1. utwórz zmienną message która będzie słownikiem zawierającym informacje pojedynczego eventu (klucz: wartość):
    • “time” : aktualny czas w postaci stringu datetime.now()
    • “id” : wybierane losowo z listy [“a”, “b”, “c”, “d”, “e”]
    • “value: losowa wartość z zakresu 0 do 100
%%file stream.py

import json
import random
import sys
from datetime import datetime
from time import sleep
from kafka import KafkaProducer


KAFKA_SERVER = "broker:9092"
TOPIC = ...
LAG = 2

def create_producer(server):
    return KafkaProducer(
        bootstrap_servers=[server],
        value_serializer=lambda x: json.dumps(x).encode("utf-8"),
        api_version=(3, 7, 0),
    )

if __name__ == "__main__":
    
    producer = create_producer(KAFKA_SERVER)
    try:
        while True:

        ### TWOJ KOD 
            message = {}
        ###    
            producer.send(TOPIC, value=message)
            sleep(LAG)
    except KeyboardInterrupt:
        producer.close()
Writing stream.py
  1. w terminalu jupyterlab uruchom plik stream.py
python stream.py

sprawdz w oknie consumenta czy wysyłane wiadomości przychodzą do Kafki.

Za uruchomienie importu kafka odpowiedzialna jest biblioteka kafka-python którą możesz zainstalować poleceniem pip install kafka-python

APACHE SPARK

Przygotuj kod skryptu który pobierze informacje z przesyłanego strumienia danych.

%%file app.py

## LOAD SPARK SESSION object

SERVER = "broker:9092"
TOPIC = ...

if __name__ == "__main__":
    ## create spark variable
    #YOUR CODE HERE
    
    ##  
    spark.sparkContext.setLogLevel("WARN")
     
    raw = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", SERVER)
        .option("subscribe", TOPIC)
        .load()
    )

    query =  (
        raw.writeStream
        .outputMode("append")
        .format("console")
        .start()
    )
    
    
    query.awaitTermination()
    query.stop()
Writing app.py

uruchom pierwsze przetwarzanie strumienia:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 app.py

Zmodyfikuj pragram app.py dodając schemat:

    json_schema = StructType(
        [
            StructField("time", TimestampType()),
            StructField("id", StringType()),
            StructField("value", IntegerType()),
        ]
    )

Możesz również wykorzystać schemat ddl.

Wczytaj zawartość zmiennej value wysyłanej z Kafki: nie zapomnij wczytać biblioteki pyspark.sql.functions

    parsed = raw.select(
        "timestamp", f.from_json(raw.value.cast("string"), json_schema).alias("json")
    ).select(
        f.col("timestamp").alias("proc_time"),
        f.col("json").getField("time").alias("event_time"),
        f.col("json").getField("id").alias("id"),
        f.col("json").getField("value").alias("value"),
    )

lub wykorzystując funkcję dekodowania

parsed = raw.select("timestamp", f.from_json(f.decode(f.col("value"), "utf-8"), schema).alias("values")
                   ).select("timestamp", "values.*")

W wielu przykładach można znaleźć

parsed = raw.selectExpr("cast(value as string) as value") 

Pomimo, iż kod ten będzie wyświetlał nam wiersz naszych danych to jest on traktowany jako string i nie będzie łatwo przetwarzać taki napis.

uruchom kod sprawdzając czy widzisz przychodzące eventy.

zlicz ilość eventów ze względu na grupę ID

uważaj na

Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;

Użyj “complete” albo “update”

w tej prostej wersji możesz zmienić czas wykonywania obliczeń parametrem

writeStream.trigger(processingTime='5 seconds')

Przetwarzanie w oknach czasowych

Aby wygenerować obliczanie grupowania w oknie typu tumblink window (jedno po drugim) użyj funkcji window

grouped = parsed.groupBy(f.window("timestamp", "5 seconds"),"id").count()

korzystając z funkcji window możesz wskazać zmienną czasową oraz zrealizować okno typu sliding o długości np 10 sekund z uruchomieniem następnego okna po 5 sekundach.

grouped = parsed.groupBy(f.window("timestamp", "10 seconds", "5 seconds")).count()

Pamiętaj o sprawdzeniu opcji “complete” i “update”