Most z Lab 3
W Lab 3 strumieniowałeś z rate (generator wbudowany) i z pliku JSON.
Dziś to samo — ale źródłem jest Kafka . Zmiana to dosłownie jedna linia:
# Lab 3 — rate:
df = spark.readStream.format ("rate" ).option("rowsPerSecond" , 1 ).load()
# Lab 3 — plik:
df = spark.readStream.schema(tx_schema).json("data/stream" )
# Lab 4 — Kafka:
df = spark.readStream.format ("kafka" ).option("subscribe" , "transactions" ).load()
Różnica: Kafka zwraca dane jako surowe bajty — trzeba je zdekodować.
Okna i watermarki z Lab 3 działają bez zmian .
Plan
Uruchom producenta
Podłącz Sparka do Kafki (spark-submit vs notebook)
Obejrzyj co naprawdę przychodzi z Kafki — surowy schemat
Krok po kroku: bajty → string → JSON → tabela
Okna tumbling + watermark
Zapis alertów do tematu alerts
Część 0: Przygotowanie
W terminalu JupyterLab:
~/kafka/bin/kafka-topics.sh --create --topic transactions \
--bootstrap-server broker:9092
~/kafka/bin/kafka-topics.sh --create --topic alerts \
--bootstrap-server broker:9092
# Uruchom producenta z Lab 1
python producer.py
Zostaw producenta działającego przez cały lab.
Część 1: Podłączenie Sparka do Kafki
Dlaczego potrzebujemy dodatkowej paczki?
Spark nie ma wbudowanej obsługi Kafki — trzeba dołączyć connector jako zewnętrzną zależność Maven.
Nazwa paczki różni się zależnie od wersji Sparka i Scali:
Spark 4.0 (Scala 2.13)
org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2
Spark 3.5 (Scala 2.12)
org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
Możemy ją przekazać na dwa sposoby:
Sposób A — spark-submit (skrypt .py poza notebookiem)
# Spark 4.0
spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2 \
moj_skrypt.py
# Spark 3.5
spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
moj_skrypt.py
Spark pobierze paczkę z Maven Central (raz, potem cache lokalny).
Przy pierwszym uruchomieniu pojawi się dużo logów — to normalne.
Sposób B — z notebooka przez os.environ
Ustawiamy zmienną środowiskową przed pierwszym importem PySpark.
Poniższy kod automatycznie wykrywa wersję i dobiera właściwą paczkę.
import os
import pyspark
# Autodetekcja wersji Sparka → właściwy connector
spark_version = pyspark.__version__
print (f"Wykryta wersja PySpark: { spark_version} " )
if spark_version.startswith("4" ):
KAFKA_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2"
else :
# Spark 3.x
KAFKA_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"
print (f"Użyty connector: { KAFKA_PACKAGE} " )
# Musi być ustawione PRZED SparkSession.builder
os.environ['PYSPARK_SUBMIT_ARGS' ] = f'--packages { KAFKA_PACKAGE} pyspark-shell'
Wykryta wersja PySpark: 4.0.0.dev2
Użyty connector: org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("Lab4-Kafka" )
.config("spark.jars.packages" , KAFKA_PACKAGE)
.getOrCreate()
)
spark.sparkContext.setLogLevel("WARN" )
print (f"Spark { spark. version} — gotowy" )
Część 2: Co naprawdę przychodzi z Kafki?
Odczytajmy strumień i zobaczmy jego surowy schemat — zanim cokolwiek przetworzymy.
kafka_raw = (
spark.readStream
.format ("kafka" )
.option("kafka.bootstrap.servers" , "broker:9092" )
.option("subscribe" , "transactions" )
.load()
)
kafka_raw.printSchema()
Wynik printSchema() — co oznacza każda kolumna?
root
|-- key: binary ← klucz wiadomości (opcjonalny, u nas null)
|-- value: binary ← TREŚĆ wiadomości — tu jest nasz JSON (jako bajty)
|-- topic: string ← nazwa tematu ("transactions")
|-- partition: integer ← numer partycji (0, 1 lub 2 — mamy 3 partycje)
|-- offset: long ← pozycja w partycji (0, 1, 2, … rośnie monotonicznie)
|-- timestamp: timestamp← kiedy Kafka przyjęła wiadomość
|-- timestampType: integer ← 0 = CreateTime, 1 = LogAppendTime
Kluczowa obserwacja: kolumna value to binary — Kafka przechowuje bajty i nic nie wie o formacie.
Wszystkie inne kolumny to metadane Kafki , nie dane biznesowe.
Zobaczmy jak wygląda surowa wiadomość przed jakimkolwiek parsowaniem:
%% file kafka_raw.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2
spark = (
SparkSession.builder
.appName("Lab4-Kafka" )
.getOrCreate()
)
spark.sparkContext.setLogLevel("WARN" )
kafka_raw = (
spark.readStream
.format ("kafka" )
.option("kafka.bootstrap.servers" , "broker:9092" )
.option("subscribe" , "transactions" )
.load()
)
q = (
kafka_raw.writeStream
.format ("console" )
.outputMode("append" )
.option("truncate" , False )
.start()
)
q.awaitTermination()
Część 3: Krok po kroku — bajty → tabela
Mamy trzy etapy dekodowania:
value (binary)
→ cast("string") → "{\"tx_id\": \"TX1234\", \"amount\": 312.32, ...}"
→ from_json(col, schema) → struct<tx_id: string, amount: double, ...>
→ select("tx.*") → tx_id | user_id | amount | store | ...
Krok 1 — value jako string
Sprawdzamy czy bajty poprawnie dekodują się do tekstu JSON:
%% file kafka_text.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2
spark = (
SparkSession.builder
.appName("Lab4-Kafka" )
.getOrCreate()
)
spark.sparkContext.setLogLevel("WARN" )
kafka_raw = (
spark.readStream
.format ("kafka" )
.option("kafka.bootstrap.servers" , "broker:9092" )
.option("subscribe" , "transactions" )
.load()
)
df = kafka_raw.select(
col("value" ).cast("string" )
)
q = (
df.writeStream
.format ("console" )
.outputMode("append" )
.option("truncate" , False )
.start()
)
q.awaitTermination()
Krok 2 — from_json(): string → struct
from_json() parsuje tekst JSON na kolumnę typu struct — używając schematu który podajemy.
Schemat musi zgadzać się z polami z producenta — nieznanego pola Spark zignoruje, brakującego doda jako null.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import from_json, col
spark = (
SparkSession.builder
.appName("Lab4-Kafka" )
.config("spark.jars.packages" , KAFKA_PACKAGE)
.getOrCreate()
)
spark.sparkContext.setLogLevel("WARN" )
def process_batch(df, batch_id, tstop= 5 ):
print (f"Batch ID: { batch_id} " )
df.show(truncate= False )
if batch_id == tstop:
df.stop()
kafka_raw = (
spark.readStream
.format ("kafka" )
.option("kafka.bootstrap.servers" , "broker:9092" )
.option("subscribe" , "transactions" )
.load()
)
tx_schema = StructType([
StructField("tx_id" , StringType()),
StructField("user_id" , StringType()),
StructField("amount" , DoubleType()),
StructField("store" , StringType()),
StructField("category" , StringType()),
StructField("timestamp" , StringType()),
])
# Krok 2: string → struct (jedna kolumna 'tx' zawierająca wszystkie pola)
step2 = kafka_raw.select(
from_json(col("value" ).cast("string" ), tx_schema).alias("tx" )
)
query = (step2.writeStream
.format ("console" )
.outputMode("append" )
.foreachBatch(process_batch)
.option("truncate" , False )
.start()
)
Batch ID: 0
+---+
|tx |
+---+
+---+
Batch ID: 1
+------------------------------------------------------------------------+
|tx |
+------------------------------------------------------------------------+
|{TX8530, u10, 344.96, Warszawa, elektronika, 2026-04-26T22:57:52.652119}|
+------------------------------------------------------------------------+
Batch ID: 2
+------------------------------------------------------------------+
|tx |
+------------------------------------------------------------------+
|{TX1014, u05, 602.91, Wrocław, odzież, 2026-04-26T22:57:53.652892}|
+------------------------------------------------------------------+
Batch ID: 3
+---------------------------------------------------------------------+
|tx |
+---------------------------------------------------------------------+
|{TX7591, u14, 4656.58, Warszawa, książki, 2026-04-26T22:57:54.653632}|
+---------------------------------------------------------------------+
Batch ID: 4
+--------------------------------------------------------------------+
|tx |
+--------------------------------------------------------------------+
|{TX6785, u20, 3628.64, Warszawa, odzież, 2026-04-26T22:57:55.654831}|
+--------------------------------------------------------------------+
Batch ID: 5
+--------------------------------------------------------------------+
|tx |
+--------------------------------------------------------------------+
|{TX8307, u07, 4900.89, Wrocław, książki, 2026-04-26T22:57:56.657993}|
+--------------------------------------------------------------------+
Krok 3 — select("tx.*"): struct → płaskie kolumny
select("tx.*") odpakowuje struct — każde pole staje się osobną kolumną.
Dodajemy jeszcze konwersję timestamp z tekstu na typ TimestampType (potrzebne do okien).
from pyspark.sql.functions import to_timestamp
# Krok 3: struct → płaskie kolumny + konwersja timestamp
df = (
kafka_raw
.select(from_json(col("value" ).cast("string" ), tx_schema).alias("tx" ))
.select("tx.*" ) # struct → kolumny
.withColumn("timestamp" , to_timestamp("timestamp" ))
)
print ("Finalny schemat:" )
df.printSchema()
Finalny schemat:
root
|-- tx_id: string (nullable = true)
|-- user_id: string (nullable = true)
|-- amount: double (nullable = true)
|-- store: string (nullable = true)
|-- category: string (nullable = true)
|-- timestamp: timestamp (nullable = true)
Część 4: Okna tumbling + watermark
Od tego momentu kod jest identyczny jak w Lab 2 (batch) i Lab 3 (rate source).
Źródło się zmieniło — logika okien nie.
Watermark — obsługa spóźnionych zdarzeń
Watermark mówi Sparkowi: „poczekam maksymalnie X czasu na spóźnione dane, potem zamykam okno” .
append
dopiero gdy okno jest zamknięte
produkcja, zapis do Kafki/bazy
complete
po każdym batchu — całą tabelę
debugowanie
Zadanie 4.1 — Okna 1-minutowe per sklep
from pyspark.sql.functions import window, count, sum as _sum, round as _round
windowed = (
df
.withWatermark("timestamp" , "3 seconds" )
.groupBy(window("timestamp" , "20 seconds" ), "store" )
.agg(
count("tx_id" ).alias("liczba_tx" ),
_round(_sum("amount" ), 2 ).alias("suma_PLN" ),
)
)
Zadanie 4.2 — Okna per kategoria, tryb complete
Pogrupuj po kategorii zamiast sklepu i użyj outputMode("complete").
Obserwuj różnicę: wyniki pojawiają się po każdym batchu, nie dopiero po zamknięciu okna.
# TWÓJ KOD
# windowed_cat = df.withWatermark(...).groupBy(window(...), "category").agg(...)
# outputMode("complete")
Część 5: Zapis alertów do Kafki
Transakcje > 3000 PLN wysyłamy do tematu alerts.
Kafka oczekuje kolumny value jako string/bajty — używamy to_json() żeby zamienić struct z powrotem na JSON.
from pyspark.sql.functions import to_json, struct, lit
alerts = (
df
.filter (col("amount" ) > 3000 )
.select(
to_json(
struct(
"tx_id" , "user_id" , "amount" , "store" , "category" ,
col("timestamp" ).cast("string" ),
lit("HIGH" ).alias("alert_level" ),
)
).alias("value" ) # Kafka wymaga kolumny 'value'
)
)
alert_query = (
alerts.writeStream
.format ("kafka" )
.option("kafka.bootstrap.servers" , "broker:9092" )
.option("topic" , "alerts" )
.outputMode("append" )
.start()
)
print ("Strumień alertów uruchomiony. Zatrzymaj ręcznie: alert_query.stop()" )
Zadanie 5.1 — Sprawdź temat alerts w terminalu
kafka/bin/kafka-console-consumer.sh \
--bootstrap-server broker:9092 \
--topic alerts \
--from-beginning
Powinieneś widzieć rekordy JSON z polem alert_level: HIGH.
Zadanie 5.2 — Pytania kontrolne
# 1. Dlaczego w trybie 'append' pierwsze wyniki pojawiają się z opóźnieniem?
# ODPOWIEDŹ:
# 2. Co się stanie jeśli ustawisz watermark na "0 seconds"?
# ODPOWIEDŹ:
# 3. Jaka jest różnica między window() w Lab 2 (batch) a window() tutaj (streaming)?
# ODPOWIEDŹ:
# 4. Dlaczego do zapisu do Kafki używamy to_json(), a nie po prostu select("value")?
# ODPOWIEDŹ:
alert_query.stop()
spark.stop()
Praca domowa
Zrób okno sliding 2 minuty / krok 1 minuta na strumieniu z Kafki per sklep.
Dodaj do alertów pole ratio = amount / 400.0 (przybliżona średnia z Lab 2).
Co się dzieje z wynikami gdy zatrzymasz producenta i odczekasz 2 minuty? Dlaczego?
Następne zajęcia: Reguły decyzyjne i scoring na strumieniu.
Wersje kodów do wyświetlania ramek w notatniku
def process_batch(df, batch_id, tstop= 5 ):
print (f"Batch ID: { batch_id} " )
df.show(truncate= False )
if batch_id == tstop:
df.stop()
kafka_raw = (
spark.readStream
.format ("kafka" )
.option("kafka.bootstrap.servers" , "broker:9092" )
.option("subscribe" , "transactions" )
.load()
)
query = (kafka_raw.writeStream
.format ("console" )
.outputMode("append" )
.foreachBatch(process_batch)
.option("truncate" , False )
.start()
)
2. bity na napis
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
## KAFKA_PACKAGE znajdziesz w pierwszej komorce notatnika
spark = (
SparkSession.builder
.appName("Lab4-Kafka" )
.config("spark.jars.packages" , KAFKA_PACKAGE)
.getOrCreate()
)
spark.sparkContext.setLogLevel("WARN" )
def process_batch(df, batch_id, tstop= 5 ):
print (f"Batch ID: { batch_id} " )
df.show(truncate= False )
if batch_id == tstop:
df.stop()
kafka_raw = (
spark.readStream
.format ("kafka" )
.option("kafka.bootstrap.servers" , "broker:9092" )
.option("subscribe" , "transactions" )
.load()
)
df = kafka_raw.select(
col("timestamp" ).alias("time" ),
col("value" ).cast("string" )
)
query = (df.writeStream
.format ("console" )
.outputMode("append" )
.foreachBatch(process_batch)
.option("truncate" , False )
.start()
)