Lab 5: Flask — Serving a Scoring Model via REST API

What is an API?

Think of a restaurant. You (the customer) don’t walk into the kitchen and cook the meal yourself; you order through a waiter. The waiter takes your request, passes it to the kitchen, and comes back with a response.

An API (Application Programming Interface) works the same way:

- You send a request, e.g. “score this transaction”.
- The API processes it (runs the model, checks rules).
- It returns a response, e.g. {"fraud": true, "score": 0.92}.

This lets an ML model run as a separate service that can be called by a Kafka consumer, a mobile app, a dashboard — anything.

What is REST and HTTP?

HTTP is the communication protocol, the same one your browser uses. Each request has:

- a method: GET (fetch data) or POST (send data for processing)
- a URL: e.g. http://localhost:5000/score
- optionally a body: JSON data
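To make the parts of a request concrete, the sketch below builds (but does not send) a POST request using only Python's standard library and prints its method, URL, header, and body. The payload values are illustrative, and nothing needs to be running on port 5000.

```python
import json
from urllib.request import Request

# Illustrative payload; the field names follow the transaction examples in this lab.
payload = {"tx_id": "TX0042", "amount": 4500.0}

# Build the request object only. No network traffic happens here.
req = Request(
    "http://localhost:5000/score",
    data=json.dumps(payload).encode("utf-8"),      # the body, as bytes
    headers={"Content-Type": "application/json"},  # tells the server the body is JSON
    method="POST",
)

print(req.get_method())                # POST
print(req.full_url)                    # http://localhost:5000/score
print(req.get_header("Content-type"))  # application/json (urllib normalizes the header name)
print(req.data.decode("utf-8"))        # {"tx_id": "TX0042", "amount": 4500.0}
```

The requests library used later in this lab assembles the same pieces for you when you call requests.post(url, json=payload).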

REST is a design style for APIs: each resource has its own URL, and HTTP methods have defined meanings (GET reads a resource, POST submits data to it).

Where does this fit in our project?

Lab 1: Kafka producer → Python consumer
Lab 2: Spark DataFrame
Lab 3: Spark Streaming
Lab 4: consumer → Spark Streaming → alerts
Lab 5: Flask API  ← (here — building the scoring service)
Lab 6: Kafka consumer → calls Flask API → alerts

Plan

  1. First API — Hello World
  2. URL parameters and path variables
  3. POST endpoint — accepting JSON
  4. Transaction scoring — business rules through the API

Part 1: First API

Flask is a minimal web framework for Python — a few lines of code and you have a running server.

We use the %%file cell magic to save the code to app.py, then start the server as a background process so the notebook stays responsive.

%%file app.py
from flask import Flask

app = Flask(__name__)

@app.route("/")                      # URL: http://localhost:5000/
def home():
    return "Welcome to the transaction monitoring system!"

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
Writing app.py
import subprocess, time, requests

server = subprocess.Popen(["python", "app.py"])
time.sleep(2)   # give the server a moment to start

response = requests.get("http://localhost:5000/")
print(f"Status: {response.status_code}")
print(f"Body:   {response.text}")
Status: 200
Body:   Welcome to the transaction monitoring system!
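A fixed time.sleep(2) can be too short on a slow machine and wastes time on a fast one. One possible alternative, sketched here with only the standard library, is to poll the URL until the server answers. The helper name wait_for_server and its parameters are hypothetical; they are not part of Flask or requests.

```python
import time
import urllib.error
import urllib.request


def wait_for_server(url, timeout=10.0, interval=0.2):
    """Poll `url` until something answers; return True on success, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            urllib.request.urlopen(url, timeout=1).close()
            return True
        except urllib.error.HTTPError:
            # The server replied with a 4xx/5xx status, so it is up and listening.
            return True
        except (urllib.error.URLError, OSError):
            # Connection refused or timed out: the server is not ready yet.
            time.sleep(interval)
    return False
```

In the cell above, this would replace the sleep: start the process with subprocess.Popen, then call wait_for_server("http://localhost:5000/") and only send requests if it returns True.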

Status 200 means success. Other codes you will encounter:

Code  Meaning
200   OK: request handled successfully
400   Bad Request: invalid input data
404   Not Found: this URL does not exist
500   Internal Server Error: bug in server code

Part 2: URL Parameters and Path Variables

An API can receive data in two ways:

- query parameters: ?name=Anna, appended to the URL after ?
- path variables: /transaction/TX001, embedded in the URL
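The difference is easiest to see by taking the two URLs apart. This small sketch uses the standard library's urllib.parse and needs no running server; Flask performs equivalent parsing internally before your function is called.

```python
from urllib.parse import parse_qs, urlsplit

# Query parameter: the data sits after the "?" as key=value pairs.
query_url = urlsplit("http://localhost:5000/hello?name=Anna")
print(query_url.path)             # /hello
print(parse_qs(query_url.query))  # {'name': ['Anna']}

# Path variable: the data is one segment of the path itself.
path_url = urlsplit("http://localhost:5000/transaction/TX001")
print(path_url.path)              # /transaction/TX001
tx_id = path_url.path.split("/")[-1]
print(tx_id)                      # TX001
```

A rule of thumb: path variables identify which resource you mean, while query parameters are optional modifiers of the request.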

Task 2.1 — Endpoint with query parameter

server.kill()
%%file app.py
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/")
def home():
    return "Welcome to the transaction monitoring system!"

@app.route("/hello")                  # GET /hello?name=Anna
def hello():
    name = request.args.get("name", "stranger")   # read query parameter
    return f"Hello, {name}!"

@app.route("/transaction/<tx_id>")    # GET /transaction/TX0042
def get_transaction(tx_id):
    return jsonify({"tx_id": tx_id, "status": "found"})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
Overwriting app.py
server = subprocess.Popen(["python", "app.py"])
time.sleep(2)

# Query parameter
r1 = requests.get("http://localhost:5000/hello")
r2 = requests.get("http://localhost:5000/hello?name=Anna")
print(r1.text)   # Hello, stranger!
print(r2.text)   # Hello, Anna!

# Path variable
r3 = requests.get("http://localhost:5000/transaction/TX0042")
print(r3.json())
Hello, stranger!
Hello, Anna!
{'status': 'found', 'tx_id': 'TX0042'}

Task 2.2 — Add a /status endpoint

Add GET /status that returns JSON: {"service": "monitoring", "version": "1.0", "status": "ok"}.

server.kill()

# YOUR CODE — add the endpoint to app.py and test it
# %%file app.py ...

Part 3: POST Endpoint — Accepting JSON

So far we used GET, where any data travels inside the URL itself. Scoring a transaction requires sending a structured record to the server; that’s what POST with a JSON body is for.

Analogy: GET is “what’s the price?”, POST is “buy this for me, here are my card details”.

%%file app.py
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/echo", methods=["POST"])  # accept POST only
def echo():
    data = request.get_json()           # read JSON body
    return jsonify({
        "received": data,
        "field_count": len(data),
    })

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
Overwriting app.py
server = subprocess.Popen(["python", "app.py"])
time.sleep(2)

transaction = {
    "tx_id": "TX0042",
    "amount": 4500.0,
    "store": "Krakow",
    "category": "electronics",
}

r = requests.post("http://localhost:5000/echo", json=transaction)
print(r.json())
{'field_count': 4, 'received': {'amount': 4500.0, 'category': 'electronics', 'store': 'Krakow', 'tx_id': 'TX0042'}}
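The /echo endpoint above assumes the body is always a valid JSON object; if it is not, len(data) fails or Flask rejects the request on its own. A more defensive variant might look like the sketch below. It relies on request.get_json(silent=True), which returns None instead of raising, and on Flask's built-in test client, so no background server is needed. The wording of the error message is an assumption.

```python
from flask import Flask, jsonify, request

app = Flask(__name__)


@app.route("/echo", methods=["POST"])
def echo():
    # silent=True: return None for a missing, malformed, or non-JSON body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Expected a JSON object in the request body"}), 400
    return jsonify({"received": data, "field_count": len(data)})


# The test client calls the app in-process, without opening a port.
client = app.test_client()
ok = client.post("/echo", json={"tx_id": "TX0042", "amount": 4500.0})
bad = client.post("/echo", data="not json", content_type="text/plain")

print(ok.status_code, ok.get_json()["field_count"])  # 200 2
print(bad.status_code, bad.get_json()["error"])
```

The test client is also a convenient way to try endpoints while developing, since it avoids the kill/restart cycle of the background process.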

Task 3.1 — Test a wrong method

Try calling /echo with GET instead of POST. What does the server return? What status code?

r_bad = requests.get("http://localhost:5000/echo")
print(f"Status: {r_bad.status_code}")

# Answer in a comment: what does this status code mean?
# ANSWER:
Status: 405

Part 4: Transaction Scoring via API

Now we connect Flask to business logic. The /score endpoint accepts a transaction and returns a risk assessment.

We use rules for now — the ML model comes in Lab 6.

Task 4.1 — Scoring function

server.kill()

def score_transaction(tx: dict) -> dict:
    """
    Assess transaction risk using business rules.
    Returns dict with: score (0-7), risk_level, triggered_rules.
    """
    score = 0
    rules = []

    if tx.get("amount", 0) > 3000:
        score += 3
        rules.append("R1: amount > 3000")

    if tx.get("category") == "electronics" and tx.get("amount", 0) > 1500:
        score += 2
        rules.append("R2: electronics > 1500")

    # YOUR CODE — add rule R3: hour < 6 (use 'hour' field if present in tx)

    if score >= 5:
        risk_level = "CRITICAL"
    elif score >= 3:
        risk_level = "HIGH"
    elif score >= 1:
        risk_level = "MEDIUM"
    else:
        risk_level = "LOW"

    return {"score": score, "risk_level": risk_level, "triggered_rules": rules}

# Quick test without a server
test_tx = {"tx_id": "TX001", "amount": 4500.0, "category": "electronics", "hour": 3}
print(score_transaction(test_tx))
{'score': 5, 'risk_level': 'CRITICAL', 'triggered_rules': ['R1: amount > 3000', 'R2: electronics > 1500']}
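The mapping from score to risk level is where off-by-one mistakes tend to hide, so it can help to check every boundary in isolation. The sketch below restates the same thresholds (5, 3, 1) as a small lookup; the names THRESHOLDS and risk_level_for are hypothetical and used only for illustration.

```python
# (minimum score, level), checked from the highest threshold down.
THRESHOLDS = [(5, "CRITICAL"), (3, "HIGH"), (1, "MEDIUM")]


def risk_level_for(score: int) -> str:
    """Return the risk level for a rule score, using the thresholds from this lab."""
    for minimum, level in THRESHOLDS:
        if score >= minimum:
            return level
    return "LOW"


# Print the level for every reachable score around the boundaries.
for s in range(0, 8):
    print(s, risk_level_for(s))
# 0 LOW
# 1 MEDIUM
# 2 MEDIUM
# 3 HIGH
# 4 HIGH
# 5 CRITICAL
# 6 CRITICAL
# 7 CRITICAL
```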

Task 4.2 — /score endpoint

%%file app.py
from flask import Flask, request, jsonify

app = Flask(__name__)

def score_transaction(tx):
    score = 0
    rules = []
    if tx.get("amount", 0) > 3000:
        score += 3
        rules.append("R1: amount > 3000")
    if tx.get("category") == "electronics" and tx.get("amount", 0) > 1500:
        score += 2
        rules.append("R2: electronics > 1500")
    if tx.get("hour", 12) < 6:
        score += 2
        rules.append("R3: night hour")
    if score >= 5:
        risk_level = "CRITICAL"
    elif score >= 3:
        risk_level = "HIGH"
    elif score >= 1:
        risk_level = "MEDIUM"
    else:
        risk_level = "LOW"
    return {"score": score, "risk_level": risk_level, "triggered_rules": rules}

@app.route("/score", methods=["POST"])
def score():
    tx = request.get_json()
    if not tx or "amount" not in tx:
        return jsonify({"error": "Missing required field 'amount'"}), 400
    result = score_transaction(tx)
    result["tx_id"] = tx.get("tx_id", "unknown")
    return jsonify(result)

@app.route("/health")
def health():
    return jsonify({"status": "ok", "version": "1.0-rules"})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
Overwriting app.py
server = subprocess.Popen(["python", "app.py"])
time.sleep(2)

cases = [
    {"tx_id": "TX001", "amount": 50.0,   "category": "food",        "hour": 14},
    {"tx_id": "TX002", "amount": 1800.0,  "category": "electronics", "hour": 10},
    {"tx_id": "TX003", "amount": 4500.0,  "category": "electronics", "hour": 3},
]

for tx in cases:
    r = requests.post("http://localhost:5000/score", json=tx)
    res = r.json()
    print(f"{tx['tx_id']}  {tx['amount']:>7.0f} PLN  → {res['risk_level']:8s} (score={res['score']})  {res['triggered_rules']}")
TX001       50 PLN  → LOW      (score=0)  []
TX002     1800 PLN  → MEDIUM   (score=2)  ['R2: electronics > 1500']
TX003     4500 PLN  → CRITICAL (score=7)  ['R1: amount > 3000', 'R2: electronics > 1500', 'R3: night hour']
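The loop above calls r.json() without checking whether the request succeeded. A caller that must keep running when the service is down or returns an error (for example the consumer planned for a later lab) could wrap the call roughly as follows. The function name score_remote, the default URL, and the timeout value are assumptions made for this sketch.

```python
import requests


def score_remote(tx, url="http://localhost:5000/score", timeout=2.0):
    """POST one transaction to the scoring service.

    Returns the parsed JSON result, or None if the service is unreachable,
    answers with an error status, or sends a body that is not JSON.
    """
    try:
        response = requests.post(url, json=tx, timeout=timeout)
    except requests.RequestException:
        return None   # connection refused, timeout, DNS failure, ...
    if not response.ok:
        return None   # 4xx or 5xx status
    try:
        return response.json()
    except ValueError:
        return None   # body was not valid JSON
```

With the server from this part running, score_remote({"tx_id": "TX003", "amount": 4500.0, "category": "electronics", "hour": 3}) should return the same dictionary the loop printed; with the server stopped it returns None instead of raising.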

Task 4.3 — Test error handling

Send a request without the amount field. Check the status code and response body.

# YOUR CODE
# r = requests.post("http://localhost:5000/score", json={"tx_id": "TX000"})
# print(r.status_code, r.json())

Task 4.4 — Review questions

# 1. What is the difference between GET and POST?
#    ANSWER:

# 2. Why use jsonify() instead of return {"key": "value"}?
#    Hint: try both and compare the response headers.
#    ANSWER:

# 3. What happens if two people call /score at the same time?
#    ANSWER:
server.kill()
print("Server stopped.")
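server.kill() ends the process immediately and does not wait for it to exit. A slightly gentler pattern, sketched below, asks the process to terminate, waits for it, and only falls back to kill() if it does not stop in time. The helper name stop_server is hypothetical, and the demo uses a stand-in child process that just sleeps instead of app.py.

```python
import subprocess
import sys


def stop_server(proc, timeout=5.0):
    """Stop a child process and return its exit code."""
    if proc.poll() is not None:
        return proc.returncode        # already exited
    proc.terminate()                  # polite request to stop
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()                   # force it if the request was ignored
        return proc.wait()


# Stand-in for the Flask server: a child process that sleeps for a minute.
# sys.executable is the interpreter running this notebook, which avoids
# depending on whichever "python" happens to be first on PATH.
demo = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
exit_code = stop_server(demo)
print("stopped:", demo.poll() is not None)  # stopped: True
```

Waiting for the process to exit also makes it more likely that port 5000 is free again before the next cell starts a new server.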

Homework

  1. Add a GET /stats endpoint that returns how many requests the server handled and how many of them were rated HIGH or CRITICAL.
    (Hint: use a global dict as a counter, e.g. counters = {"total": 0, "high": 0})
  2. Add validation: if amount is negative, return a 400 error.
  3. Call /score with 10 transactions in a loop and print a summary table.

Next lab: Replace rules with a real ML model — train a RandomForest and load it into Flask.