from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("lab3").getOrCreate()
sparkLab 3: Spark Structured Streaming — Windows on a Live Stream
# Sample batch readings: (temperature, event-time string) pairs.
# Note the last record (12:02:40) arrives AFTER 12:03:25 — an out-of-order event.
temperature_sensor = (
    (12.5, "2019-01-02 12:00:00"),
    (17.6, "2019-01-02 12:00:20"),
    (14.6, "2019-01-02 12:00:30"),
    (22.9, "2019-01-02 12:01:15"),
    (17.4, "2019-01-02 12:01:30"),
    (25.8, "2019-01-02 12:03:25"),
    (27.1, "2019-01-02 12:02:40"),
)

from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Explicit schema: the time column starts as a string and is cast below.
schema = StructType([
    StructField("temperature", DoubleType(), True),
    StructField("time", StringType(), True),
])

df = (
    spark.createDataFrame(temperature_sensor, schema=schema)
    .withColumn("time", to_timestamp("time"))  # string -> timestamp for windowing
)
df.printSchema()df.show(3)df.createOrReplaceTempView("df")spark.sql("SELECT time, temperature FROM df WHERE temperature > 21").show(5)# Tumbling window on batch data
import pyspark.sql.functions as F
df2 = df.groupBy(F.window("time", "30 seconds")).count()
df2.show(truncate=False)df2.printSchema()spark.stop()Streaming Sources in Spark Structured Streaming
Spark Structured Streaming can ingest data from various sources. The most common:
rate — test source
Automatically generates rows at a fixed rate. Each row contains:
- timestamp — event time
- value — monotonically increasing counter (0, 1, 2, …)
Used to test streaming logic without any external dependency.
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()Other sources
Socket (nc -lk 9999) — listens on a TCP port; for testing only.
File — watches a directory and treats new files as a stream; supports CSV, JSON, Parquet.
Kafka — reads from Apache Kafka topics (broker version ≥ 0.10.0).
Output Modes
outputMode controls what Spark writes after each micro-batch:
| Mode | What is written | When to use |
|---|---|---|
| `append` | Only new rows added in this micro-batch | Most common; stateless ops |
| `update` | Only rows whose aggregate value changed | Streaming aggregations |
| `complete` | The entire result table after every micro-batch | Debugging; requires groupBy |
Let’s create our first streaming DataFrame using the rate source.
%%file streamrate.py
## Run with: spark-submit streamrate.py
# Minimal streaming job: rate test source -> console sink, append mode.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
# Rate source generates (timestamp, value) rows at the configured rate.
df = (
spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
)
# Console sink prints each micro-batch; append mode emits only new rows.
query = (
df.writeStream
.format("console")
.outputMode("append")
.option("truncate", False)
.start()
)
query.awaitTermination()

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")


def process_batch(df, batch_id, tstop=5):
    """Print one micro-batch; raise after `tstop` batches to end the demo."""
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id >= tstop:
        # Deliberate exception: the only way to break out of
        # awaitTermination() from inside a foreachBatch callback.
        raise Exception("stop")


df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
)

query = (
    df.writeStream
    .outputMode("append")
    .foreachBatch(process_batch)
    .option("truncate", False)
    .start()
)

try:
    query.awaitTermination()
except Exception:  # narrowed from bare `except:` so BaseException (Ctrl-C) isn't swallowed
    query.stop()

Simulator
Let’s make the generated data more interesting.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
def process_batch(df, batch_id, tstop=5):
print(f"Batch ID: {batch_id}")
df.show(truncate=False)
if batch_id >= tstop:
raise Exception("stop")
df = (
spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
)
stream = (
df.withColumn("time", col("timestamp"))
.withColumn("temperature", expr("20 + rand() * 10"))
.select("time", "temperature")
)
query = (
stream.writeStream
.outputMode("append")
.foreachBatch(process_batch)
.option("truncate", False)
.start()
)
try:
query.awaitTermination()
except:
query.stop()1. Stateless Transformation
Any operation that does not depend on historical data — e.g. filtering, column derivation.
✅ Example: filter temperatures above 28 °C
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
def process_batch(df, batch_id, tstop=5):
print(f"Batch ID: {batch_id}")
df.show(truncate=False)
if batch_id >= tstop:
raise Exception("stop")
df = (
spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
)
stream = (
df.withColumn("time", col("timestamp"))
.withColumn("temperature", expr("20 + rand() * 10"))
.select("time", "temperature")
)
stream_filtered = stream.filter(col("temperature") > 28)
query = (
stream_filtered.writeStream
.outputMode("append")
.foreachBatch(process_batch)
.option("truncate", False)
.start()
)
try:
query.awaitTermination()
except:
query.stop()2. Simple Stateless Anomaly Detector
Let’s flag temperatures above 29.5 °C as anomalies by adding an anomaly column.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, when
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
def process_batch(df, batch_id, tstop=10):
print(f"Batch ID: {batch_id}")
df.show(truncate=False)
if batch_id >= tstop:
raise Exception("stop")
df = (
spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
)
stream = (
df.withColumn("time", col("timestamp"))
.withColumn("temperature", expr("20 + rand() * 10"))
.select("time", "temperature")
)
stream_anomaly = stream.withColumn(
"anomaly",
when(col("temperature") > 29.5, "YES").otherwise("NO")
)
query = (
stream_anomaly.writeStream
.outputMode("append")
.foreachBatch(process_batch)
.option("truncate", False)
.start()
)
try:
query.awaitTermination()
except:
query.stop()This is also stateless — only the current value is evaluated.
3. Tumbling Window
Groups data into fixed, non-overlapping intervals of e.g. 10 seconds.
⚠️ Note the output mode change to complete — required for streaming aggregations.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, window
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
def process_batch(df, batch_id, tstop=30):
print(f"Batch ID: {batch_id}")
df.show(truncate=False)
if batch_id >= tstop:
raise Exception("stop")
df = (
spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
)
stream = (
df.withColumn("time", col("timestamp"))
.withColumn("temperature", expr("20 + rand() * 10").cast("double"))
.select("time", "temperature")
)
tumbling_window = (
stream
.groupBy(window(col("time"), "10 seconds"))
.avg("temperature")
)
query = (
tumbling_window.writeStream
.outputMode("complete") # full table after every micro-batch
.foreachBatch(process_batch)
.option("truncate", False)
.start()
)
try:
query.awaitTermination()
except:
query.stop()4. Sliding Window
The same event can fall into multiple overlapping windows.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, window
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
def process_batch(df, batch_id, tstop=30):
print(f"Batch ID: {batch_id}")
df.show(truncate=False)
if batch_id >= tstop:
raise Exception("stop")
df = (
spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
)
stream = (
df.withColumn("time", col("timestamp"))
.withColumn("temperature", expr("20 + rand() * 10").cast("double"))
.select("time", "temperature")
)
sliding = (
stream
.groupBy(window(col("time"), "10 seconds", "5 seconds")) # 10s window, 5s step
.avg("temperature")
)
query = (
sliding.writeStream
.outputMode("complete")
.foreachBatch(process_batch)
.option("truncate", False)
.start()
)
try:
query.awaitTermination()
except:
query.stop()File Source (JSON)
Data generator
%%file generator.py
# generator.py — run in a separate terminal: python generator.py
"""Write small batches of fake transactions as JSON-lines files every 5 s."""
import json
import os
import random
import time
from datetime import datetime

output_dir = "data/stream"
os.makedirs(output_dir, exist_ok=True)

stores = ['Warsaw', 'Krakow', 'Gdansk', 'Wroclaw']
categories = ['electronics', 'clothing', 'food', 'books']


def generate_transaction():
    """Return one random transaction record as a JSON-serializable dict."""
    return {
        'tx_id': f'TX{random.randint(1000, 9999)}',
        'user_id': f'u{random.randint(1, 20):02d}',
        'amount': round(random.uniform(5.0, 5000.0), 2),
        'store': random.choice(stores),
        'category': random.choice(categories),
        'timestamp': datetime.now().isoformat(),
    }


def main():
    """Emit a 2-record JSON-lines file every 5 seconds, forever."""
    while True:
        batch = [generate_transaction() for _ in range(2)]
        filename = f"{output_dir}/events_{int(time.time())}.json"
        with open(filename, "w") as f:
            for e in batch:
                f.write(json.dumps(e) + "\n")
        # BUG FIX: original printed the literal text "(unknown)" instead of
        # interpolating the written file's path.
        print(f"Wrote: {filename}")
        time.sleep(5)


if __name__ == "__main__":
    main()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.appName("jsonDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Streaming file sources require an explicit schema (no inference).
tx_schema = StructType([
    StructField("tx_id", StringType()),
    StructField("user_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("store", StringType()),
    StructField("category", StringType()),
    StructField("timestamp", StringType()),
])

batch_counter = {"count": 0}


def process_batch(df, batch_id, tstop=5):
    """Print one micro-batch; raise after `tstop` batches to end the demo."""
    # NOTE(review): this counter is incremented but never read — candidate
    # for removal.
    batch_counter["count"] += 1
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id >= tstop:
        raise Exception("stop")


# In a terminal: python generator.py
stream = (
    spark.readStream
    .schema(tx_schema)
    .json("data/stream")  # watch the directory the generator writes into
)

query = (
    stream.writeStream
    .foreachBatch(process_batch)
    .start()
)

try:
    query.awaitTermination()
except Exception:  # narrowed from bare `except:`
    query.stop()