Ćwiczenia 1: Apache Kafka — producent, konsument, reguły decyzyjne

Studia zaoczne | 1,5h

Cel

  • Uruchomienie Kafki, utworzenie tematu,
  • Napisanie producenta i konsumenta w Pythonie,
  • Filtrowanie (stateless) i zliczanie (stateful) zdarzeń,
  • Proste reguły decyzyjne na strumieniu.

Case biznesowy: Budujesz system monitoringu transakcji e-commerce. Ten case będzie rozwijany na kolejnych zajęciach.

Przygotowanie

Upewnij się, że środowisko działa:

cd jupyterlab-project 
docker compose up

Otwórz JupyterLab: http://localhost:8999

W terminalu JupyterLab utwórz temat:

kafka-topics.sh --create --topic transactions \
  --bootstrap-server broker:9092

Część 1: Producent (20 min)

Poniżej gotowy producent — przeanalizuj kod, uruchom w terminalu (python producer.py).

%%file producer.py
from kafka import KafkaProducer
import json, random, time
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers='broker:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

sklepy = ['Warszawa', 'Kraków', 'Gdańsk', 'Wrocław']
kategorie = ['elektronika', 'odzież', 'żywność', 'książki']

def generate_transaction():
    return {
        'tx_id': f'TX{random.randint(1000,9999)}',
        'user_id': f'u{random.randint(1,20):02d}',
        'amount': round(random.uniform(5.0, 5000.0), 2),
        'store': random.choice(sklepy),
        'category': random.choice(kategorie),
        'timestamp': datetime.now().isoformat(),
    }

for i in range(1000):
    tx = generate_transaction()
    producer.send('transactions', value=tx)
    print(f"[{i+1}] {tx['tx_id']} | {tx['amount']:.2f} PLN | {tx['store']}")
    time.sleep(0.5)

producer.flush()
producer.close()
Writing producer.py

Zadanie 1.1 — Zmodyfikuj producenta

Zmień producenta tak, by 5% transakcji było podejrzanych: - kwota > 3000 PLN - kategoria = ‘elektronika’ - godzina nocna (dodaj pole hour z losową wartością 0–5)

# TWÓJ KOD — zmodyfikowany generate_transaction()
# Podpowiedź: if random.random() < 0.05: ... else: ...

Część 2: Konsument bezstanowy — filtrowanie (15 min)

Zadanie 2.1 — Wyświetl duże transakcje

Napisz konsumenta, który wypisuje tylko transakcje z amount > 1000.

%%file consumer_filter.py
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers='broker:9092',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    print(message)
    
# TWÓJ KOD
# Dla każdej wiadomości: sprawdź amount > 1000, jeśli tak — wypisz ALERT

Zadanie 2.2 — Dodaj poziom ryzyka

Napisz konsumenta, który dodaje pole risk_level: - amount > 3000 → “HIGH” - amount > 1000 → “MEDIUM” - reszta → “LOW”

# TWÓJ KOD — consumer_enrich.py

Część 3: Konsument stanowy — zliczanie (15 min)

Zadanie 3.1 — Transakcje per sklep

Napisz konsumenta, który liczy transakcje i sumy per sklep. Wypisz podsumowanie co 10 wiadomości.

%%file consumer_count.py
from kafka import KafkaConsumer
from collections import Counter, defaultdict
import json

consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers='broker:9092',
    auto_offset_reset='earliest',
    group_id='count-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

store_counts = Counter()
total_amount = defaultdict(float)
msg_count = 0

# TWÓJ KOD
# Dla każdej wiadomości:
#   1. store_counts[store] += 1
#   2. total_amount[store] += amount
#   3. Co 10 wiadomości: print tabela

Część 4: Reguły decyzyjne (20 min)

Zadanie 4.1 — Scoring transakcji

Napisz funkcję, która przyznaje punkty za podejrzane cechy:

Reguła Warunek Punkty
R1 amount > 3000 +3
R2 elektronika i amount > 1500 +2
R3 godzina < 6 (noc) +2

Jeśli suma >= 3 → PODEJRZANA.

def score_transaction(tx):
    score = 0
    rules = []
    # TWÓJ KOD — zaimplementuj reguły R1, R2, R3
    return score, rules

# Test
test_tx = {'tx_id': 'TX999', 'amount': 4500.0, 'category': 'elektronika',
           'timestamp': '2026-04-01T03:15:00'}
print(score_transaction(test_tx))  # powinno dać score >= 5

Zadanie 4.2 — Konsument scoringowy

Połącz scoring z konsumentem Kafki. Podejrzane transakcje wyślij do nowego tematu alerts.

Najpierw utwórz temat:

kafka/bin/kafka-topics.sh --create --topic alerts \
  --bootstrap-server broker:9092 --partitions 1 --replication-factor 1
%%file scoring_consumer.py
from kafka import KafkaConsumer, KafkaProducer
import json

consumer = KafkaConsumer('transactions', bootstrap_servers='broker:9092',
    auto_offset_reset='earliest', group_id='scoring-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

alert_producer = KafkaProducer(bootstrap_servers='broker:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# TWÓJ KOD
# Dla każdej transakcji: scoruj, jeśli >= 3: wyślij do 'alerts' i wypisz ALERT

Praca domowa

  1. Uruchom jednocześnie: producenta + consumer_filter + scoring_consumer.
  2. W osobnym terminalu sprawdź alerty: uruchom konsumenta tematu alerts.
  3. Wypchnij kod do Git.

Następne zajęcia: Model ML + FastAPI + serwowanie predykcji.