from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("okienka").getOrCreate()
sparkLab 3: Spark Structured Streaming — okna na żywym strumieniu
czujnik_temperatury = ((12.5, "2019-01-02 12:00:00"),
(17.6, "2019-01-02 12:00:20"),
(14.6, "2019-01-02 12:00:30"),
(22.9, "2019-01-02 12:01:15"),
(17.4, "2019-01-02 12:01:30"),
(25.8, "2019-01-02 12:03:25"),
(27.1, "2019-01-02 12:02:40"),
)from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
schema = StructType([
StructField("temperatura", DoubleType(), True),
StructField("czas", StringType(), True),
])df = (spark.createDataFrame(czujnik_temperatury, schema=schema)
.withColumn("czas", to_timestamp("czas")))
df.printSchema()df.show(3)df.createOrReplaceTempView("df")spark.sql("select czas, temperatura from df where temperatura > 21").show(5)# Thumbling window
import pyspark.sql.functions as F
df2 = df.groupBy(F.window("czas","30 seconds")).count()
df2.show(truncate=False)df2.printSchema()spark.stop()🔌 Źródła danych w Spark Structured Streaming
Spark Structured Streaming pozwala na przetwarzanie danych w czasie rzeczywistym z różnych źródeł strumieniowych. Najpopularniejsze z nich to:
✅ rate — źródło testowe
- Automatycznie generuje dane: co sekundę dodaje wiersz.
- Każdy wiersz zawiera:
- timestamp – znacznik czasu,
- value – licznik rosnący (0, 1, 2, …).
- Używane do testowania logiki strumieniowania bez konieczności podpinania zewnętrznych źródeł.
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()📡 Inne źródła strumieni:
Socket (do testów np. nc -lk 9999): To źródło nasłuchuje na wskazanym porcie gniazda (socket) i wczytuje dowolne dane do Spark Streaming. Również służy wyłącznie do celów testowych.
Plik (File): Nasłuchuje określonego katalogu i traktuje pojawiające się tam pliki jako dane strumieniowe. Obsługuje formaty takie jak CSV, JSON, ORC oraz Parquet (np. .csv, .json, .parquet).
Kafka: Odczytuje dane z Apache Kafka® i jest kompatybilne z brokerami w wersji 0.10.0 lub wyższej.
📤 Output Modes – tryby wypisywania wyników
outputMode określa jak Spark wypisuje dane po każdej mikroserii (micro-batch). Dostępne tryby to:
append Wypisuje tylko nowe wiersze, które zostały dodane w tej mikroserii. Najczęściej używany.
update Wypisuje zmienione wiersze - czyli zaktualizowane agregaty.
complete Wypisuje całą tabelę agregacji po każdej mikroserii. Wymaga pełnej agregacji (np. groupBy).
Utwórzmy nasz pierwszy strumieniowy DataFrame w Sparku, korzystając ze źródła danych typu rate.
%%file streamrate.py
## uruchom przez spark-submit streamrate.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
df = (spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
)
query = (df.writeStream
.format("console")
.outputMode("append")
.option("truncate", False)
.start()
)
query.awaitTermination()from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
def process_batch(df, batch_id, tstop=5):
print(f"Batch ID: {batch_id}")
df.show(truncate=False)
if batch_id == tstop:
df.stop()
df = (spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
)
query = (df.writeStream
.format("console")
.outputMode("append")
.foreachBatch(process_batch)
.option("truncate", False)
.start()
)symulator
Ustawmy generowane dane tak by działo się coś ciekawego
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
def process_batch(df, batch_id, tstop=5):
print(f"Batch ID: {batch_id}")
df.show(truncate=False)
if batch_id == tstop:
df.stop()
from pyspark.sql.functions import col, expr
df = (spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
)
stream = (df.withColumn("czas", col("timestamp"))
.withColumn("temperatura", expr("20 + rand() * 10"))
.select("czas", "temperatura")
)
query = (stream.writeStream
.format("console")
.outputMode("append")
.foreachBatch(process_batch)
.option("truncate", False)
.start()
)🔹 1. Transformacja bezstanowa (stateless transformation)
To każda operacja, która nie zależy od danych historycznych — np. filtrowanie, mapowanie.
✅ Przykład: filtruj temperatury powyżej 28°C
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
def process_batch(df, batch_id, tstop=5):
print(f"Batch ID: {batch_id}")
df.show(truncate=False)
if batch_id == tstop:
df.stop()
from pyspark.sql.functions import col, expr
df = (spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
)
stream = (df.withColumn("czas", col("timestamp"))
.withColumn("temperatura", expr("20 + rand() * 10"))
.select("czas", "temperatura")
)
stream_filtered = stream.filter(col("temperatura") > 28)
query = (stream_filtered.writeStream
.format("console")
.outputMode("append")
.foreachBatch(process_batch)
.option("truncate", False)
.start()
)🔹 2. Prosty model wykrywający anomalie (bezstanowy)
Powiedzmy, że uznajemy temperatury > 29.5°C za anomalie.
Dodaj kolumnę anomaly:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, when
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
def process_batch(df, batch_id, tstop=10):
print(f"Batch ID: {batch_id}")
df.show(truncate=False)
if batch_id == tstop:
df.stop()
df = (spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
)
stream = (df.withColumn("czas", col("timestamp"))
.withColumn("temperatura", expr("20 + rand() * 10"))
.select("czas", "temperatura")
)
stream_anomaly = stream.withColumn(
"anomaly",
when(col("temperatura") > 29.5, "TAK").otherwise("NIE")
)
query = (stream_anomaly.writeStream
.format("console")
.outputMode("append")
.foreachBatch(process_batch)
.option("truncate", False)
.start()
)To też jest bezstanowe – tylko bieżąca wartość jest oceniana.
🔹 3. Okno czasowe: Tumbling Window
Pokazuje, jak dane są grupowane co np. 10 sekund bez nakładania się. zwroc uwagę na zmianę output mode !!!
✅ Przykład:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
def process_batch(df, batch_id, tstop=30):
print(f"Batch ID: {batch_id}")
df.show(truncate=False)
if batch_id == tstop:
df.stop()
df = (spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
)
stream = (df.withColumn("czas", col("timestamp"))
.withColumn("temperatura", expr("20 + rand() * 10"))
.select("czas", "temperatura")
)
# Konwertuj 'temperatura' na typ Double, aby agregacja była możliwa
stream = stream.withColumn("temperatura", col("temperatura").cast("double"))
tumbling_window = (stream
.groupBy(window(col("czas"), "10 seconds"))
.avg("temperatura")
)
query = (tumbling_window.writeStream
.format("console")
.outputMode("complete")
.foreachBatch(process_batch)
.option("truncate", False)
.start()
)🔹 4. Okno czasowe: Sliding Window
Te same dane mogą wpaść do wielu nakładających się okien.
✅ Przykład:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
def process_batch(df, batch_id, tstop=30):
print(f"Batch ID: {batch_id}")
df.show(truncate=False)
if batch_id == tstop:
df.stop()
df = (spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
)
stream = (df.withColumn("czas", col("timestamp"))
.withColumn("temperatura", expr("20 + rand() * 10"))
.select("czas", "temperatura")
)
# Konwertuj 'temperatura' na typ Double, aby agregacja była możliwa
stream = stream.withColumn("temperatura", col("temperatura").cast("double"))
sliding = (stream.groupBy(window(col("czas"), "10 seconds", "5 seconds"))
.avg("temperatura")
)
query = (sliding.writeStream
.format("console")
.outputMode("complete")
.foreachBatch(process_batch)
.option("truncate", False)
.start()
)🔹 Źródło plikowe (JSON)
✅ Generator danych:
%%file generator.py
# generator.py
import json, os, random, time
from datetime import datetime, timedelta
output_dir = "data/stream"
os.makedirs(output_dir, exist_ok=True)
sklepy = ['Warszawa', 'Kraków', 'Gdańsk', 'Wrocław']
kategorie = ['elektronika', 'odzież', 'żywność', 'książki']
def generate_transaction():
return {
'tx_id': f'TX{random.randint(1000,9999)}',
'user_id': f'u{random.randint(1,20):02d}',
'amount': round(random.uniform(5.0, 5000.0), 2),
'store': random.choice(sklepy),
'category': random.choice(kategorie),
'timestamp': datetime.now().isoformat(),
}
# Simulate file-based streaming
while True:
batch = [generate_transaction() for _ in range(2)]
filename = f"{output_dir}/events_{int(time.time())}.json"
with open(filename, "w") as f:
for e in batch:
f.write(json.dumps(e) + "\n")
print(f"Wrote: {filename}")
time.sleep(5)from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import from_json, to_timestamp
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("jsonDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
tx_schema = StructType([
StructField("tx_id", StringType()),
StructField("user_id", StringType()),
StructField("amount", DoubleType()),
StructField("store", StringType()),
StructField("category", StringType()),
StructField("timestamp", StringType()),
])
batch_counter = {"count": 0}
def process_batch(df, batch_id, tstop=5):
batch_counter["count"] += 1
print(f"Batch ID: {batch_id}")
df.show(truncate=False)
if batch_id == tstop:
df.stop()
stream = (spark.readStream
.schema(tx_schema)
.json("data/stream"))
query = (stream.writeStream
.format("console")
.foreachBatch(process_batch)
.start())