from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("Lab2-Transactions")
.getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
print(f"Spark {spark.version} — ready")

Lab 2: Apache Spark — Batch Analytics on Transaction Data
Why Spark?
In Lab 1 you built a consumer that counts transactions per store. It works — but imagine:
- You need to know how many transactions happened per hour (not all-time totals)
- You have not 50, but 10 million transactions per day
- Your consumer crashes — you lose all state (counters), and have to start over
For tasks like these we use Apache Spark — a distributed computation engine that:
- processes data in parallel (across multiple cores / machines)
- has built-in time windows (window)
- works on both files (batch) and live streams (streaming)
Plan
- Load 10,000 transactions from a file
- Basic aggregations — the familiar groupBy
- Tumbling windows — counts per hour / per 30 minutes
- Sliding windows — overlapping intervals
- Preview: the same queries on a live Kafka stream
Part 1: Load the Data
SparkSession — the entry point
Before anything else, create a SparkSession — that is what the setup code at the top of this notebook does. Think of it as a database connection: one object for the entire session.
Load the JSON file
The data is in data/transactions_10k.jsonl — 10,000 e-commerce transactions from 08:00 to 11:00. Each line is one JSON record — the same format as the Kafka events in Lab 1.
df = spark.read.json("data/transactions_10k.jsonl")
print(f"Record count: {df.count()}")
df.printSchema()
df.show(10, truncate=False)

Convert the timestamp column
Spark loaded timestamp as a string. To use time windows we need to convert it to TimestampType.
Think of it as changing a cell format in Excel from “text” to “date”.
from pyspark.sql.functions import to_timestamp, col
df = df.withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
df.printSchema()  # timestamp should now be 'timestamp (nullable = true)'

Part 2: Basic Aggregations
Before we get to windows, let's warm up with groupBy — it works just like SQL's GROUP BY and Pandas' groupby.
Task 2.1 — Transaction count and total revenue per store
from pyspark.sql.functions import count, sum as _sum, avg, round as _round
store_summary = (
df.groupBy("store")
.agg(
count("tx_id").alias("tx_count"),
_round(_sum("amount"), 2).alias("total_PLN"),
_round(avg("amount"), 2).alias("avg_PLN"),
)
.orderBy("store")
)
store_summary.show()

Task 2.2 — Stats per category
Count transactions, total revenue, min and max amount for each category.
from pyspark.sql.functions import min as _min, max as _max
# YOUR CODE
# df.groupBy("category").agg(...).orderBy("category").show()

Part 3: Tumbling Windows
What is a window?
The groupBy("store") above counted all records together — regardless of when a transaction occurred.
A window splits the time axis into intervals and groups data separately within each.
Tumbling window — non-overlapping, back-to-back intervals:
08:00 ──────── 09:00 ──────── 10:00 ──────── 11:00
[  window 1  ] [  window 2  ] [  window 3  ]

Each transaction falls into exactly one window.
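To see why each record lands in exactly one bucket, here is a small plain-Python sketch (not Spark code) of the assignment rule. The tumbling_window helper and the 2024-05-06 date are made up for illustration; window starts are aligned to the Unix epoch, as in Spark's default.

```python
from datetime import datetime, timedelta

def tumbling_window(ts, width):
    """Return (start, end) of the single tumbling window containing ts (sketch)."""
    epoch = datetime(1970, 1, 1)
    offset = (ts - epoch) % width      # how far ts sits inside its bucket
    start = ts - offset                # bucket boundaries are multiples of width
    return start, start + width

# A transaction at 09:17 falls into the 09:00-10:00 window (and no other)
ts = datetime(2024, 5, 6, 9, 17)       # hypothetical timestamp
start, end = tumbling_window(ts, timedelta(hours=1))
print(start.time(), "->", end.time())  # 09:00:00 -> 10:00:00
```

Because the buckets tile the time axis with no gaps or overlap, every timestamp has exactly one (start, end) pair — which is what groupBy(window(...)) groups on.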
Task 3.1 — Transaction count per hour (tumbling 1h)
from pyspark.sql.functions import window
hourly = (
df.groupBy(window("timestamp", "1 hour")) # 1-hour tumbling window
.agg(
count("tx_id").alias("tx_count"),
_round(_sum("amount"), 2).alias("total_PLN"),
)
.orderBy("window")
)
hourly.show(truncate=False)

The window column is a struct with start and end fields. We can extract them for cleaner output:
(
hourly
.select(
col("window.start").alias("from"),
col("window.end").alias("to"),
"tx_count",
"total_PLN",
)
.show(truncate=False)
)

Task 3.2 — 30-minute windows per store
Count transactions and total revenue per store in each 30-minute window. Sort by window, then by store within each window.
# YOUR CODE
# df.groupBy(window("timestamp", "30 minutes"), "store").agg(...).orderBy(...).show()

Task 3.3 — Which hour had the highest revenue for store “Kraków”?
Filter by store first, then apply an hourly window, sort descending by total.
from pyspark.sql.functions import desc
# YOUR CODE

Part 4: Sliding Windows
Sliding window — intervals overlap:
08:00 ────────────────────────────────── 11:00
[──── 1h ────]
       [──── 1h ────]
              [──── 1h ────]

Two parameters: windowDuration (width) and slideDuration (step size).
Note: one transaction can appear in multiple windows — that is expected.
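As a sketch of that overlap (plain Python, not Spark; the helper name and the date are hypothetical), the function below lists every sliding window whose half-open interval [start, start+width) contains a given timestamp:

```python
from datetime import datetime, timedelta

def sliding_windows(ts, width, slide):
    """All sliding windows [start, start+width) that contain ts (sketch)."""
    epoch = datetime(1970, 1, 1)
    start = ts - (ts - epoch) % slide      # latest window start at or before ts
    found = []
    while start + width > ts:              # walk back while ts is still inside
        found.append((start, start + width))
        start -= slide
    return list(reversed(found))

# An event at 09:45 appears in two 1h / 30min windows:
# 09:00-10:00 and 09:30-10:30
ts = datetime(2024, 5, 6, 9, 45)           # hypothetical timestamp
for start, end in sliding_windows(ts, timedelta(hours=1), timedelta(minutes=30)):
    print(start.time(), "->", end.time())
```

In general each record lands in windowDuration / slideDuration windows — here 1h / 30min = 2 — which is why sliding aggregations produce more result rows than tumbling ones.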
Task 4.1 — 1h window, 30-minute step
sliding = (
df.groupBy(window("timestamp", "1 hour", "30 minutes")) # 1h width, 30min step
.agg(
count("tx_id").alias("tx_count"),
_round(_sum("amount"), 2).alias("total_PLN"),
)
.select(
col("window.start").alias("from"),
col("window.end").alias("to"),
"tx_count",
"total_PLN",
)
.orderBy("from")
)
sliding.show(truncate=False)

Task 4.2 — Compare tumbling vs sliding
Count the total number of result rows in both approaches. Why does sliding produce more rows?
tumbling_rows = (
df.groupBy(window("timestamp", "1 hour"))
.agg(count("tx_id"))
.count()
)
sliding_rows = (
df.groupBy(window("timestamp", "1 hour", "30 minutes"))
.agg(count("tx_id"))
.count()
)
print(f"Tumbling (1h): {tumbling_rows} windows")
print(f"Sliding (1h / 30min): {sliding_rows} windows")
# Answer in a comment: why does sliding produce more rows?
# YOUR ANSWER:

Part 5: Review Questions
# Answer in comments:
# 1. How many transactions are in the 09:00–10:00 window?
# Check the result from Task 3.1.
# ANSWER:
# 2. What is the difference between groupBy("store") and groupBy(window(...), "store")?
# ANSWER:
# 3. In the sliding window (1h / 30min), how many windows contain transactions from 09:30?
# Hint: draw a timeline.
# ANSWER:

Preview: Lab 3
Today you applied windows to a file (historical batch data).
In the next lab the same code will work on a live Kafka stream — the only change is one line:
# Batch (today):
df = spark.read.json("data/transactions_10k.jsonl")
# Streaming (Lab 3):
df = spark.readStream.format("kafka").option(...).load()

The window() aggregations stay exactly the same.
Homework
- Find the hour in which store Gdańsk had the lowest average transaction amount.
- Count how many transactions per category occurred in the 09:00–09:30 window.
- Use a 15-minute window and find which quarter-hour had the peak transaction volume (all stores combined).
Next lab: The same window() on a live Kafka stream — watermarks and handling late-arriving events.
spark.stop()