Wykład 2 — Batch vs Stream. Architektury Lambda i Kappa

Analiza danych w czasie rzeczywistym

Przetwarzanie wsadowe vs strumieniowe, architektury Lambda i Kappa, czas zdarzenia, watermarking i okna czasowe.
Note{{< fa clock >}} Czas trwania: 1,5h

Cel wykładu: Zrozumienie różnic między przetwarzaniem wsadowym a strumieniowym, poznanie architektur Lambda i Kappa, oraz kluczowych pojęć: czas zdarzenia, czas przetwarzania, okna czasowe.


1 Batch vs Stream — dwa podejścia do tych samych danych

Na poprzednim wykładzie ustaliliśmy, że dane zawsze powstają jako strumień zdarzeń. Przetwarzanie wsadowe to tylko uproszczenie — zbieramy strumień do pliku i analizujemy z opóźnieniem.

Porównajmy oba podejścia na tym samym problemie biznesowym: monitorowanie transakcji w sklepie internetowym.

Pokaż kod
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

np.random.seed(42)

# Dane zebrane z całego dnia — analizowane następnego ranka
transakcje = pd.DataFrame({
    'czas': [datetime(2026, 3, 12, h, m)
             for h in range(9, 18)
             for m in np.random.choice(range(60), 20)],
    'kwota': np.random.uniform(20, 3000, 180).round(2),
    'klient_id': np.random.randint(1000, 9999, 180)
})

# Raport dzienny — widzimy go NASTĘPNEGO DNIA
raport = transakcje.groupby(transakcje['czas'].dt.hour).agg(
    liczba=('kwota', 'count'),
    suma=('kwota', 'sum'),
    srednia=('kwota', 'mean')
).round(2)

print("Raport dzienny (generowany następnego ranka):")
print(raport.head())
Raport dzienny (generowany następnego ranka):
      liczba      suma  srednia
czas                           
9         20  30174.87  1508.74
10        20  23969.05  1198.45
11        20  27346.70  1367.34
12        20  33457.91  1672.90
13        20  36552.13  1827.61
Pokaż kod
import time

# Symulacja: każda transakcja analizowana w momencie pojawienia się
okno_5min = []
alert_prog = 5000  # alert jeśli suma w oknie 5min > 5000 PLN

for i in range(10):
    tx = {
        'czas': datetime.now().strftime('%H:%M:%S'),
        'kwota': round(np.random.uniform(20, 3000), 2),
        'klient': np.random.randint(1000, 9999)
    }
    okno_5min.append(tx['kwota'])
    suma_okno = sum(okno_5min)

    status = ""
    if suma_okno > alert_prog:
        status = " << ALERT: wysoka aktywność!"
        okno_5min = []  # reset okna

    print(f"[{tx['czas']}] Klient {tx['klient']}: {tx['kwota']:>8.2f} PLN | Suma okna: {suma_okno:>9.2f}{status}")
    time.sleep(0.2)
[21:27:34] Klient 2191:  1289.75 PLN | Suma okna:   1289.75
[21:27:34] Klient 7938:   236.83 PLN | Suma okna:   1526.58
[21:27:35] Klient 2540:  2124.60 PLN | Suma okna:   3651.18
[21:27:35] Klient 6541:  1549.71 PLN | Suma okna:   5200.89 << ALERT: wysoka aktywność!
[21:27:35] Klient 2365:  2960.19 PLN | Suma okna:   2960.19
[21:27:35] Klient 9702:  2442.14 PLN | Suma okna:   5402.33 << ALERT: wysoka aktywność!
[21:27:35] Klient 2324:  2724.93 PLN | Suma okna:   2724.93
[21:27:36] Klient 4141:  2265.07 PLN | Suma okna:   4990.00
[21:27:36] Klient 2810:  2130.38 PLN | Suma okna:   7120.38 << ALERT: wysoka aktywność!
[21:27:36] Klient 6301:  2335.90 PLN | Suma okna:   2335.90
ImportantFundamentalna różnica

W trybie batch dowiadujemy się o problemie następnego dnia. W trybie stream — natychmiast.


2 Architektura Lambda

W praktyce firmy potrzebują obu podejść jednocześnie. Architektura Lambda, zaproponowana przez Nathana Marza, łączy przetwarzanie wsadowe i strumieniowe w jednym systemie.

2.1 Trzy warstwy Lambdy

Note{{< fa database >}} Warstwa wsadowa

Batch Layer — przetwarza kompletny zbiór danych w regularnych interwałach. Daje dokładne wyniki, ale z opóźnieniem.

Narzędzia: Spark (batch), Hadoop

Warning{{< fa bolt >}} Warstwa szybka

Speed Layer — przetwarza dane strumieniowe w czasie rzeczywistym. Daje przybliżone wyniki, ale natychmiast.

Narzędzia: Spark Streaming, Kafka Streams

Tip{{< fa server >}} Warstwa serwująca

Serving Layer — łączy wyniki z obu warstw i udostępnia je użytkownikom końcowym.

Narzędzia: bazy danych, API, dashboardy

flowchart TD
    SRC["Źródło danych\n(strumień zdarzeń)"] --> BATCH["Warstwa wsadowa\n(Batch Layer)\nKompletne dane\nDuże opóźnienie"]
    SRC --> SPEED["Warstwa szybka\n(Speed Layer)\nNajnowsze dane\nNiska latencja"]
    BATCH --> SERVE["Warstwa serwująca\n(Serving Layer)\nDashboard / API"]
    SPEED --> SERVE

    style BATCH fill:#2196F3,color:#fff
    style SPEED fill:#F44336,color:#fff
    style SERVE fill:#4CAF50,color:#fff

Architektura Lambda

  • Warstwa wsadowa: co noc przelicza historyczne wzorce zachowań klientów, trenuje modele fraud detection.
  • Warstwa szybka: w czasie rzeczywistym porównuje każdą transakcję z wzorcami i blokuje podejrzane operacje.
  • Warstwa serwująca: dashboard dla analityków + API dla aplikacji mobilnej.

2.2 Zalety i wady Lambdy

Zaleta: kompletność — batch layer koryguje błędy speed layer.

CautionGłówna wada Lambdy

Utrzymujesz dwie oddzielne ścieżki przetwarzania — dwa kody, dwa zestawy testów, dwa środowiska. To jest kosztowne i podatne na błędy.


3 Architektura Kappa

Jay Kreps (twórca Apache Kafka) zaproponował uproszczenie: a co, gdyby potrzebna była tylko warstwa strumieniowa?

Architektura Kappa to Lambda bez warstwy wsadowej. Cały przepływ danych opiera się na strumieniu zdarzeń. Jeśli trzeba przeliczyć dane historyczne — odtwarzamy strumień od początku (replay).

flowchart TD
    SRC["Źródło danych\n(strumień zdarzeń)"] --> STREAM["Warstwa strumieniowa\n(Stream Layer)\nJeden kod, jeden pipeline"]
    STREAM --> SERVE["Warstwa serwująca\nDashboard / API"]
    STREAM -.->|replay| SRC

    style STREAM fill:#FF9800,color:#fff
    style SERVE fill:#4CAF50,color:#fff

Architektura Kappa — uproszczenie Lambdy

3.1 Lambda vs Kappa — kiedy co stosować?

Lambda vs Kappa
Cecha {{< fa layer-group >}} Lambda {{< fa arrows-turn-right >}} Kappa
Złożoność Wysoka (dwie ścieżki) Niska (jedna ścieżka)
Dokładność Batch koryguje stream Zależna od jakości streamingu
Historyczne przeliczanie Naturalne (batch) Replay strumienia
Kiedy stosować Gdy batch i stream mają różną logikę Gdy jedna logika wystarcza
Przykład Bank (nocne retrenowanie + RT scoring) E-commerce (personalizacja)

4 Czas w przetwarzaniu strumieniowym

W przetwarzaniu wsadowym czas nie jest problemem — analizujemy dane historyczne, kiedy chcemy. W przetwarzaniu strumieniowym czas staje się kluczowym wymiarem analizy.

4.1 Dwa rodzaje czasu

Note{{< fa calendar-day >}} Czas zdarzenia (event time)

Moment, w którym zdarzenie faktycznie nastąpiło.

Np. klient kliknął „Kup” o 14:23:45.

Warning{{< fa server >}} Czas przetwarzania (processing time)

Moment, w którym system odebrał i przetworzył zdarzenie.

Np. system Kafki odebrał zdarzenie o 14:23:47.

W idealnym świecie oba czasy byłyby identyczne. W praktyce zawsze istnieje opóźnienie (latency) — spowodowane siecią, buforowaniem, awarią urządzeń.

Pokaż kod
# Symulacja: zdarzenia docierają z losowym opóźnieniem
import random

zdarzenia = []
for i in range(8):
    event_time = datetime(2026, 3, 12, 14, 23, i * 2)  # co 2 sekundy
    delay = random.uniform(0.1, 5.0)  # opóźnienie 0.1–5s
    processing_time = event_time + timedelta(seconds=delay)
    zdarzenia.append({
        'event_time': event_time.strftime('%H:%M:%S'),
        'processing_time': processing_time.strftime('%H:%M:%S.') + f'{int(delay*100):02d}',
        'opóźnienie': f'{delay:.1f}s'
    })

df = pd.DataFrame(zdarzenia)
print(df.to_string(index=False))
event_time processing_time opóźnienie
  14:23:00    14:23:03.390       3.9s
  14:23:02    14:23:03.137       1.4s
  14:23:04    14:23:08.496       5.0s
  14:23:06    14:23:10.461       4.6s
  14:23:08    14:23:10.276       2.8s
  14:23:10    14:23:13.360       3.6s
  14:23:12     14:23:12.91       0.9s
  14:23:14    14:23:18.460       4.6s
Pokaż kod
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

np.random.seed(42)
n = 20
evt = np.sort(np.random.uniform(0, 60, n))
delays = np.random.exponential(3, n)
proc = evt + delays
watermark = 5

fig, ax = plt.subplots(figsize=(9, 6))
ax.plot([0, 70], [0, 70], 'k--', alpha=0.3, label='Idealne (opóźnienie=0)')
ax.plot([0, 70], [watermark, 70+watermark], 'b--', alpha=0.3, label=f'Watermark ({watermark}s)')

for i in range(n):
    in_wm = proc[i] <= evt[i] + watermark + 1
    color = '#4CAF50' if in_wm else '#F44336'
    ax.scatter(evt[i], proc[i], c=color, s=50, zorder=5, edgecolors='white')
    ax.plot([evt[i], evt[i]], [evt[i], proc[i]], color=color, alpha=0.2, linewidth=1)

gp = mpatches.Patch(color='#4CAF50', label='W oknie watermarku')
rp = mpatches.Patch(color='#F44336', label='Za późno (odrzucone)')
ax.legend(handles=[gp, rp, ax.lines[0], ax.lines[1]], loc='upper left')
ax.set_xlabel('Event time (sekundy)')
ax.set_ylabel('Processing time (sekundy)')
ax.set_title('Opóźnienia i watermarking')
ax.set_xlim(-2, 70)
ax.set_ylim(-2, 75)
plt.tight_layout()
plt.show()
Figure 1: Czas zdarzenia vs czas przetwarzania — opóźnienia w systemach strumieniowych

Wyobraź sobie śledzenie samochodu przez GPS. Pojazd wjeżdża w tunel — przez 30 sekund nie ma sygnału. Po wyjściu z tunelu urządzenie wysyła naraz 30 odczytów. System musi wiedzieć, że te zdarzenia dotyczą przeszłości, a nie teraźniejszości.

Opóźnione zdarzenia możemy obsłużyć na dwa sposoby:

  • Ignorowanie — odrzucamy zdarzenia, które przyszły zbyt późno (ryzyko utraty danych).
  • Watermarking — definiujemy „znacznik wodny” — maksymalne dopuszczalne opóźnienie. Zdarzenia, które mieszczą się w oknie watermarku, są uwzględniane. Reszta odrzucana.

5 Okna czasowe

W przetwarzaniu strumieniowym nie możemy analizować „wszystkich danych” — strumień jest nieskończony. Zamiast tego grupujemy zdarzenia w okna czasowe o skończonej długości.

Stała długość, brak nakładania. Każde zdarzenie należy do dokładnie jednego okna.

Przykład: suma transakcji co 5 minut.

Pokaż kod
# Symulacja okna rozłącznego (tumbling) 5-minutowego
np.random.seed(42)

zdarzenia = pd.DataFrame({
    'czas': pd.date_range('2026-03-12 14:00', periods=30, freq='30s'),
    'kwota': np.random.uniform(10, 500, 30).round(2)
})

# Tumbling window: grupowanie co 5 minut
zdarzenia['okno'] = zdarzenia['czas'].dt.floor('5min')
wynik = zdarzenia.groupby('okno')['kwota'].agg(['sum', 'count']).round(2)
wynik.columns = ['suma', 'liczba']
print("Okno rozłączne (5 min):")
print(wynik)
Okno rozłączne (5 min):
                        suma  liczba
okno                                
2026-03-12 14:00:00  2648.68      10
2026-03-12 14:05:00  2036.82      10
2026-03-12 14:10:00  2061.89      10

Stała długość, ale okno przesuwa się o zadany interwał — zdarzenia mogą należeć do wielu okien.

Przykład: średnia z ostatnich 10 minut, aktualizowana co 2 minuty. Przydatne do wykrywania trendów.

Podobne do sliding, ale z możliwością nakładania się okien o określony krok. Stosowane do wygładzania danych.

Dynamiczna długość — okno trwa tak długo, jak długo przychodzą zdarzenia. Zamyka się po określonym czasie braku aktywności (gap).

Przykład: sesja użytkownika na stronie internetowej. Sesja trwa, dopóki użytkownik klika. Kończy się po 15 minutach braku aktywności.

5.1 Porównanie okien

Typy okien czasowych
Typ okna Długość Nakładanie Zastosowanie
Rozłączne (Tumbling) Stała {{< fa xmark >}} Raporty okresowe
Przesuwne (Sliding) Stała {{< fa check >}} Wykrywanie trendów
Skokowe (Hopping) Stała Częściowe Wygładzanie danych
Sesyjne (Session) Dynamiczna {{< fa xmark >}} Analiza sesji użytkowników
Pokaż kod
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

fig, axes = plt.subplots(3, 1, figsize=(10, 7), sharex=True)

events_x = [1, 2.5, 3, 4.5, 5.5, 7, 8, 9, 11, 12, 14]

# Tumbling
ax = axes[0]
ax.set_title('Okno rozłączne (Tumbling)', fontweight='bold')
for start in range(0, 15, 5):
    rect = mpatches.FancyBboxPatch((start, 0.2), 4.9, 0.6, boxstyle="round,pad=0.1",
        facecolor='#2196F3', alpha=0.3, edgecolor='#2196F3')
    ax.add_patch(rect)
ax.scatter(events_x, [0.5]*len(events_x), c='black', s=30, zorder=5)
ax.set_ylim(0, 1.2)
ax.set_yticks([])

# Sliding
ax = axes[1]
ax.set_title('Okno przesuwne (Sliding: 5 min, krok 2 min)', fontweight='bold')
colors = ['#F44336', '#FF9800', '#4CAF50', '#2196F3', '#9C27B0', '#795548', '#607D8B']
for i, start in enumerate(range(0, 12, 2)):
    y_offset = 0.15 + (i % 3) * 0.25
    rect = mpatches.FancyBboxPatch((start, y_offset), 4.9, 0.2, boxstyle="round,pad=0.05",
        facecolor=colors[i % len(colors)], alpha=0.3, edgecolor=colors[i % len(colors)])
    ax.add_patch(rect)
ax.scatter(events_x, [0.5]*len(events_x), c='black', s=30, zorder=5)
ax.set_ylim(0, 1.2)
ax.set_yticks([])

# Session
ax = axes[2]
ax.set_title('Okno sesyjne (Session: gap=2 min)', fontweight='bold')
session_events = [[1, 2.5, 3], [7, 8, 9], [14]]
sess_colors = ['#4CAF50', '#FF9800', '#9C27B0']
for j, (sess, col) in enumerate(zip(session_events, sess_colors)):
    start = min(sess) - 0.3
    end = max(sess) + 0.3
    rect = mpatches.FancyBboxPatch((start, 0.2), end-start, 0.6, boxstyle="round,pad=0.1",
        facecolor=col, alpha=0.3, edgecolor=col)
    ax.add_patch(rect)
ax.scatter(events_x, [0.5]*len(events_x), c='black', s=30, zorder=5)
ax.set_ylim(0, 1.2)
ax.set_yticks([])
ax.set_xlabel('Czas (minuty)')
ax.set_xlim(-0.5, 16)

plt.tight_layout()
plt.show()
Figure 2: Typy okien czasowych — wizualizacja
Pokaż kod
# Porównanie: tumbling vs sliding window na tych samych danych
print("=== Tumbling (5 min) ===")
tumbling = zdarzenia.groupby(zdarzenia['czas'].dt.floor('5min'))['kwota'].sum().round(2)
print(tumbling)

print("\n=== Sliding (okno 5 min, krok 1 min) ===")
for start_min in range(0, 15, 1):
    start = pd.Timestamp('2026-03-12 14:00') + pd.Timedelta(minutes=start_min)
    end = start + pd.Timedelta(minutes=5)
    mask = (zdarzenia['czas'] >= start) & (zdarzenia['czas'] < end)
    suma = zdarzenia.loc[mask, 'kwota'].sum()
    if suma > 0:
        print(f"  [{start.strftime('%H:%M')}{end.strftime('%H:%M')}) suma = {suma:.2f}")
=== Tumbling (5 min) ===
czas
2026-03-12 14:00:00    2648.68
2026-03-12 14:05:00    2036.82
2026-03-12 14:10:00    2061.89
Name: kwota, dtype: float64

=== Sliding (okno 5 min, krok 1 min) ===
  [14:00–14:05) suma = 2648.68
  [14:01–14:06) suma = 2484.66
  [14:02–14:07) suma = 2344.59
  [14:03–14:08) suma = 2370.66
  [14:04–14:09) suma = 2323.98
  [14:05–14:10) suma = 2036.82
  [14:06–14:11) suma = 1919.63
  [14:07–14:12) suma = 1730.35
  [14:08–14:13) suma = 2159.60
  [14:09–14:14) suma = 2103.20
  [14:10–14:15) suma = 2061.89
  [14:11–14:16) suma = 1673.73
  [14:12–14:17) suma = 1331.06
  [14:13–14:18) suma = 702.85
  [14:14–14:19) suma = 333.04

6 Podsumowanie

Na tym wykładzie poznaliśmy dwie kluczowe architektury (Lambda i Kappa) oraz fundamentalne pojęcia przetwarzania strumieniowego: czas zdarzenia, czas przetwarzania, watermarking i okna czasowe. To pojęcia, które będą towarzyszyć nam na laboratoriach podczas pracy z Apache Kafka i Spark Structured Streaming.

Note{{< fa forward >}} Na następnym wykładzie

Uczenie maszynowe w trybie wsadowym i przyrostowym (online learning), Stochastic Gradient Descent, detekcja anomalii.

Tip{{< fa brain >}} Do przemyślenia

Twój klient chce dashboard sprzedaży aktualizowany co 30 sekund. Jaką architekturę (Lambda/Kappa) byś zaproponował? Jaki typ okna czasowego zastosowałbyś?