Lab 4: Spark Structured Streaming + Kafka

Most z Lab 3

W Lab 3 strumieniowałeś z rate (generator wbudowany) i z pliku JSON.
Dziś to samo — ale źródłem jest Kafka. Zmiana to dosłownie jedna linia:

# Lab 3 — rate:
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Lab 3 — plik:
df = spark.readStream.schema(tx_schema).json("data/stream")

# Lab 4 — Kafka:
df = spark.readStream.format("kafka").option("subscribe", "transactions").load()

Różnica: Kafka zwraca dane jako surowe bajty — trzeba je zdekodować.
Okna i watermarki z Lab 3 działają bez zmian.

Plan

  1. Uruchom producenta
  2. Podłącz Sparka do Kafki (spark-submit vs notebook)
  3. Obejrzyj co naprawdę przychodzi z Kafki — surowy schemat
  4. Krok po kroku: bajty → string → JSON → tabela
  5. Okna tumbling + watermark
  6. Zapis alertów do tematu alerts

Część 0: Przygotowanie

W terminalu JupyterLab:

~/kafka/bin/kafka-topics.sh --create --topic transactions \
  --bootstrap-server broker:9092
~/kafka/bin/kafka-topics.sh --create --topic alerts \
  --bootstrap-server broker:9092

# Uruchom producenta z Lab 1
python producer.py

Zostaw producenta działającego przez cały lab.

Część 1: Podłączenie Sparka do Kafki

Dlaczego potrzebujemy dodatkowej paczki?

Spark nie ma wbudowanej obsługi Kafki — trzeba dołączyć connector jako zewnętrzną zależność Maven.
Nazwa paczki różni się zależnie od wersji Sparka i Scali:

Środowisko Paczka
Spark 4.0 (Scala 2.13) org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2
Spark 3.5 (Scala 2.12) org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0

Możemy ją przekazać na dwa sposoby:

Sposób A — spark-submit (skrypt .py poza notebookiem)

# Spark 4.0
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2 \
  moj_skrypt.py

# Spark 3.5
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
  moj_skrypt.py

Spark pobierze paczkę z Maven Central (raz, potem cache lokalny).
Przy pierwszym uruchomieniu pojawi się dużo logów — to normalne.

Sposób B — z notebooka przez os.environ

Ustawiamy zmienną środowiskową przed pierwszym importem PySpark.
Poniższy kod automatycznie wykrywa wersję i dobiera właściwą paczkę.

import os
import pyspark

# Autodetekcja wersji Sparka → właściwy connector
spark_version = pyspark.__version__
print(f"Wykryta wersja PySpark: {spark_version}")

if spark_version.startswith("4"):
    KAFKA_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2"
else:
    # Spark 3.x
    KAFKA_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"

print(f"Użyty connector:        {KAFKA_PACKAGE}")

# Musi być ustawione PRZED SparkSession.builder
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages {KAFKA_PACKAGE} pyspark-shell'
Wykryta wersja PySpark: 4.0.0.dev2
Użyty connector:        org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Lab4-Kafka")
    .config("spark.jars.packages", KAFKA_PACKAGE)
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
print(f"Spark {spark.version} — gotowy")

Część 2: Co naprawdę przychodzi z Kafki?

Odczytajmy strumień i zobaczmy jego surowy schemat — zanim cokolwiek przetworzymy.

kafka_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "transactions")
    .load()
)

kafka_raw.printSchema()

Wynik printSchema() — co oznacza każda kolumna?

root
 |-- key:           binary   ← klucz wiadomości (opcjonalny, u nas null)
 |-- value:         binary   ← TREŚĆ wiadomości — tu jest nasz JSON (jako bajty)
 |-- topic:         string   ← nazwa tematu ("transactions")
 |-- partition:     integer  ← numer partycji (0, 1 lub 2 — mamy 3 partycje)
 |-- offset:        long     ← pozycja w partycji (0, 1, 2, … rośnie monotonicznie)
 |-- timestamp:     timestamp← kiedy Kafka przyjęła wiadomość
 |-- timestampType: integer  ← 0 = CreateTime, 1 = LogAppendTime

Kluczowa obserwacja: kolumna value to binary — Kafka przechowuje bajty i nic nie wie o formacie.
Wszystkie inne kolumny to metadane Kafki, nie dane biznesowe.

Zobaczmy jak wygląda surowa wiadomość przed jakimkolwiek parsowaniem:

%%file kafka_raw.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2

spark = (
    SparkSession.builder
    .appName("Lab4-Kafka")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

kafka_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "transactions")
    .load()
)

q = (
    kafka_raw.writeStream
    .format("console") 
    .outputMode("append") 
    .option("truncate", False)
    .start()
)

q.awaitTermination()

Część 3: Krok po kroku — bajty → tabela

Mamy trzy etapy dekodowania:

value (binary)                          
  → cast("string")                      → "{\"tx_id\": \"TX1234\", \"amount\": 312.32, ...}"
  → from_json(col, schema)              → struct<tx_id: string, amount: double, ...>
  → select("tx.*")                      → tx_id | user_id | amount | store | ...

Krok 1 — value jako string

Sprawdzamy czy bajty poprawnie dekodują się do tekstu JSON:

%%file kafka_text.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2

spark = (
    SparkSession.builder
    .appName("Lab4-Kafka")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

kafka_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "transactions")
    .load()
)

df = kafka_raw.select(
    col("value").cast("string")
)

q = (
    df.writeStream
    .format("console") 
    .outputMode("append") 
    .option("truncate", False)
    .start()
)

q.awaitTermination()

Krok 2 — from_json(): string → struct

from_json() parsuje tekst JSON na kolumnę typu struct — używając schematu który podajemy.
Schemat musi zgadzać się z polami z producenta — nieznanego pola Spark zignoruje, brakującego doda jako null.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import from_json, col

spark = (
    SparkSession.builder
    .appName("Lab4-Kafka")
    .config("spark.jars.packages", KAFKA_PACKAGE)
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

def process_batch(df, batch_id, tstop=5):
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id == tstop:
        df.stop()

kafka_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "transactions")
    .load()
)


tx_schema = StructType([
    StructField("tx_id",     StringType()),
    StructField("user_id",   StringType()),
    StructField("amount",    DoubleType()),
    StructField("store",     StringType()),
    StructField("category",  StringType()),
    StructField("timestamp", StringType()),
])

# Krok 2: string → struct (jedna kolumna 'tx' zawierająca wszystkie pola)
step2 = kafka_raw.select(
    from_json(col("value").cast("string"), tx_schema).alias("tx")
)

query = (step2.writeStream 
    .format("console") 
    .outputMode("append")
    .foreachBatch(process_batch)
    .option("truncate", False) 
    .start()
)
Batch ID: 0
+---+
|tx |
+---+
+---+

Batch ID: 1
+------------------------------------------------------------------------+
|tx                                                                      |
+------------------------------------------------------------------------+
|{TX8530, u10, 344.96, Warszawa, elektronika, 2026-04-26T22:57:52.652119}|
+------------------------------------------------------------------------+

Batch ID: 2
+------------------------------------------------------------------+
|tx                                                                |
+------------------------------------------------------------------+
|{TX1014, u05, 602.91, Wrocław, odzież, 2026-04-26T22:57:53.652892}|
+------------------------------------------------------------------+

Batch ID: 3
+---------------------------------------------------------------------+
|tx                                                                   |
+---------------------------------------------------------------------+
|{TX7591, u14, 4656.58, Warszawa, książki, 2026-04-26T22:57:54.653632}|
+---------------------------------------------------------------------+

Batch ID: 4
+--------------------------------------------------------------------+
|tx                                                                  |
+--------------------------------------------------------------------+
|{TX6785, u20, 3628.64, Warszawa, odzież, 2026-04-26T22:57:55.654831}|
+--------------------------------------------------------------------+

Batch ID: 5
+--------------------------------------------------------------------+
|tx                                                                  |
+--------------------------------------------------------------------+
|{TX8307, u07, 4900.89, Wrocław, książki, 2026-04-26T22:57:56.657993}|
+--------------------------------------------------------------------+

Krok 3 — select("tx.*"): struct → płaskie kolumny

select("tx.*") odpakowuje struct — każde pole staje się osobną kolumną.
Dodajemy jeszcze konwersję timestamp z tekstu na typ TimestampType (potrzebne do okien).

from pyspark.sql.functions import to_timestamp

# Krok 3: struct → płaskie kolumny + konwersja timestamp
df = (
    kafka_raw
    .select(from_json(col("value").cast("string"), tx_schema).alias("tx"))
    .select("tx.*")                                         # struct → kolumny
    .withColumn("timestamp", to_timestamp("timestamp"))
)

print("Finalny schemat:")
df.printSchema()
Finalny schemat:
root
 |-- tx_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- store: string (nullable = true)
 |-- category: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

Część 4: Okna tumbling + watermark

Od tego momentu kod jest identyczny jak w Lab 2 (batch) i Lab 3 (rate source).
Źródło się zmieniło — logika okien nie.

Watermark — obsługa spóźnionych zdarzeń

Watermark mówi Sparkowi: „poczekam maksymalnie X czasu na spóźnione dane, potem zamykam okno”.

Mode Kiedy emituje Kiedy użyć
append dopiero gdy okno jest zamknięte produkcja, zapis do Kafki/bazy
complete po każdym batchu — całą tabelę debugowanie

Zadanie 4.1 — Okna 1-minutowe per sklep

from pyspark.sql.functions import window, count, sum as _sum, round as _round

windowed = (
    df
    .withWatermark("timestamp", "3 seconds")
    .groupBy(window("timestamp", "20 seconds"), "store")
    .agg(
        count("tx_id").alias("liczba_tx"),
        _round(_sum("amount"), 2).alias("suma_PLN"),
    )
)

Zadanie 4.2 — Okna per kategoria, tryb complete

Pogrupuj po kategorii zamiast sklepu i użyj outputMode("complete").
Obserwuj różnicę: wyniki pojawiają się po każdym batchu, nie dopiero po zamknięciu okna.

# TWÓJ KOD
# windowed_cat = df.withWatermark(...).groupBy(window(...), "category").agg(...)
# outputMode("complete")

Część 5: Zapis alertów do Kafki

Transakcje > 3000 PLN wysyłamy do tematu alerts.
Kafka oczekuje kolumny value jako string/bajty — używamy to_json() żeby zamienić struct z powrotem na JSON.

from pyspark.sql.functions import to_json, struct, lit

alerts = (
    df
    .filter(col("amount") > 3000)
    .select(
        to_json(
            struct(
                "tx_id", "user_id", "amount", "store", "category",
                col("timestamp").cast("string"),
                lit("HIGH").alias("alert_level"),
            )
        ).alias("value")    # Kafka wymaga kolumny 'value'
    )
)

alert_query = (
    alerts.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("topic", "alerts")
    .outputMode("append")
    .start()
)
print("Strumień alertów uruchomiony. Zatrzymaj ręcznie: alert_query.stop()")

Zadanie 5.1 — Sprawdź temat alerts w terminalu

kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server broker:9092 \
  --topic alerts \
  --from-beginning

Powinieneś widzieć rekordy JSON z polem alert_level: HIGH.

Zadanie 5.2 — Pytania kontrolne

# 1. Dlaczego w trybie 'append' pierwsze wyniki pojawiają się z opóźnieniem?
#    ODPOWIEDŹ:

# 2. Co się stanie jeśli ustawisz watermark na "0 seconds"?
#    ODPOWIEDŹ:

# 3. Jaka jest różnica między window() w Lab 2 (batch) a window() tutaj (streaming)?
#    ODPOWIEDŹ:

# 4. Dlaczego do zapisu do Kafki używamy to_json(), a nie po prostu select("value")?
#    ODPOWIEDŹ:
alert_query.stop()
spark.stop()

Praca domowa

  1. Zrób okno sliding 2 minuty / krok 1 minuta na strumieniu z Kafki per sklep.
  2. Dodaj do alertów pole ratio = amount / 400.0 (przybliżona średnia z Lab 2).
  3. Co się dzieje z wynikami gdy zatrzymasz producenta i odczekasz 2 minuty? Dlaczego?

Następne zajęcia: Reguły decyzyjne i scoring na strumieniu.

Wersje kodów do wyświetlania ramek w notatniku

def process_batch(df, batch_id, tstop=5):
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id == tstop:
        df.stop()

kafka_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "transactions")
    .load()
)

query = (kafka_raw.writeStream 
    .format("console") 
    .outputMode("append")
    .foreachBatch(process_batch)
    .option("truncate", False) 
    .start()
)

2. bity na napis

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

## KAFKA_PACKAGE znajdziesz w pierwszej komorce notatnika 
spark = (
    SparkSession.builder
    .appName("Lab4-Kafka")
    .config("spark.jars.packages", KAFKA_PACKAGE)
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

def process_batch(df, batch_id, tstop=5):
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id == tstop:
        df.stop()

kafka_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "transactions")
    .load()
)

df = kafka_raw.select(
    col("timestamp").alias("time"), 
    col("value").cast("string")
)
query = (df.writeStream 
    .format("console") 
    .outputMode("append")
    .foreachBatch(process_batch)
    .option("truncate", False) 
    .start()
)