Lab 1: Kafka w Pythonie — producent, konsument, ETL w czasie rzeczywistym

Cel

Pierwsze praktyczne doświadczenie z Apache Kafka:

  • Wysyłanie zdarzeń transakcyjnych do Kafki (producent),
  • Filtrowanie zdarzeń w czasie rzeczywistym (konsument bezstanowy),
  • Zliczanie i agregacja zdarzeń na bieżąco (konsument stanowy).

Case biznesowy: Pracujesz w firmie e-commerce. Twoim zadaniem jest zbudowanie systemu monitoringu transakcji, który przetwarza zdarzenia zakupowe w czasie rzeczywistym.

Ten case będzie rozwijać się przez kolejnych 9 labów aż do kompletnej platformy.


Przygotowanie

Upewnij się, że środowisko działa:

cd jupyterlab-project
docker compose up

Otwórz JupyterLab: http://localhost:8999

Cały kod poniżej uruchamiamy w JupyterLab w środowisku Docker.


Część 1: Utwórz temat Kafka

W terminalu JupyterLab (File > New > Terminal):

kafka-topics.sh --create \
  --topic transactions \
  --bootstrap-server broker:9092

kafka-topics.sh --list --bootstrap-server broker:9092
# Wklej wynik komendy --list:
# TWÓJ WYNIK

Część 2: Producent — generowanie transakcji

Zadanie 2.1 — Napisz producenta transakcji

Utwórz plik producer.py, który wysyła symulowane transakcje e-commerce do tematu transactions.

Każda transakcja to JSON z polami: - tx_id (np. “TX0001”) - user_id (losowy z “u01” do “u20”) - amount (losowy float 5.0–5000.0) - store (losowy z: “Warszawa”, “Kraków”, “Gdańsk”, “Wrocław”) - category (losowy z: “elektronika”, “odzież”, “żywność”, “książki”) - timestamp (aktualny czas ISO)

Wysyłaj jedną transakcję na sekundę.

%%file producer.py
from kafka import KafkaProducer
import json, random, time
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers='broker:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def generate_transaction():
    # TWÓJ KOD
    # Zwróć słownik z polami: tx_id, user_id, amount, store, category, timestamp
    pass

# TWÓJ KOD
# Pętla: generuj transakcję, wyślij do tematu 'transactions', wypisz, sleep 1s

Uruchom producenta w terminalu JupyterLab:

python producer.py

Pozostaw działającego i przejdź do następnego zadania.


Część 3: Konsument bezstanowy — filtrowanie

Przetwarzanie bezstanowe = każde zdarzenie przetwarzane niezależnie, bez pamięci o poprzednich.

Zadanie 3.1 — Filtruj duże transakcje

Napisz konsumenta, który czyta z transactions i wyświetla tylko transakcje z amount > 3000.

%%file consumer_filter.py
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers='broker:9092',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

print("Nasłuchuję na duże transakcje (amount > 3000)...")

for message in consumer:
    print(message)
    
# TWÓJ KOD
# Dla każdej wiadomości: sprawdź czy amount > 3000, jeśli tak — wypisz ALERT
# Format: ALERT: TX0042 | 2345.67 PLN | Warszawa | elektronika

Zadanie 3.2 — Transformacja i wzbogacenie

Napisz konsumenta, który do słownika eventu dodaje pole risk_level: - amount > 3000 → “HIGH” - amount > 1000 → “MEDIUM” - pozostałe → “LOW”

%%file consumer_enrich.py
from kafka import KafkaConsumer
import json

# TWÓJ KOD
# Czytaj z 'transactions' (użyj INNEGO group_id!)
# Dodaj pole risk_level na podstawie amount
# Wypisz wzbogaconą transakcję

Część 4: Konsument stanowy — agregacja

Przetwarzanie stanowe = utrzymujemy stan (liczniki, sumy) pomiędzy zdarzeniami.

Zadanie 4.1 — Zliczanie transakcji per sklep

Napisz konsumenta, który prowadzi bieżące zliczanie transakcji per sklep i wypisuje podsumowanie co 10 wiadomości. Obiekt Counter() przeczytaj Counter.

%%file consumer_count.py
from kafka import KafkaConsumer
from collections import Counter
import json

consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers='broker:9092',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

store_counts = Counter()
total_amount = {}
msg_count = 0

# TWÓJ KOD
# Dla każdej wiadomości:
#   1. Zwiększ store_counts[store]
#   2. Dodaj amount do total_amount[store]
#   3. Co 10 wiadomości wypisz tabelę:
#      Sklep | Liczba | Suma | Średnia

Zadanie 4.2 — Statystyki per kategoria

Napisz konsumenta, który śledzi per kategoria: - liczbę transakcji - łączny przychód - min i max kwotę

Wypisuj co 10 wiadomości.

%%file consumer_stats.py
from kafka import KafkaConsumer
from collections import defaultdict
import json

# TWÓJ KOD

Część 5: Wielu konsumentów

Zadanie 5.1 — Uruchom wszystko razem

Otwórz 3 terminale w JupyterLab: 1. python producer.py 2. python consumer_filter.py 3. python consumer_count.py

Zadanie 5.2 — Pytania

  1. Co się stanie, jeśli uruchomisz consumer_filter.py po zakończeniu producenta?
  2. Co się stanie, jeśli dwóch konsumentów ma TĘ SAMĄ group_id?
  3. Jaka jest różnica między przetwarzaniem bezstanowym a stanowym?

# TWOJE ODPOWIEDZI

Praca domowa

  1. Napisz konsumenta wykrywającego anomalie prędkości: alert jeśli ten sam user_id wykona więcej niż 3 transakcje w ciągu 60 sekund.
  2. Wypchnij kod do repozytorium Git.

Następne zajęcia: Przetwarzamy ten sam strumień Kafki w Apache Spark.