Lab 5: Flask — serwowanie modelu przez REST API

Co to jest API?

Wyobraź sobie restaurację. Ty (klient) nie wchodzisz do kuchni i nie gotujesz sam — zamawiasz przez kelnera. Kelner przyjmuje zamówienie, zanosi do kuchni, wraca z odpowiedzią.

API (Application Programming Interface) działa tak samo:

  • Ty wysyłasz zapytanie (request) — np. “oceń tę transakcję”
  • API przetwarza je (wywołuje model, sprawdza reguły)
  • Wraca z odpowiedzią (response) — np. {"fraud": true, "score": 0.92}

Dzięki temu model ML może działać jako osobna usługa, którą wywołuje konsument Kafki, aplikacja mobilna, dashboard — cokolwiek.

Co to jest REST i HTTP?

HTTP to protokół komunikacji — ten sam, którego używa przeglądarka. Każde zapytanie ma:

  • metodę: GET (pobierz dane) lub POST (wyślij dane do przetworzenia)
  • adres URL: np. http://localhost:5000/score
  • opcjonalnie ciało (body): dane JSON

REST to styl projektowania API — każdy zasób ma swój URL, metody HTTP mają określone znaczenie.

Gdzie to pasuje w naszym projekcie?

Lab 1: Kafka producer → konsument Python
Lab 2: Spark DataFrame
Lab 3: Spark Streaming
Lab 3: konsument → Spark Streaming → alerty
Lab 5: Flask API ← (tu jesteśmy — budujemy usługę scoringową)
Lab 6: konsument Kafki → wywołuje Flask API → alerty

Plan

  1. Pierwsze API — Hello World
  2. Parametry i ścieżki URL
  3. Endpoint POST — przyjmowanie JSON
  4. Scoring transakcji — reguły biznesowe przez API

Część 1: Pierwsze API

Flask to minimalny framework webowy dla Pythona — kilka linii kodu i masz działający serwer.

Używamy %%file żeby zapisać kod do pliku, a potem uruchamiamy serwer w tle (żeby notebook nadal działał).

%%file app.py
from flask import Flask

app = Flask(__name__)

@app.route("/")                      # adres URL: http://localhost:5000/
def home():
    return "Witaj w systemie monitoringu transakcji!"

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
import subprocess, time, requests

server = subprocess.Popen(["python", "app.py"])
time.sleep(2)   # daj serwerowi chwilę na start

response = requests.get("http://localhost:5000/")
print(f"Status: {response.status_code}")
print(f"Treść: {response.text}")

Status 200 oznacza sukces. Inne kody które zobaczysz:

Kod Znaczenie
200 OK — zapytanie obsłużone poprawnie
400 Bad Request — złe dane wejściowe
404 Not Found — taki adres URL nie istnieje
500 Internal Server Error — błąd w kodzie serwera

Część 2: Parametry i ścieżki URL

API może przyjmować dane na dwa sposoby: - parametry query: ?name=Anna — w adresie URL po znaku ? - parametry ścieżki: /transaction/TX001 — część URL

Zadanie 2.1 — Endpoint z parametrem query

server.kill()   # zatrzymaj poprzedni serwer
%%file app.py
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/")
def home():
    return "Witaj w systemie monitoringu transakcji!"

@app.route("/hello")                  # GET /hello?name=Anna
def hello():
    name = request.args.get("name", "nieznajomy")   # odczytaj parametr query
    return f"Cześć, {name}!"

@app.route("/transaction/<tx_id>")    # GET /transaction/TX0042
def get_transaction(tx_id):
    return jsonify({"tx_id": tx_id, "status": "znaleziono"})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
server = subprocess.Popen(["python", "app.py"])
time.sleep(2)

# Parametr query
r1 = requests.get("http://localhost:5000/hello")
r2 = requests.get("http://localhost:5000/hello?name=Anna")
print(r1.text)   # Cześć, nieznajomy!
print(r2.text)   # Cześć, Anna!

# Parametr ścieżki
r3 = requests.get("http://localhost:5000/transaction/TX0042")
print(r3.json())

Zadanie 2.2 — Dodaj endpoint /status

Dodaj endpoint GET /status który zwraca JSON: {"service": "monitoring", "version": "1.0", "status": "ok"}.

server.kill()

# TWÓJ KOD — dopisz endpoint do app.py i przetestuj
# %%file app.py ...

Część 3: Endpoint POST — przyjmowanie JSON

Do tej pory używaliśmy GET — zapytania bez danych. Scoring transakcji wymaga przesłania danych do serwera — tu używamy POST z ciałem JSON.

Analogia: GET to pytanie “jaka jest cena?”, POST to “kup mi to za tę cenę, oto dane karty”.

%%file app.py
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/echo", methods=["POST"])  # akceptuj tylko POST
def echo():
    data = request.get_json()           # odczytaj ciało JSON
    return jsonify({
        "otrzymalem": data,
        "liczba_pol": len(data),
    })

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
server = subprocess.Popen(["python", "app.py"])
time.sleep(2)

transakcja = {
    "tx_id": "TX0042",
    "amount": 4500.0,
    "store": "Warszawa",
    "category": "elektronika",
}

r = requests.post("http://localhost:5000/echo", json=transakcja)
print(r.json())

Zadanie 3.1 — Przetestuj błędne zapytanie

Spróbuj wywołać /echo metodą GET zamiast POST. Co zwraca serwer? Jaki kod statusu?

r_bad = requests.get("http://localhost:5000/echo")
print(f"Status: {r_bad.status_code}")

# Odpowiedz w komentarzu: co oznacza ten kod błędu?
# ODPOWIEDŹ:

Część 4: Scoring transakcji przez API

Teraz łączymy Flask z logiką biznesową. Endpoint /score przyjmuje transakcję i zwraca ocenę ryzyka.

Na razie używamy reguł (nie ML) — model dołączymy w Lab 6.

Zadanie 4.1 — Funkcja scoringowa

server.kill()

def score_transaction(tx: dict) -> dict:
    """
    Ocenia ryzyko transakcji na podstawie reguł biznesowych.
    Zwraca dict z polami: score (0-8), risk_level, triggered_rules.
    """
    score = 0
    rules = []

    if tx.get("amount", 0) > 3000:
        score += 3; rules.append("R1: kwota > 3000")

    if tx.get("category") == "elektronika" and tx.get("amount", 0) > 1500:
        score += 2; rules.append("R2: elektronika > 1500")

    # TWÓJ KOD — dodaj regułę R3: godzina < 6 (użyj pola 'hour' jeśli jest w tx)

    if score >= 5:
        risk_level = "CRITICAL"
    elif score >= 3:
        risk_level = "HIGH"
    elif score >= 1:
        risk_level = "MEDIUM"
    else:
        risk_level = "LOW"

    return {"score": score, "risk_level": risk_level, "triggered_rules": rules}

# Szybki test bez serwera
test_tx = {"tx_id": "TX001", "amount": 4500.0, "category": "elektronika", "hour": 3}
print(score_transaction(test_tx))

Zadanie 4.2 — Endpoint /score

%%file app.py
from flask import Flask, request, jsonify

app = Flask(__name__)

def score_transaction(tx):
    score = 0
    rules = []
    if tx.get("amount", 0) > 3000:
        score += 3; rules.append("R1: kwota > 3000")
    if tx.get("category") == "elektronika" and tx.get("amount", 0) > 1500:
        score += 2; rules.append("R2: elektronika > 1500")
    if tx.get("hour", 12) < 6:
        score += 2; rules.append("R3: nocna godzina")
    risk_level = "CRITICAL" if score >= 5 else "HIGH" if score >= 3 else "MEDIUM" if score >= 1 else "LOW"
    return {"score": score, "risk_level": risk_level, "triggered_rules": rules}

@app.route("/score", methods=["POST"])
def score():
    tx = request.get_json()
    if not tx or "amount" not in tx:
        return jsonify({"error": "Brak pola 'amount'"}), 400
    result = score_transaction(tx)
    result["tx_id"] = tx.get("tx_id", "unknown")
    return jsonify(result)

@app.route("/health")
def health():
    return jsonify({"status": "ok", "version": "1.0-rules"})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
server = subprocess.Popen(["python", "app.py"])
time.sleep(2)

cases = [
    {"tx_id": "TX001", "amount": 50.0,   "category": "żywność",    "hour": 14},
    {"tx_id": "TX002", "amount": 1800.0,  "category": "elektronika", "hour": 10},
    {"tx_id": "TX003", "amount": 4500.0,  "category": "elektronika", "hour": 3},
]

for tx in cases:
    r = requests.post("http://localhost:5000/score", json=tx)
    res = r.json()
    print(f"{tx['tx_id']}  {tx['amount']:>7.0f} PLN  → {res['risk_level']:8s} (score={res['score']})  {res['triggered_rules']}")

Zadanie 4.3 — Przetestuj obsługę błędów

Wyślij zapytanie bez pola amount. Sprawdź kod statusu i treść odpowiedzi.

# TWÓJ KOD
# r = requests.post("http://localhost:5000/score", json={"tx_id": "TX000"})
# print(r.status_code, r.json())

Zadanie 4.4 — Pytania kontrolne

# 1. Jaka jest różnica między GET a POST?
#    ODPOWIEDŹ:

# 2. Dlaczego używamy jsonify() zamiast return {"key": "value"}?
#    Wskazówka: spróbuj oba i porównaj nagłówki odpowiedzi.
#    ODPOWIEDŹ:

# 3. Co się stanie jeśli dwie osoby wywołają /score w tym samym czasie?
#    ODPOWIEDŹ:
server.kill()
print("Serwer zatrzymany.")

Praca domowa

  1. Dodaj endpoint GET /stats który zwraca: ile zapytań obsłużył serwer, ile z nich było HIGH/CRITICAL.
    (Wskazówka: użyj globalnego słownika jako licznika — counters = {"total": 0, "high": 0})
  2. Dodaj walidację: jeśli amount jest ujemne — zwróć błąd 400.
  3. Wywołaj /score z 10 transakcjami pętlą i wypisz tabelę wyników.

Następne zajęcia: zamieniamy reguły na prawdziwy model ML — trenujemy RandomForest i ładujemy go do Flask.