Lab 1: Kafka in Python – Producers, Consumers, and Real-Time ETL
Goal
Your first hands-on experience with Apache Kafka:
- Send transaction events to Kafka (producer),
- Read and filter events in real time (stateless consumer),
- Count and aggregate events on the fly (stateful consumer).
Business case: You work for an e-commerce company. Your task is to build a transaction monitoring system that processes purchase events as they happen.
This case will evolve over the next 9 labs into a complete real-time analytics platform.
Setup
Make sure your course environment is running:
```bash
cd jupyterlab-project
docker compose up -d
```

Open JupyterLab at http://localhost:8888 and create this notebook there.
All code below runs inside JupyterLab in the Docker environment.
Part 1: Create a Kafka Topic
Open a terminal in JupyterLab (File > New > Terminal) and create a topic:
```bash
kafka/bin/kafka-topics.sh --create \
  --topic transactions \
  --bootstrap-server broker:9092 \
  --partitions 3 \
  --replication-factor 1
```

Verify:

```bash
kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
```

```python
# Paste the output of the --list command here as a comment:
# YOUR RESULT
```

Part 2: Producer – Generating Transactions
Task 2.1 – Write a transaction producer
Create a file producer.py that sends simulated e-commerce transactions to the transactions topic.
Each transaction should be a JSON object with fields:
- tx_id (e.g., "TX0001")
- user_id (random from "u01" to "u20")
- amount (random float 5.0 – 5000.0)
- store (random from: "Warsaw", "Krakow", "Gdansk", "Wroclaw")
- category (random from: "electronics", "clothing", "food", "books")
- timestamp (current time in ISO format)
Send one transaction per second. Print each one as it’s sent.
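Before wiring in Kafka, it helps to get the random fields right on their own. One possible way to fill in generate_transaction (a sketch, not the only valid answer; the sequence parameter `n` and the module-level constants are choices of this sketch):

```python
import random
from datetime import datetime

STORES = ["Warsaw", "Krakow", "Gdansk", "Wroclaw"]
CATEGORIES = ["electronics", "clothing", "food", "books"]

def generate_transaction(n):
    # Build one transaction dict; n drives the sequential tx_id.
    return {
        "tx_id": f"TX{n:04d}",
        "user_id": f"u{random.randint(1, 20):02d}",
        "amount": round(random.uniform(5.0, 5000.0), 2),
        "store": random.choice(STORES),
        "category": random.choice(CATEGORIES),
        "timestamp": datetime.now().isoformat(),
    }

print(generate_transaction(1)["tx_id"])  # TX0001
```

Because the dict is built from plain Python values, the skeleton's `value_serializer` can JSON-encode it without any extra work.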
```python
%%file producer.py
from kafka import KafkaProducer
import json, random, time
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers='broker:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def generate_transaction():
    # YOUR CODE
    # Return a dict with: tx_id, user_id, amount, store, category, timestamp
    pass

# YOUR CODE
# Loop: generate transaction, send to topic 'transactions', print, sleep 1s
# Send 50 transactions total
```

Run the producer in a JupyterLab terminal:

```bash
python producer.py
```

Leave it running and continue to the next task.
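The send loop can be factored into a small function that takes any producer object, which also makes it easy to dry-run without a broker (`run_producer` and its parameters are illustrative, not part of the skeleton):

```python
import time

def run_producer(producer, generate, topic="transactions", count=50, delay=1.0):
    # Generate `count` transactions, send each to `topic`, print it, then sleep.
    for n in range(1, count + 1):
        tx = generate(n)
        producer.send(topic, value=tx)
        print("sent:", tx)
        time.sleep(delay)
    producer.flush()  # kafka-python batches in the background; flush forces delivery
```

With the skeleton's objects this would be called as `run_producer(producer, generate_transaction)`; the final `flush()` matters because `send()` only enqueues the message.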
Part 3: Stateless Consumer – Filtering
Stateless processing means each event is processed independently, without remembering previous events.
Task 3.1 – Filter large transactions
Write a consumer that reads from transactions and prints only transactions with amount > 1000.
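The filtering itself is plain Python on the deserialized dict; a sketch of the predicate and alert string, assuming the field names from Part 2 (`large_tx_alert` is an illustrative helper, not required by the skeleton):

```python
def large_tx_alert(tx, threshold=1000):
    # Return an alert string for transactions above threshold, else None.
    if tx["amount"] > threshold:
        return (f"ALERT: {tx['tx_id']} | {tx['amount']:.2f} PLN"
                f" | {tx['store']} | {tx['category']}")
    return None
```

Inside the consumer loop you would call this on each `message.value` and print only the non-None results.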
```python
%%file consumer_filter.py
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers='broker:9092',
    auto_offset_reset='earliest',
    group_id='filter-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

print("Listening for large transactions (amount > 1000)...")
# YOUR CODE
# For each message: check if amount > 1000, if so print an alert
# Format: ALERT: TX0042 | 2345.67 PLN | Warsaw | electronics
```

Task 3.2 – Transform and enrich
Write a consumer that adds a risk_level field to each transaction:
- amount > 3000 -> "HIGH"
- amount > 1000 -> "MEDIUM"
- otherwise -> "LOW"
Print each enriched transaction.
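The threshold mapping is a pure function, so it can be written and tested before touching Kafka (a sketch; `risk_level` is an illustrative name):

```python
def risk_level(amount):
    # Map an amount to a risk bucket; check the higher threshold first,
    # otherwise every amount above 1000 would be classified as MEDIUM.
    if amount > 3000:
        return "HIGH"
    if amount > 1000:
        return "MEDIUM"
    return "LOW"

print(risk_level(2500))  # MEDIUM
```

In the consumer loop, enrichment is then just `tx["risk_level"] = risk_level(tx["amount"])` before printing.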
```python
%%file consumer_enrich.py
from kafka import KafkaConsumer
import json

# YOUR CODE
# Read from 'transactions' (use a DIFFERENT group_id!)
# Add risk_level field based on amount
# Print enriched transaction
```

Part 4: Stateful Consumer – Aggregating
Stateful processing means we maintain state (counters, accumulators) across events.
Task 4.1 – Count transactions per store
Write a consumer that keeps a running count of transactions per store and prints updated totals every 10 messages.
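The stateful part reduces to two dictionary updates plus a periodic summary, which can be sketched without a broker (helper names are illustrative; the accumulators match the skeleton below):

```python
from collections import Counter

def record(store_counts, total_amount, tx):
    # Fold one transaction into both per-store accumulators.
    store_counts[tx["store"]] += 1
    total_amount[tx["store"]] = total_amount.get(tx["store"], 0.0) + tx["amount"]

def summary_rows(store_counts, total_amount):
    # One (store, count, total, avg) row per store, sorted by store name.
    return [
        (store, n, round(total_amount[store], 2), round(total_amount[store] / n, 2))
        for store, n in sorted(store_counts.items())
    ]

counts, totals = Counter(), {}
for tx in ({"store": "Warsaw", "amount": 100.0},
           {"store": "Warsaw", "amount": 300.0},
           {"store": "Gdansk", "amount": 50.0}):
    record(counts, totals, tx)
print(summary_rows(counts, totals))  # [('Gdansk', 1, 50.0, 50.0), ('Warsaw', 2, 400.0, 200.0)]
```

The consumer loop would call `record` on every message and print the rows whenever `msg_count % 10 == 0`.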
```python
%%file consumer_count.py
from kafka import KafkaConsumer
from collections import Counter
import json

consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers='broker:9092',
    auto_offset_reset='earliest',
    group_id='count-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

store_counts = Counter()
total_amount = {}
msg_count = 0

# YOUR CODE
# For each message:
#   1. Increment store_counts[store]
#   2. Add amount to total_amount[store]
#   3. Every 10 messages, print a summary table:
#      Store | Count | Total Amount | Avg Amount
```

Task 4.2 – Running statistics per category
Write a consumer that tracks per category:
- number of transactions
- total revenue
- min and max transaction amount
Print updated stats every 10 messages.
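One way to hold all four statistics per category is a defaultdict of small dicts, matching the skeleton's import (a sketch; `new_stats` and `track` are illustrative names):

```python
from collections import defaultdict

def new_stats():
    # Start min at +inf and max at -inf so the first amount replaces both.
    return {"count": 0, "revenue": 0.0, "min": float("inf"), "max": float("-inf")}

def track(stats, tx):
    # Fold one transaction into its category's running statistics.
    s = stats[tx["category"]]
    s["count"] += 1
    s["revenue"] += tx["amount"]
    s["min"] = min(s["min"], tx["amount"])
    s["max"] = max(s["max"], tx["amount"])

stats = defaultdict(new_stats)
for tx in ({"category": "books", "amount": 20.0},
           {"category": "books", "amount": 80.0}):
    track(stats, tx)
print(stats["books"])  # {'count': 2, 'revenue': 100.0, 'min': 20.0, 'max': 80.0}
```

The consumer loop would call `track` per message and print `stats` every 10 messages, as in Task 4.1.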
```python
%%file consumer_stats.py
from kafka import KafkaConsumer
from collections import defaultdict
import json

# YOUR CODE
```

Part 5: Multiple Consumers
Task 5.1 – Run everything together
Open 3 terminals in JupyterLab and run simultaneously:
- python producer.py (generates events)
- python consumer_filter.py (filters large transactions)
- python consumer_count.py (counts per store)
Observe how both consumers process the same stream independently.
Task 5.2 – Questions
```python
# Answer these questions:
# 1. What happens if you start consumer_filter.py AFTER the producer has finished?
#    (Hint: check auto_offset_reset)
#
# 2. What happens if two consumers have the SAME group_id?
#
# 3. What is the difference between stateless and stateful processing?
#    Give one example of each from this lab.

# YOUR ANSWERS
```

Homework
- Write a consumer that detects velocity anomalies: alert if the same
  user_id makes more than 3 transactions within 60 seconds.
  (Hint: keep a dict of {user_id: [timestamps]})
- Push all your code to your Git repository.
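One possible shape for the homework's sliding-window check, using plain epoch-second floats for clarity (the producer emits ISO timestamps, which you would parse first; `check_velocity` and the constants are illustrative, and this is a hint, not the required solution):

```python
from collections import defaultdict

WINDOW_S = 60.0   # sliding window length in seconds
LIMIT = 3         # max transactions per user inside the window

def check_velocity(history, user_id, ts):
    # Record ts for user_id, drop timestamps older than WINDOW_S,
    # and report whether the user now exceeds LIMIT events in the window.
    history[user_id].append(ts)
    history[user_id] = [t for t in history[user_id] if ts - t <= WINDOW_S]
    return len(history[user_id]) > LIMIT

history = defaultdict(list)
```

Pruning old timestamps on every call keeps the per-user lists bounded, so memory does not grow with the length of the stream.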
Next lab: We’ll process the same Kafka stream with Apache Spark – windowed aggregations and more powerful analytics.