%%file app.py
from flask import Flask
app = Flask(__name__)
@app.route("/") # adres URL: http://localhost:5000/
def home():
return "Witaj w systemie monitoringu transakcji!"
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)Lab 5: Flask — serwowanie modelu przez REST API
Co to jest API?
Wyobraź sobie restaurację. Ty (klient) nie wchodzisz do kuchni i nie gotujesz sam — zamawiasz przez kelnera. Kelner przyjmuje zamówienie, zanosi do kuchni, wraca z odpowiedzią.
API (Application Programming Interface) działa tak samo:
- Ty wysyłasz zapytanie (request) — np. “oceń tę transakcję”
- API przetwarza je (wywołuje model, sprawdza reguły)
- Wraca z odpowiedzią (response) — np.
{"fraud": true, "score": 0.92}
Dzięki temu model ML może działać jako osobna usługa, którą wywołuje konsument Kafki, aplikacja mobilna, dashboard — cokolwiek.
Co to jest REST i HTTP?
HTTP to protokół komunikacji — ten sam, którego używa przeglądarka. Każde zapytanie ma:
- metodę:
GET(pobierz dane) lubPOST(wyślij dane do przetworzenia) - adres URL: np.
http://localhost:5000/score - opcjonalnie ciało (body): dane JSON
REST to styl projektowania API — każdy zasób ma swój URL, metody HTTP mają określone znaczenie.
Gdzie to pasuje w naszym projekcie?
Lab 1: Kafka producer → konsument Python
Lab 2: Spark DataFrame
Lab 3: Spark Streaming
Lab 3: konsument → Spark Streaming → alerty
Lab 5: Flask API ← (tu jesteśmy — budujemy usługę scoringową)
Lab 6: konsument Kafki → wywołuje Flask API → alerty
Plan
- Pierwsze API — Hello World
- Parametry i ścieżki URL
- Endpoint POST — przyjmowanie JSON
- Scoring transakcji — reguły biznesowe przez API
Część 1: Pierwsze API
Flask to minimalny framework webowy dla Pythona — kilka linii kodu i masz działający serwer.
Używamy %%file żeby zapisać kod do pliku, a potem uruchamiamy serwer w tle (żeby notebook nadal działał).
import subprocess, time, requests
server = subprocess.Popen(["python", "app.py"])
time.sleep(2) # daj serwerowi chwilę na start
response = requests.get("http://localhost:5000/")
print(f"Status: {response.status_code}")
print(f"Treść: {response.text}")Status 200 oznacza sukces. Inne kody które zobaczysz:
| Kod | Znaczenie |
|---|---|
| 200 | OK — zapytanie obsłużone poprawnie |
| 400 | Bad Request — złe dane wejściowe |
| 404 | Not Found — taki adres URL nie istnieje |
| 500 | Internal Server Error — błąd w kodzie serwera |
Część 2: Parametry i ścieżki URL
API może przyjmować dane na dwa sposoby: - parametry query: ?name=Anna — w adresie URL po znaku ? - parametry ścieżki: /transaction/TX001 — część URL
Zadanie 2.1 — Endpoint z parametrem query
server.kill() # zatrzymaj poprzedni serwer%%file app.py
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route("/")
def home():
return "Witaj w systemie monitoringu transakcji!"
@app.route("/hello") # GET /hello?name=Anna
def hello():
name = request.args.get("name", "nieznajomy") # odczytaj parametr query
return f"Cześć, {name}!"
@app.route("/transaction/<tx_id>") # GET /transaction/TX0042
def get_transaction(tx_id):
return jsonify({"tx_id": tx_id, "status": "znaleziono"})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)server = subprocess.Popen(["python", "app.py"])
time.sleep(2)
# Parametr query
r1 = requests.get("http://localhost:5000/hello")
r2 = requests.get("http://localhost:5000/hello?name=Anna")
print(r1.text) # Cześć, nieznajomy!
print(r2.text) # Cześć, Anna!
# Parametr ścieżki
r3 = requests.get("http://localhost:5000/transaction/TX0042")
print(r3.json())Zadanie 2.2 — Dodaj endpoint /status
Dodaj endpoint GET /status który zwraca JSON: {"service": "monitoring", "version": "1.0", "status": "ok"}.
server.kill()
# TWÓJ KOD — dopisz endpoint do app.py i przetestuj
# %%file app.py ...Część 3: Endpoint POST — przyjmowanie JSON
Do tej pory używaliśmy GET — zapytania bez danych. Scoring transakcji wymaga przesłania danych do serwera — tu używamy POST z ciałem JSON.
Analogia: GET to pytanie “jaka jest cena?”, POST to “kup mi to za tę cenę, oto dane karty”.
%%file app.py
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route("/echo", methods=["POST"]) # akceptuj tylko POST
def echo():
data = request.get_json() # odczytaj ciało JSON
return jsonify({
"otrzymalem": data,
"liczba_pol": len(data),
})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)server = subprocess.Popen(["python", "app.py"])
time.sleep(2)
transakcja = {
"tx_id": "TX0042",
"amount": 4500.0,
"store": "Warszawa",
"category": "elektronika",
}
r = requests.post("http://localhost:5000/echo", json=transakcja)
print(r.json())Zadanie 3.1 — Przetestuj błędne zapytanie
Spróbuj wywołać /echo metodą GET zamiast POST. Co zwraca serwer? Jaki kod statusu?
r_bad = requests.get("http://localhost:5000/echo")
print(f"Status: {r_bad.status_code}")
# Odpowiedz w komentarzu: co oznacza ten kod błędu?
# ODPOWIEDŹ:Część 4: Scoring transakcji przez API
Teraz łączymy Flask z logiką biznesową. Endpoint /score przyjmuje transakcję i zwraca ocenę ryzyka.
Na razie używamy reguł (nie ML) — model dołączymy w Lab 6.
Zadanie 4.1 — Funkcja scoringowa
server.kill()
def score_transaction(tx: dict) -> dict:
"""
Ocenia ryzyko transakcji na podstawie reguł biznesowych.
Zwraca dict z polami: score (0-8), risk_level, triggered_rules.
"""
score = 0
rules = []
if tx.get("amount", 0) > 3000:
score += 3; rules.append("R1: kwota > 3000")
if tx.get("category") == "elektronika" and tx.get("amount", 0) > 1500:
score += 2; rules.append("R2: elektronika > 1500")
# TWÓJ KOD — dodaj regułę R3: godzina < 6 (użyj pola 'hour' jeśli jest w tx)
if score >= 5:
risk_level = "CRITICAL"
elif score >= 3:
risk_level = "HIGH"
elif score >= 1:
risk_level = "MEDIUM"
else:
risk_level = "LOW"
return {"score": score, "risk_level": risk_level, "triggered_rules": rules}
# Szybki test bez serwera
test_tx = {"tx_id": "TX001", "amount": 4500.0, "category": "elektronika", "hour": 3}
print(score_transaction(test_tx))Zadanie 4.2 — Endpoint /score
%%file app.py
from flask import Flask, request, jsonify
app = Flask(__name__)
def score_transaction(tx):
score = 0
rules = []
if tx.get("amount", 0) > 3000:
score += 3; rules.append("R1: kwota > 3000")
if tx.get("category") == "elektronika" and tx.get("amount", 0) > 1500:
score += 2; rules.append("R2: elektronika > 1500")
if tx.get("hour", 12) < 6:
score += 2; rules.append("R3: nocna godzina")
risk_level = "CRITICAL" if score >= 5 else "HIGH" if score >= 3 else "MEDIUM" if score >= 1 else "LOW"
return {"score": score, "risk_level": risk_level, "triggered_rules": rules}
@app.route("/score", methods=["POST"])
def score():
tx = request.get_json()
if not tx or "amount" not in tx:
return jsonify({"error": "Brak pola 'amount'"}), 400
result = score_transaction(tx)
result["tx_id"] = tx.get("tx_id", "unknown")
return jsonify(result)
@app.route("/health")
def health():
return jsonify({"status": "ok", "version": "1.0-rules"})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)server = subprocess.Popen(["python", "app.py"])
time.sleep(2)
cases = [
{"tx_id": "TX001", "amount": 50.0, "category": "żywność", "hour": 14},
{"tx_id": "TX002", "amount": 1800.0, "category": "elektronika", "hour": 10},
{"tx_id": "TX003", "amount": 4500.0, "category": "elektronika", "hour": 3},
]
for tx in cases:
r = requests.post("http://localhost:5000/score", json=tx)
res = r.json()
print(f"{tx['tx_id']} {tx['amount']:>7.0f} PLN → {res['risk_level']:8s} (score={res['score']}) {res['triggered_rules']}")Zadanie 4.3 — Przetestuj obsługę błędów
Wyślij zapytanie bez pola amount. Sprawdź kod statusu i treść odpowiedzi.
# TWÓJ KOD
# r = requests.post("http://localhost:5000/score", json={"tx_id": "TX000"})
# print(r.status_code, r.json())Zadanie 4.4 — Pytania kontrolne
# 1. Jaka jest różnica między GET a POST?
# ODPOWIEDŹ:
# 2. Dlaczego używamy jsonify() zamiast return {"key": "value"}?
# Wskazówka: spróbuj oba i porównaj nagłówki odpowiedzi.
# ODPOWIEDŹ:
# 3. Co się stanie jeśli dwie osoby wywołają /score w tym samym czasie?
# ODPOWIEDŹ:server.kill()
print("Serwer zatrzymany.")Praca domowa
- Dodaj endpoint
GET /statsktóry zwraca: ile zapytań obsłużył serwer, ile z nich byłoHIGH/CRITICAL.
(Wskazówka: użyj globalnego słownika jako licznika —counters = {"total": 0, "high": 0}) - Dodaj walidację: jeśli
amountjest ujemne — zwróć błąd 400. - Wywołaj
/scorez 10 transakcjami pętlą i wypisz tabelę wyników.
Następne zajęcia: zamieniamy reguły na prawdziwy model ML — trenujemy RandomForest i ładujemy go do Flask.