Lab 1: Kafka in Python – Producers, Consumers, and Real-Time ETL

Goal

In this lab you get your first hands-on experience with Apache Kafka. You will:

  • Send transaction events to Kafka (producer),
  • Read and filter events in real time (stateless consumer),
  • Count and aggregate events on the fly (stateful consumer).

Business case: You work for an e-commerce company. Your task is to build a transaction monitoring system that processes purchase events as they happen.

This case will evolve over the next 9 labs into a complete real-time analytics platform.


Setup

Make sure your course environment is running:

cd jupyterlab-project
docker compose up -d

Open JupyterLab at http://localhost:8888 and create a new notebook for this lab there.

All code below runs inside JupyterLab in the Docker environment.


Part 1: Create a Kafka Topic

Open a terminal in JupyterLab (File > New > Terminal) and create a topic:

kafka/bin/kafka-topics.sh --create \
  --topic transactions \
  --bootstrap-server broker:9092 \
  --partitions 3 \
  --replication-factor 1

Verify:

kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
# Paste the output of the --list command here as a comment:
# YOUR RESULT

Part 2: Producer – Generating Transactions

Task 2.1 – Write a transaction producer

Create a file producer.py that sends simulated e-commerce transactions to the transactions topic.

Each transaction should be a JSON object with the following fields:

  • tx_id (e.g., “TX0001”),
  • user_id (random from “u01” to “u20”),
  • amount (random float, 5.0 – 5000.0),
  • store (random from: “Warsaw”, “Krakow”, “Gdansk”, “Wroclaw”),
  • category (random from: “electronics”, “clothing”, “food”, “books”),
  • timestamp (current time in ISO format).
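
To make the record shape concrete, here is a minimal sketch of one way to build such a transaction and serialize it the same way the producer's value_serializer does. The names sample_transaction, STORES, and CATEGORIES are illustrative assumptions, not part of the lab skeleton; treat this as a starting point, not the expected solution.

```python
import json
import random
from datetime import datetime

# Allowed values taken from the field list above.
STORES = ["Warsaw", "Krakow", "Gdansk", "Wroclaw"]
CATEGORIES = ["electronics", "clothing", "food", "books"]


def sample_transaction(n, rng=random):
    """Build one transaction dict; n is a running counter used for tx_id.

    sample_transaction is a hypothetical helper name chosen for this sketch.
    """
    return {
        "tx_id": f"TX{n:04d}",                      # TX0001, TX0002, ...
        "user_id": f"u{rng.randint(1, 20):02d}",    # u01 .. u20
        "amount": round(rng.uniform(5.0, 5000.0), 2),
        "store": rng.choice(STORES),
        "category": rng.choice(CATEGORIES),
        "timestamp": datetime.now().isoformat(),
    }


tx = sample_transaction(1)

# Same encoding the producer applies: dict -> JSON text -> UTF-8 bytes.
payload = json.dumps(tx).encode("utf-8")

# A consumer reverses the steps: bytes -> JSON text -> dict.
restored = json.loads(payload.decode("utf-8"))
print(restored)
```

Rounding the amount to two decimals is a design choice of this sketch so that printed values look like currency; the lab text only asks for a random float.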

Send one transaction per second. Print each one as it’s sent.

%%file producer.py
from kafka import KafkaProducer
import json, random, time
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers='broker:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def generate_transaction():
    # YOUR CODE
    # Return a dict with: tx_id, user_id, amount, store, category, timestamp
    pass

# YOUR CODE
# Loop: generate transaction, send to topic 'transactions', print, sleep 1s
# Send 50 transactions total, then call producer.flush() before the script exits

Run the producer in a JupyterLab terminal:

python producer.py

Leave it running (it stops on its own after 50 transactions) and continue to the next task.


Part 3: Stateless Consumer – Filtering

Stateless processing means each event is processed independently, without remembering previous events.

Task 3.1 – Filter large transactions

Write a consumer that reads from transactions and prints only transactions with amount > 1000.

%%file consumer_filter.py
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers='broker:9092',
    auto_offset_reset='earliest',
    group_id='filter-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

print("Listening for large transactions (amount > 1000)...")

# YOUR CODE
# For each message: check if amount > 1000, if so print an alert
# Format: ALERT: TX0042 | 2345.67 PLN | Warsaw | electronics
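
The per-message check can be rehearsed without a broker. The sketch below applies the filter to a small in-memory list of made-up events; is_large and format_alert are hypothetical helper names, and the sample data is invented for illustration only. Because each decision depends only on the message at hand, this is stateless processing.

```python
def is_large(tx, threshold=1000):
    """Stateless check: the answer depends only on this one event."""
    return tx["amount"] > threshold


def format_alert(tx):
    """Render an alert line in the format requested by the task."""
    return (
        f"ALERT: {tx['tx_id']} | {tx['amount']:.2f} PLN | "
        f"{tx['store']} | {tx['category']}"
    )


# Made-up events standing in for deserialized Kafka message values.
events = [
    {"tx_id": "TX0041", "amount": 99.90, "store": "Krakow", "category": "books"},
    {"tx_id": "TX0042", "amount": 2345.67, "store": "Warsaw", "category": "electronics"},
    {"tx_id": "TX0043", "amount": 1000.00, "store": "Gdansk", "category": "food"},
]

alerts = [format_alert(tx) for tx in events if is_large(tx)]
for line in alerts:
    print(line)
```

In the real consumer, the same two helpers would be called on message.value inside the loop over the consumer. Note that an amount of exactly 1000 does not trigger an alert, since the task asks for amount > 1000.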

Task 3.2 – Transform and enrich

Write a consumer that adds a risk_level field to each transaction:

  • amount > 3000 -> “HIGH”,
  • amount > 1000 -> “MEDIUM”,
  • otherwise -> “LOW”.

Print each enriched transaction.
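
One possible way to express the risk rules is a small pure function that checks the thresholds from highest to lowest. This is a sketch under the assumption that the boundaries are strict (exactly 3000 is MEDIUM, exactly 1000 is LOW), which follows from the ">" signs above; risk_level and enrich are hypothetical function names.

```python
def risk_level(amount):
    """Map an amount to a risk label; thresholds are checked top-down."""
    if amount > 3000:
        return "HIGH"
    if amount > 1000:
        return "MEDIUM"
    return "LOW"


def enrich(tx):
    """Return a copy of the transaction with a risk_level field added."""
    return {**tx, "risk_level": risk_level(tx["amount"])}


# Made-up event for illustration.
tx = {"tx_id": "TX0007", "amount": 1500.0, "store": "Wroclaw", "category": "clothing"}
enriched = enrich(tx)
print(enriched)
```

Returning a new dict instead of mutating the input is a design choice of this sketch; modifying the message value in place would work equally well for this task.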

%%file consumer_enrich.py
from kafka import KafkaConsumer
import json

# YOUR CODE
# Read from 'transactions' (use a DIFFERENT group_id!)
# Add risk_level field based on amount
# Print enriched transaction

Part 4: Stateful Consumer – Aggregating

Stateful processing means we maintain state (counters, accumulators) across events.

Task 4.1 – Count transactions per store

Write a consumer that keeps a running count of transactions per store and prints updated totals every 10 messages.

%%file consumer_count.py
from kafka import KafkaConsumer
from collections import Counter
import json

consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers='broker:9092',
    auto_offset_reset='earliest',
    group_id='count-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

store_counts = Counter()
total_amount = {}
msg_count = 0

# YOUR CODE
# For each message:
#   1. Increment store_counts[store]
#   2. Add amount to total_amount[store] (the dict starts empty, so use .get(store, 0))
#   3. Every 10 messages, print a summary table:
#      Store | Count | Total Amount | Avg Amount
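
The bookkeeping behind these steps can be tried on an in-memory list before wiring it to Kafka. The sketch below is one possible approach, not the reference solution: it uses defaultdict(float) for the totals (an assumption that differs from the plain dict in the skeleton), prints every 3 messages instead of 10 to keep the demo short, and the helper names update and summary_rows are hypothetical.

```python
from collections import Counter, defaultdict


def update(store_counts, total_amount, tx):
    """Fold one event into the running state (this is the stateful part)."""
    store_counts[tx["store"]] += 1
    total_amount[tx["store"]] += tx["amount"]


def summary_rows(store_counts, total_amount):
    """Return (store, count, total, average) tuples, sorted by store name."""
    rows = []
    for store in sorted(store_counts):
        count = store_counts[store]
        total = total_amount[store]
        rows.append((store, count, total, total / count))
    return rows


# Made-up events standing in for deserialized Kafka message values.
events = [
    {"store": "Warsaw", "amount": 100.0},
    {"store": "Krakow", "amount": 50.0},
    {"store": "Warsaw", "amount": 300.0},
]

store_counts = Counter()
total_amount = defaultdict(float)  # missing stores start at 0.0

for msg_count, tx in enumerate(events, start=1):
    update(store_counts, total_amount, tx)
    if msg_count % 3 == 0:  # the lab asks for every 10 messages
        print("Store      | Count | Total Amount | Avg Amount")
        for store, count, total, avg in summary_rows(store_counts, total_amount):
            print(f"{store:<10} | {count:>5} | {total:>12.2f} | {avg:>10.2f}")
```

The state lives only in the memory of the Python process, so restarting the consumer resets all counters.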

Task 4.2 – Running statistics per category

Write a consumer that tracks, per category:

  • number of transactions,
  • total revenue,
  • min and max transaction amount.

Print updated stats every 10 messages.
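
A minimal sketch of the per-category state, assuming one small dict of statistics per category held in a defaultdict. The names new_stats and update_stats are hypothetical, and the events are made up; the Kafka consumer loop itself is left out.

```python
from collections import defaultdict


def new_stats():
    """Initial state for a category that has not been seen yet."""
    return {
        "count": 0,
        "revenue": 0.0,
        "min": float("inf"),   # any real amount is smaller
        "max": float("-inf"),  # any real amount is larger
    }


def update_stats(stats, tx):
    """Fold one event into the running statistics of its category."""
    s = stats[tx["category"]]
    s["count"] += 1
    s["revenue"] += tx["amount"]
    s["min"] = min(s["min"], tx["amount"])
    s["max"] = max(s["max"], tx["amount"])


stats = defaultdict(new_stats)

# Made-up events standing in for deserialized Kafka message values.
events = [
    {"category": "books", "amount": 40.0},
    {"category": "food", "amount": 12.5},
    {"category": "books", "amount": 60.0},
]
for tx in events:
    update_stats(stats, tx)

for category, s in sorted(stats.items()):
    print(category, s)
```

Starting min and max at plus and minus infinity avoids a special case for the first event in each category.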

%%file consumer_stats.py
from kafka import KafkaConsumer
from collections import defaultdict
import json

# YOUR CODE

Part 5: Multiple Consumers

Task 5.1 – Run everything together

Open 3 terminals in JupyterLab and run simultaneously:

  1. python producer.py (generates events)
  2. python consumer_filter.py (filters large transactions)
  3. python consumer_count.py (counts per store)

Observe how both consumers process the same stream independently.

Task 5.2 – Questions

# Answer these questions:
# 1. What happens if you start consumer_filter.py AFTER the producer has finished?
#    (Hint: check auto_offset_reset)
#
# 2. What happens if two consumers have the SAME group_id?
#
# 3. What is the difference between stateless and stateful processing?
#    Give one example of each from this lab.

# YOUR ANSWERS

Homework

  1. Write a consumer that detects velocity anomalies: alert if the same user_id makes more than 3 transactions within 60 seconds. (Hint: keep a dict of {user_id: [timestamps]})
  2. Push all your code to your Git repository.
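
The hint in item 1 can be read as a sliding-window check. The sketch below shows only that idea on plain datetime values, under several assumptions: events for a user arrive in timestamp order, two transactions exactly 60 seconds apart count as inside the same window, and timestamps have already been parsed (for example with datetime.fromisoformat). The names is_velocity_anomaly, WINDOW, and MAX_TX are hypothetical, and a deque is used in place of the list from the hint.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=60)  # length of the sliding window
MAX_TX = 3                      # more than this many in the window is an anomaly


def is_velocity_anomaly(recent, user_id, ts):
    """Record ts for user_id and report whether the window now exceeds MAX_TX."""
    window = recent[user_id]
    window.append(ts)
    # Drop timestamps that fell out of the 60-second window.
    while window and ts - window[0] > WINDOW:
        window.popleft()
    return len(window) > MAX_TX


recent = defaultdict(deque)  # {user_id: deque of recent timestamps}
base = datetime(2024, 1, 1, 12, 0, 0)  # arbitrary start time for the demo

# Four transactions within 30 seconds, then one much later.
for offset in (0, 10, 20, 30, 200):
    ts = base + timedelta(seconds=offset)
    if is_velocity_anomaly(recent, "u01", ts):
        print(f"ALERT: u01 made more than {MAX_TX} transactions within 60 s at {ts.isoformat()}")
```

Connecting this to the transactions topic, and deciding how to handle out-of-order events, is left as part of the homework.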

Next lab: We’ll process the same Kafka stream with Apache Spark – windowed aggregations and more powerful analytics.