Lab 3: Spark Structured Streaming — okna na żywym strumieniu

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("okienka").getOrCreate()
spark
czujnik_temperatury = ((12.5, "2019-01-02 12:00:00"),
(17.6, "2019-01-02 12:00:20"),
(14.6,  "2019-01-02 12:00:30"),
(22.9,  "2019-01-02 12:01:15"),
(17.4,  "2019-01-02 12:01:30"),
(25.8,  "2019-01-02 12:03:25"),
(27.1,  "2019-01-02 12:02:40"),
)
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType([
    StructField("temperatura", DoubleType(), True),
    StructField("czas", StringType(), True),
])
df = (spark.createDataFrame(czujnik_temperatury, schema=schema)
      .withColumn("czas", to_timestamp("czas")))

df.printSchema()
df.show(3)
df.createOrReplaceTempView("df")
spark.sql("select czas, temperatura from df where temperatura > 21").show(5)
# Thumbling window

import pyspark.sql.functions as F

df2 = df.groupBy(F.window("czas","30 seconds")).count()
df2.show(truncate=False)
df2.printSchema()
spark.stop()

🔌 Źródła danych w Spark Structured Streaming

Spark Structured Streaming pozwala na przetwarzanie danych w czasie rzeczywistym z różnych źródeł strumieniowych. Najpopularniejsze z nich to:

✅ rate — źródło testowe

  • Automatycznie generuje dane: co sekundę dodaje wiersz.
  • Każdy wiersz zawiera:
  • timestamp – znacznik czasu,
  • value – licznik rosnący (0, 1, 2, …).
  • Używane do testowania logiki strumieniowania bez konieczności podpinania zewnętrznych źródeł.
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

📡 Inne źródła strumieni:

Socket (do testów np. nc -lk 9999): To źródło nasłuchuje na wskazanym porcie gniazda (socket) i wczytuje dowolne dane do Spark Streaming. Również służy wyłącznie do celów testowych.

Plik (File): Nasłuchuje określonego katalogu i traktuje pojawiające się tam pliki jako dane strumieniowe. Obsługuje formaty takie jak CSV, JSON, ORC oraz Parquet (np. .csv, .json, .parquet).

Kafka: Odczytuje dane z Apache Kafka® i jest kompatybilne z brokerami w wersji 0.10.0 lub wyższej.

📤 Output Modes – tryby wypisywania wyników

outputMode określa jak Spark wypisuje dane po każdej mikroserii (micro-batch). Dostępne tryby to:

append Wypisuje tylko nowe wiersze, które zostały dodane w tej mikroserii. Najczęściej używany.

update Wypisuje zmienione wiersze - czyli zaktualizowane agregaty.

complete Wypisuje całą tabelę agregacji po każdej mikroserii. Wymaga pełnej agregacji (np. groupBy).

Utwórzmy nasz pierwszy strumieniowy DataFrame w Sparku, korzystając ze źródła danych typu rate.

%%file streamrate.py
## uruchom przez spark-submit streamrate.py

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = (spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
)


query = (df.writeStream 
    .format("console") 
    .outputMode("append") 
    .option("truncate", False) 
    .start()
) 

query.awaitTermination()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

def process_batch(df, batch_id, tstop=5):
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id == tstop:
        df.stop()


df = (spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
)

query = (df.writeStream 
    .format("console") 
    .outputMode("append")
    .foreachBatch(process_batch)
    .option("truncate", False) 
    .start()
)

symulator

Ustawmy generowane dane tak by działo się coś ciekawego

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

def process_batch(df, batch_id, tstop=5):
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id == tstop:
        df.stop()

from pyspark.sql.functions import col, expr

df = (spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
)

stream = (df.withColumn("czas", col("timestamp"))
        .withColumn("temperatura", expr("20 + rand() * 10"))
        .select("czas", "temperatura")
       )

query = (stream.writeStream 
    .format("console") 
    .outputMode("append")
    .foreachBatch(process_batch)
    .option("truncate", False) 
    .start()
)

🔹 1. Transformacja bezstanowa (stateless transformation)

To każda operacja, która nie zależy od danych historycznych — np. filtrowanie, mapowanie.

✅ Przykład: filtruj temperatury powyżej 28°C

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

def process_batch(df, batch_id, tstop=5):
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id == tstop:
        df.stop()

from pyspark.sql.functions import col, expr

df = (spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
)

stream = (df.withColumn("czas", col("timestamp"))
        .withColumn("temperatura", expr("20 + rand() * 10"))
        .select("czas", "temperatura")
       )

stream_filtered = stream.filter(col("temperatura") > 28)

query = (stream_filtered.writeStream 
    .format("console") 
    .outputMode("append")
    .foreachBatch(process_batch)
    .option("truncate", False) 
    .start()
)

🔹 2. Prosty model wykrywający anomalie (bezstanowy)

Powiedzmy, że uznajemy temperatury > 29.5°C za anomalie.

Dodaj kolumnę anomaly:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, when

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

def process_batch(df, batch_id, tstop=10):
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id == tstop:
        df.stop()



df = (spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
)

stream = (df.withColumn("czas", col("timestamp"))
        .withColumn("temperatura", expr("20 + rand() * 10"))
        .select("czas", "temperatura")
       )

stream_anomaly = stream.withColumn(
    "anomaly",
    when(col("temperatura") > 29.5, "TAK").otherwise("NIE")
)

query = (stream_anomaly.writeStream 
    .format("console") 
    .outputMode("append")
    .foreachBatch(process_batch)
    .option("truncate", False) 
    .start()
)

To też jest bezstanowe – tylko bieżąca wartość jest oceniana.

🔹 3. Okno czasowe: Tumbling Window

Pokazuje, jak dane są grupowane co np. 10 sekund bez nakładania się. zwroc uwagę na zmianę output mode !!!

✅ Przykład:

from pyspark.sql import SparkSession
from pyspark.sql.functions import window

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

def process_batch(df, batch_id, tstop=30):
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id == tstop:
        df.stop()



df = (spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
)

stream = (df.withColumn("czas", col("timestamp"))
        .withColumn("temperatura", expr("20 + rand() * 10"))
        .select("czas", "temperatura")
       )
# Konwertuj 'temperatura' na typ Double, aby agregacja była możliwa
stream = stream.withColumn("temperatura", col("temperatura").cast("double"))

tumbling_window = (stream
    .groupBy(window(col("czas"), "10 seconds"))
    .avg("temperatura")
)

query = (tumbling_window.writeStream 
    .format("console") 
    .outputMode("complete")
    .foreachBatch(process_batch)
    .option("truncate", False) 
    .start()
)

🔹 4. Okno czasowe: Sliding Window

Te same dane mogą wpaść do wielu nakładających się okien.

✅ Przykład:

from pyspark.sql import SparkSession
from pyspark.sql.functions import window

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

def process_batch(df, batch_id, tstop=30):
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id == tstop:
        df.stop()



df = (spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
)

stream = (df.withColumn("czas", col("timestamp"))
        .withColumn("temperatura", expr("20 + rand() * 10"))
        .select("czas", "temperatura")
       )
# Konwertuj 'temperatura' na typ Double, aby agregacja była możliwa
stream = stream.withColumn("temperatura", col("temperatura").cast("double"))


sliding = (stream.groupBy(window(col("czas"), "10 seconds", "5 seconds"))
           .avg("temperatura")
          )

query = (sliding.writeStream 
    .format("console") 
    .outputMode("complete")
    .foreachBatch(process_batch)
    .option("truncate", False) 
    .start()
)

🔹 Źródło plikowe (JSON)

✅ Generator danych:

%%file generator.py
# generator.py
import json, os, random, time
from datetime import datetime, timedelta

output_dir = "data/stream"
os.makedirs(output_dir, exist_ok=True)

sklepy = ['Warszawa', 'Kraków', 'Gdańsk', 'Wrocław']
kategorie = ['elektronika', 'odzież', 'żywność', 'książki']

def generate_transaction():
    return {
        'tx_id': f'TX{random.randint(1000,9999)}',
        'user_id': f'u{random.randint(1,20):02d}',
        'amount': round(random.uniform(5.0, 5000.0), 2),
        'store': random.choice(sklepy),
        'category': random.choice(kategorie),
        'timestamp': datetime.now().isoformat(),
    }

# Simulate file-based streaming
while True:
    batch = [generate_transaction() for _ in range(2)]
    filename = f"{output_dir}/events_{int(time.time())}.json"
    with open(filename, "w") as f:
        for e in batch:
            f.write(json.dumps(e) + "\n")
    print(f"Wrote: {filename}")
    time.sleep(5)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import from_json, to_timestamp
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jsonDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

tx_schema = StructType([
    StructField("tx_id",     StringType()),
    StructField("user_id",   StringType()),
    StructField("amount",    DoubleType()),
    StructField("store",     StringType()),
    StructField("category",  StringType()),
    StructField("timestamp", StringType()),
])

batch_counter = {"count": 0}

def process_batch(df, batch_id, tstop=5):
    batch_counter["count"] += 1
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id == tstop:
        df.stop()
    


stream = (spark.readStream
          .schema(tx_schema)
          .json("data/stream"))

query = (stream.writeStream
         .format("console")
         .foreachBatch(process_batch)
         .start())