# Wklej wynik komendy --list:
# TWÓJ WYNIKLab 1: Kafka w Pythonie — producent, konsument, ETL w czasie rzeczywistym
Cel
Pierwsze praktyczne doświadczenie z Apache Kafka:
- Wysyłanie zdarzeń transakcyjnych do Kafki (producent),
- Filtrowanie zdarzeń w czasie rzeczywistym (konsument bezstanowy),
- Zliczanie i agregacja zdarzeń na bieżąco (konsument stanowy).
Case biznesowy: Pracujesz w firmie e-commerce. Twoim zadaniem jest zbudowanie systemu monitoringu transakcji, który przetwarza zdarzenia zakupowe w czasie rzeczywistym.
Ten case będzie rozwijać się przez kolejnych 9 labów aż do kompletnej platformy.
Przygotowanie
Upewnij się, że środowisko działa:
cd jupyterlab-project
docker compose upOtwórz JupyterLab: http://localhost:8999
Cały kod poniżej uruchamiamy w JupyterLab w środowisku Docker.
Część 1: Utwórz temat Kafka
W terminalu JupyterLab (File > New > Terminal):
kafka-topics.sh --create \
--topic transactions \
--bootstrap-server broker:9092
kafka-topics.sh --list --bootstrap-server broker:9092Część 2: Producent — generowanie transakcji
Zadanie 2.1 — Napisz producenta transakcji
Utwórz plik producer.py, który wysyła symulowane transakcje e-commerce do tematu transactions.
Każda transakcja to JSON z polami: - tx_id (np. “TX0001”) - user_id (losowy z “u01” do “u20”) - amount (losowy float 5.0–5000.0) - store (losowy z: “Warszawa”, “Kraków”, “Gdańsk”, “Wrocław”) - category (losowy z: “elektronika”, “odzież”, “żywność”, “książki”) - timestamp (aktualny czas ISO)
Wysyłaj jedną transakcję na sekundę.
%%file producer.py
from kafka import KafkaProducer
import json, random, time
from datetime import datetime
producer = KafkaProducer(
bootstrap_servers='broker:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def generate_transaction():
# TWÓJ KOD
# Zwróć słownik z polami: tx_id, user_id, amount, store, category, timestamp
pass
# TWÓJ KOD
# Pętla: generuj transakcję, wyślij do tematu 'transactions', wypisz, sleep 1sUruchom producenta w terminalu JupyterLab:
python producer.pyPozostaw działającego i przejdź do następnego zadania.
Część 3: Konsument bezstanowy — filtrowanie
Przetwarzanie bezstanowe = każde zdarzenie przetwarzane niezależnie, bez pamięci o poprzednich.
Zadanie 3.1 — Filtruj duże transakcje
Napisz konsumenta, który czyta z transactions i wyświetla tylko transakcje z amount > 3000.
%%file consumer_filter.py
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'transactions',
bootstrap_servers='broker:9092',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
print("Nasłuchuję na duże transakcje (amount > 3000)...")
for message in consumer:
print(message)
# TWÓJ KOD
# Dla każdej wiadomości: sprawdź czy amount > 3000, jeśli tak — wypisz ALERT
# Format: ALERT: TX0042 | 2345.67 PLN | Warszawa | elektronikaZadanie 3.2 — Transformacja i wzbogacenie
Napisz konsumenta, który do słownika eventu dodaje pole risk_level: - amount > 3000 → “HIGH” - amount > 1000 → “MEDIUM” - pozostałe → “LOW”
%%file consumer_enrich.py
from kafka import KafkaConsumer
import json
# TWÓJ KOD
# Czytaj z 'transactions' (użyj INNEGO group_id!)
# Dodaj pole risk_level na podstawie amount
# Wypisz wzbogaconą transakcjęCzęść 4: Konsument stanowy — agregacja
Przetwarzanie stanowe = utrzymujemy stan (liczniki, sumy) pomiędzy zdarzeniami.
Zadanie 4.1 — Zliczanie transakcji per sklep
Napisz konsumenta, który prowadzi bieżące zliczanie transakcji per sklep i wypisuje podsumowanie co 10 wiadomości. Obiekt Counter() przeczytaj Counter.
%%file consumer_count.py
from kafka import KafkaConsumer
from collections import Counter
import json
consumer = KafkaConsumer(
'transactions',
bootstrap_servers='broker:9092',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
store_counts = Counter()
total_amount = {}
msg_count = 0
# TWÓJ KOD
# Dla każdej wiadomości:
# 1. Zwiększ store_counts[store]
# 2. Dodaj amount do total_amount[store]
# 3. Co 10 wiadomości wypisz tabelę:
# Sklep | Liczba | Suma | ŚredniaZadanie 4.2 — Statystyki per kategoria
Napisz konsumenta, który śledzi per kategoria: - liczbę transakcji - łączny przychód - min i max kwotę
Wypisuj co 10 wiadomości.
%%file consumer_stats.py
from kafka import KafkaConsumer
from collections import defaultdict
import json
# TWÓJ KODCzęść 5: Wielu konsumentów
Zadanie 5.1 — Uruchom wszystko razem
Otwórz 3 terminale w JupyterLab: 1. python producer.py 2. python consumer_filter.py 3. python consumer_count.py
Zadanie 5.2 — Pytania
- Co się stanie, jeśli uruchomisz consumer_filter.py po zakończeniu producenta?
- Co się stanie, jeśli dwóch konsumentów ma TĘ SAMĄ group_id?
- Jaka jest różnica między przetwarzaniem bezstanowym a stanowym?
# TWOJE ODPOWIEDZIPraca domowa
- Napisz konsumenta wykrywającego anomalie prędkości: alert jeśli ten sam
user_idwykona więcej niż 3 transakcje w ciągu 60 sekund. - Wypchnij kod do repozytorium Git.
Następne zajęcia: Przetwarzamy ten sam strumień Kafki w Apache Spark.