Studia zaoczne | 1,5h
Cel
- Uruchomienie Kafki, utworzenie tematu,
- Napisanie producenta i konsumenta w Pythonie,
- Filtrowanie (stateless) i zliczanie (stateful) zdarzeń,
- Proste reguły decyzyjne na strumieniu.
Case biznesowy: Budujesz system monitoringu transakcji e-commerce. Ten case będzie rozwijany na kolejnych zajęciach.
Przygotowanie
Upewnij się, że środowisko działa:
cd jupyterlab-project
docker compose up
Otwórz JupyterLab: http://localhost:8999
W terminalu JupyterLab utwórz temat:
kafka-topics.sh --create --topic transactions \
--bootstrap-server broker:9092
Część 1: Producent (20 min)
Poniżej gotowy producent — przeanalizuj kod, uruchom w terminalu (python producer.py).
%%file producer.py
from kafka import KafkaProducer
import json, random, time
from datetime import datetime
producer = KafkaProducer(
bootstrap_servers='broker:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
sklepy = ['Warszawa', 'Kraków', 'Gdańsk', 'Wrocław']
kategorie = ['elektronika', 'odzież', 'żywność', 'książki']
def generate_transaction():
return {
'tx_id': f'TX{random.randint(1000,9999)}',
'user_id': f'u{random.randint(1,20):02d}',
'amount': round(random.uniform(5.0, 5000.0), 2),
'store': random.choice(sklepy),
'category': random.choice(kategorie),
'timestamp': datetime.now().isoformat(),
}
for i in range(1000):
tx = generate_transaction()
producer.send('transactions', value=tx)
print(f"[{i+1}] {tx['tx_id']} | {tx['amount']:.2f} PLN | {tx['store']}")
time.sleep(0.5)
producer.flush()
producer.close()
Zadanie 1.1 — Zmodyfikuj producenta
Zmień producenta tak, by 5% transakcji było podejrzanych: - kwota > 3000 PLN - kategoria = ‘elektronika’ - godzina nocna (dodaj pole hour z losową wartością 0–5)
# TWÓJ KOD — zmodyfikowany generate_transaction()
# Podpowiedź: if random.random() < 0.05: ... else: ...
Część 2: Konsument bezstanowy — filtrowanie (15 min)
Zadanie 2.1 — Wyświetl duże transakcje
Napisz konsumenta, który wypisuje tylko transakcje z amount > 1000.
%%file consumer_filter.py
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'transactions',
bootstrap_servers='broker:9092',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for message in consumer:
print(message)
# TWÓJ KOD
# Dla każdej wiadomości: sprawdź amount > 1000, jeśli tak — wypisz ALERT
Zadanie 2.2 — Dodaj poziom ryzyka
Napisz konsumenta, który dodaje pole risk_level: - amount > 3000 → “HIGH” - amount > 1000 → “MEDIUM” - reszta → “LOW”
# TWÓJ KOD — consumer_enrich.py
Część 3: Konsument stanowy — zliczanie (15 min)
Zadanie 3.1 — Transakcje per sklep
Napisz konsumenta, który liczy transakcje i sumy per sklep. Wypisz podsumowanie co 10 wiadomości.
%%file consumer_count.py
from kafka import KafkaConsumer
from collections import Counter, defaultdict
import json
consumer = KafkaConsumer(
'transactions',
bootstrap_servers='broker:9092',
auto_offset_reset='earliest',
group_id='count-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
store_counts = Counter()
total_amount = defaultdict(float)
msg_count = 0
# TWÓJ KOD
# Dla każdej wiadomości:
# 1. store_counts[store] += 1
# 2. total_amount[store] += amount
# 3. Co 10 wiadomości: print tabela
Część 4: Reguły decyzyjne (20 min)
Zadanie 4.1 — Scoring transakcji
Napisz funkcję, która przyznaje punkty za podejrzane cechy:
| R1 |
amount > 3000 |
+3 |
| R2 |
elektronika i amount > 1500 |
+2 |
| R3 |
godzina < 6 (noc) |
+2 |
Jeśli suma >= 3 → PODEJRZANA.
def score_transaction(tx):
score = 0
rules = []
# TWÓJ KOD — zaimplementuj reguły R1, R2, R3
return score, rules
# Test
test_tx = {'tx_id': 'TX999', 'amount': 4500.0, 'category': 'elektronika',
'timestamp': '2026-04-01T03:15:00'}
print(score_transaction(test_tx)) # powinno dać score >= 5
Zadanie 4.2 — Konsument scoringowy
Połącz scoring z konsumentem Kafki. Podejrzane transakcje wyślij do nowego tematu alerts.
Najpierw utwórz temat:
kafka/bin/kafka-topics.sh --create --topic alerts \
--bootstrap-server broker:9092 --partitions 1 --replication-factor 1
%%file scoring_consumer.py
from kafka import KafkaConsumer, KafkaProducer
import json
consumer = KafkaConsumer('transactions', bootstrap_servers='broker:9092',
auto_offset_reset='earliest', group_id='scoring-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
alert_producer = KafkaProducer(bootstrap_servers='broker:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# TWÓJ KOD
# Dla każdej transakcji: scoruj, jeśli >= 3: wyślij do 'alerts' i wypisz ALERT
Praca domowa
- Uruchom jednocześnie: producenta + consumer_filter + scoring_consumer.
- W osobnym terminalu sprawdź alerty: uruchom konsumenta tematu
alerts.
- Wypchnij kod do Git.
Następne zajęcia: Model ML + FastAPI + serwowanie predykcji.