from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("Lab2-Transactions")
.getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
print(f"Spark {spark.version} — gotowy")Lab 2: Apache Spark — analiza danych transakcyjnych
Dlaczego Spark?
W Lab 1 napisałeś konsumenta, który zlicza transakcje per sklep. Działa — ale wyobraź sobie, że:
- Chcesz wiedzieć ile transakcji było w każdej godzinie z osobna (nie łącznie od początku)
- Masz nie 50, ale 10 milionów transakcji dziennie
- Konsument padł — tracisz stan (liczniki), musisz zaczynać od zera
Do takich zadań używamy Apache Spark — silnika obliczeniowego, który: - przetwarza dane równolegle (na wielu rdzeniach/maszynach) - ma wbudowane okna czasowe (window) - działa zarówno na plikach (batch) jak i na żywym strumieniu (streaming)
Plan
- Wczytaj 10 000 transakcji z pliku
- Podstawowe agregacje — znajomy
groupBy - Okna tumbling — zliczanie per godzina / per 30 minut
- Okna sliding — nakładające się przedziały
- Zapowiedź: te same zapytania na żywym strumieniu z Kafki
Część 1: Wczytaj dane
SparkSession — brama do Sparka
Zanim cokolwiek zrobimy, tworzymy SparkSession. To jest odpowiednik połączenia z bazą danych — jeden obiekt na całą sesję.
Wczytaj plik JSON
Dane są w pliku data/transactions_10k.jsonl — 10 000 transakcji e-commerce z godzin 08:00–11:00. Każda linia to jeden rekord JSON, ten sam format co zdarzenia z Kafki w Lab 1.
df = spark.read.json("data/transactions_10k.jsonl")
print(f"Liczba rekordów: {df.count()}")
df.printSchema()df.show(10, truncate=False)Konwersja kolumny timestamp
Spark wczytał timestamp jako string (tekst). Żeby móc robić okna czasowe, musimy zamienić go na typ TimestampType.
To jak zmiana formatu komórki w Excelu z “tekst” na “data”.
from pyspark.sql.functions import to_timestamp, col
df = df.withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
df.printSchema() # timestamp powinien być teraz 'timestamp (nullable = true)'Część 2: Podstawowe agregacje
Zanim przejdziemy do okien, przypomnij sobie groupBy — działa dokładnie jak w SQL (GROUP BY) i Pandas.
Zadanie 2.1 — Liczba transakcji i suma przychodów per sklep
from pyspark.sql.functions import count, sum as _sum, avg, round as _round
store_summary = (
df.groupBy("store")
.agg(
count("tx_id").alias("liczba_tx"),
_round(_sum("amount"), 2).alias("suma_PLN"),
_round(avg("amount"), 2).alias("srednia_PLN"),
)
.orderBy("store")
)
store_summary.show()Zadanie 2.2 - Statystyki per kategoria
Policz sumę, minimum i maksimum kwoty dla każdej kategorii.
from pyspark.sql.functions import min as _min, max as _max
# TWÓJ KOD
# df.groupBy("category").agg(...).orderBy("category").show()Część 3: Okna tumbling
Co to jest okno?
Dotychczasowe groupBy("store") liczyło wszystkie rekordy razem — bez znaczenia kiedy transakcja była.
Okno (window) dzieli oś czasu na przedziały i grupuje dane osobno w każdym.
Okno tumbling (przeskakujące) — przedziały bez nakładania się:
Każda transakcja trafia dokładnie do jednego okna.
Zadanie 3.1 — Liczba transakcji per godzina (tumbling 1h)
from pyspark.sql.functions import window
hourly = (
df.groupBy(window("timestamp", "1 hour")) # okno 1-godzinne
.agg(
count("tx_id").alias("liczba_tx"),
_round(_sum("amount"), 2).alias("suma_PLN"),
)
.orderBy("window")
)
hourly.show(truncate=False)Kolumna window to struct z polami start i end. Możemy je wyciągnąć żeby ładniej wyświetlić:
(
hourly
.select(
col("window.start").alias("od"),
col("window.end").alias("do"),
"liczba_tx",
"suma_PLN",
)
.show(truncate=False)
)Zadanie 3.2 Okna 30-minutowe per sklep
Policz transakcje i sumę per sklep w każdym 30-minutowym oknie. Posortuj po oknie, a w ramach okna po sklepie.
# TWÓJ KOD
# df.groupBy(window("timestamp", "30 minutes"), "store").agg(...).orderBy(...).show()Zadanie 3.3 — W której godzinie sklep “Kraków” miał najwyższy przychód?
Filtruj najpierw po sklepie, potem zrób okno godzinne, posortuj malejąco po sumie.
from pyspark.sql.functions import desc
# TWÓJ KODCzęść 4: Okna sliding
Okno sliding (przesuwne) — przedziały nakładają się:
Dwa parametry: windowDuration (szerokość okna) i slideDuration (co ile przesuwa się).
Uwaga: jedna transakcja może trafić do wielu okien — to normalne.
Zadanie 4.1 — Okno 1h, krok 30 minut
sliding = (
df.groupBy(window("timestamp", "1 hour", "30 minutes")) # szerokość 1h, krok 30min
.agg(
count("tx_id").alias("liczba_tx"),
_round(_sum("amount"), 2).alias("suma_PLN"),
)
.select(
col("window.start").alias("od"),
col("window.end").alias("do"),
"liczba_tx",
"suma_PLN",
)
.orderBy("od")
)
sliding.show(truncate=False)Zadanie 4.2 — Porównaj tumbling vs sliding
Policz łączną liczbę wierszy wynikowych w obu podejściach. Dlaczego sliding daje więcej wierszy?
tumbling_rows = (
df.groupBy(window("timestamp", "1 hour"))
.agg(count("tx_id"))
.count()
)
sliding_rows = (
df.groupBy(window("timestamp", "1 hour", "30 minutes"))
.agg(count("tx_id"))
.count()
)
print(f"Tumbling (1h): {tumbling_rows} okien")
print(f"Sliding (1h / 30min): {sliding_rows} okien")
# Odpowiedz w komentarzu: dlaczego sliding ma więcej wierszy?
# TWOJA ODPOWIEDŹ:Część 5: Pytania kontrolne
# Odpowiedz na pytania w komentarzach:
# 1. Ile transakcji jest w oknie 09:00–10:00?
# Sprawdź w wyniku zadania 3.1.
# ODPOWIEDŹ:
# 2. Jaka jest różnica między groupBy("store") a groupBy(window(...), "store")?
# ODPOWIEDŹ:
# 3. W oknie sliding 1h/30min — ile okien zawiera transakcje z godziny 09:30?
# Wskazówka: narysuj oś czasu.
# ODPOWIEDŹ:Zapowiedź Lab 3
Właśnie robiłeś okna na pliku (dane historyczne, batch).
Na następnych zajęciach ten sam kod zadziała na żywym strumieniu z Kafki — zmiana to dosłownie jedna linia:
# Batch (dziś):
df = spark.read.json("data/transactions_10k.jsonl")
# Streaming (Lab 3):
df = spark.readStream.format("kafka").option(...).load()Agregacje z window() pozostają bez zmian.
Praca domowa
- Znajdź godzinę, w której sklep Gdańsk miał najniższą średnią kwotę transakcji.
- Policz ile transakcji per kategoria było w oknie 09:00–09:30.
- Zrób okno 15-minutowe i sprawdź w której ćwierćgodzinie był szczyt transakcji (łącznie dla wszystkich sklepów).
Następne zajęcia: Ten sam window() na strumieniu z Kafki — watermark i obsługa spóźnionych zdarzeń.
spark.stop()