Lab 2: Apache Spark — analiza danych transakcyjnych

Dlaczego Spark?

W Lab 1 napisałeś konsumenta, który zlicza transakcje per sklep. Działa — ale wyobraź sobie, że:

  • Chcesz wiedzieć ile transakcji było w każdej godzinie z osobna (nie łącznie od początku)
  • Masz nie 50, ale 10 milionów transakcji dziennie
  • Konsument padł — tracisz stan (liczniki), musisz zaczynać od zera

Do takich zadań używamy Apache Spark — silnika obliczeniowego, który: - przetwarza dane równolegle (na wielu rdzeniach/maszynach) - ma wbudowane okna czasowe (window) - działa zarówno na plikach (batch) jak i na żywym strumieniu (streaming)


Plan

  1. Wczytaj 10 000 transakcji z pliku
  2. Podstawowe agregacje — znajomy groupBy
  3. Okna tumbling — zliczanie per godzina / per 30 minut
  4. Okna sliding — nakładające się przedziały
  5. Zapowiedź: te same zapytania na żywym strumieniu z Kafki

Część 1: Wczytaj dane

SparkSession — brama do Sparka

Zanim cokolwiek zrobimy, tworzymy SparkSession. To jest odpowiednik połączenia z bazą danych — jeden obiekt na całą sesję.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Lab2-Transactions")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
print(f"Spark {spark.version} — gotowy")

Wczytaj plik JSON

Dane są w pliku data/transactions_10k.jsonl — 10 000 transakcji e-commerce z godzin 08:00–11:00. Każda linia to jeden rekord JSON, ten sam format co zdarzenia z Kafki w Lab 1.

df = spark.read.json("data/transactions_10k.jsonl")

print(f"Liczba rekordów: {df.count()}")
df.printSchema()
df.show(10, truncate=False)

Konwersja kolumny timestamp

Spark wczytał timestamp jako string (tekst). Żeby móc robić okna czasowe, musimy zamienić go na typ TimestampType.

To jak zmiana formatu komórki w Excelu z “tekst” na “data”.

from pyspark.sql.functions import to_timestamp, col

df = df.withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))

df.printSchema()  # timestamp powinien być teraz 'timestamp (nullable = true)'

Część 2: Podstawowe agregacje

Zanim przejdziemy do okien, przypomnij sobie groupBy — działa dokładnie jak w SQL (GROUP BY) i Pandas.

Zadanie 2.1 — Liczba transakcji i suma przychodów per sklep

from pyspark.sql.functions import count, sum as _sum, avg, round as _round

store_summary = (
    df.groupBy("store")
    .agg(
        count("tx_id").alias("liczba_tx"),
        _round(_sum("amount"), 2).alias("suma_PLN"),
        _round(avg("amount"), 2).alias("srednia_PLN"),
    )
    .orderBy("store")
)
store_summary.show()

Zadanie 2.2 - Statystyki per kategoria

Policz sumę, minimum i maksimum kwoty dla każdej kategorii.

from pyspark.sql.functions import min as _min, max as _max

# TWÓJ KOD
# df.groupBy("category").agg(...).orderBy("category").show()

Część 3: Okna tumbling

Co to jest okno?

Dotychczasowe groupBy("store") liczyło wszystkie rekordy razem — bez znaczenia kiedy transakcja była.

Okno (window) dzieli oś czasu na przedziały i grupuje dane osobno w każdym.

Okno tumbling (przeskakujące) — przedziały bez nakładania się:

Każda transakcja trafia dokładnie do jednego okna.

Zadanie 3.1 — Liczba transakcji per godzina (tumbling 1h)

from pyspark.sql.functions import window

hourly = (
    df.groupBy(window("timestamp", "1 hour"))    # okno 1-godzinne
    .agg(
        count("tx_id").alias("liczba_tx"),
        _round(_sum("amount"), 2).alias("suma_PLN"),
    )
    .orderBy("window")
)
hourly.show(truncate=False)

Kolumna window to struct z polami start i end. Możemy je wyciągnąć żeby ładniej wyświetlić:

(
    hourly
    .select(
        col("window.start").alias("od"),
        col("window.end").alias("do"),
        "liczba_tx",
        "suma_PLN",
    )
    .show(truncate=False)
)

Zadanie 3.2 Okna 30-minutowe per sklep

Policz transakcje i sumę per sklep w każdym 30-minutowym oknie. Posortuj po oknie, a w ramach okna po sklepie.

# TWÓJ KOD
# df.groupBy(window("timestamp", "30 minutes"), "store").agg(...).orderBy(...).show()

Zadanie 3.3 — W której godzinie sklep “Kraków” miał najwyższy przychód?

Filtruj najpierw po sklepie, potem zrób okno godzinne, posortuj malejąco po sumie.

from pyspark.sql.functions import desc

# TWÓJ KOD

Część 4: Okna sliding

Okno sliding (przesuwne) — przedziały nakładają się:

Dwa parametry: windowDuration (szerokość okna) i slideDuration (co ile przesuwa się).

Uwaga: jedna transakcja może trafić do wielu okien — to normalne.

Zadanie 4.1 — Okno 1h, krok 30 minut

sliding = (
    df.groupBy(window("timestamp", "1 hour", "30 minutes"))  # szerokość 1h, krok 30min
    .agg(
        count("tx_id").alias("liczba_tx"),
        _round(_sum("amount"), 2).alias("suma_PLN"),
    )
    .select(
        col("window.start").alias("od"),
        col("window.end").alias("do"),
        "liczba_tx",
        "suma_PLN",
    )
    .orderBy("od")
)
sliding.show(truncate=False)

Zadanie 4.2 — Porównaj tumbling vs sliding

Policz łączną liczbę wierszy wynikowych w obu podejściach. Dlaczego sliding daje więcej wierszy?

tumbling_rows = (
    df.groupBy(window("timestamp", "1 hour"))
    .agg(count("tx_id"))
    .count()
)
sliding_rows = (
    df.groupBy(window("timestamp", "1 hour", "30 minutes"))
    .agg(count("tx_id"))
    .count()
)
print(f"Tumbling (1h):          {tumbling_rows} okien")
print(f"Sliding  (1h / 30min):  {sliding_rows} okien")

# Odpowiedz w komentarzu: dlaczego sliding ma więcej wierszy?
# TWOJA ODPOWIEDŹ:

Część 5: Pytania kontrolne

# Odpowiedz na pytania w komentarzach:

# 1. Ile transakcji jest w oknie 09:00–10:00?
#    Sprawdź w wyniku zadania 3.1.
#    ODPOWIEDŹ:

# 2. Jaka jest różnica między groupBy("store") a groupBy(window(...), "store")?
#    ODPOWIEDŹ:

# 3. W oknie sliding 1h/30min — ile okien zawiera transakcje z godziny 09:30?
#    Wskazówka: narysuj oś czasu.
#    ODPOWIEDŹ:

Zapowiedź Lab 3

Właśnie robiłeś okna na pliku (dane historyczne, batch).

Na następnych zajęciach ten sam kod zadziała na żywym strumieniu z Kafki — zmiana to dosłownie jedna linia:

# Batch (dziś):
df = spark.read.json("data/transactions_10k.jsonl")

# Streaming (Lab 3):
df = spark.readStream.format("kafka").option(...).load()

Agregacje z window() pozostają bez zmian.

Praca domowa

  1. Znajdź godzinę, w której sklep Gdańsk miał najniższą średnią kwotę transakcji.
  2. Policz ile transakcji per kategoria było w oknie 09:00–09:30.
  3. Zrób okno 15-minutowe i sprawdź w której ćwierćgodzinie był szczyt transakcji (łącznie dla wszystkich sklepów).

Następne zajęcia: Ten sam window() na strumieniu z Kafki — watermark i obsługa spóźnionych zdarzeń.

spark.stop()