Zarzadzanie zrodlami danych strumieniowych i segmentacja klientow

🔄 Wprowadzenie

W tym laboratorium zapoznasz sie z roznymi metodami zasilania danych strumieniowych w Apache Spark oraz zastosowaniem prostych transformacji, filtrowania i segmentacji klientow w czasie rzeczywistym.

💡 Pomocnicza funkcja do wyswietlania naszych danych strumieniowych

batch_counter = {"count": 0}

def process_batch(df, batch_id):
    batch_counter["count"] += 1
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_counter["count"] % 5 == 0:
        spark.stop()

🔹 rate jako źródło kontrolowanego strumienia

✅ Zadanie 1

  1. Przygotuj strumien danych z format('rate'), ustaw rowsPerSecond na 5.
  2. Utworz kolumne user_id: expr("concat('u', cast(rand()*100 as int))")
  3. Dodaj kolumne event_type: expr("case when rand() > 0.7 then 'purchase' else 'view' end")

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

rate_df = (spark....)

events = (rate_df....)
    
query = (events.writeStream
         .format("console")
         .foreachBatch(process_batch)
         .start())

🔹 Filtrowanie danych bez agregacji (append mode)

Zobacz jak działa poniższy kod i na jego podstawie:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("AppendExample").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Źródło rate - generuje timestamp + value
rate_df = (spark.readStream
           .format("rate")
           .option("rowsPerSecond", 5)
           .load())

# Filtracja bez potrzeby agregacji (bezstanowe przetwarzanie)
filtered = rate_df.filter(col("value") % 2 == 0) \
                  .withColumn("info", expr("concat('even:', value)"))

# outputMode = append → pokazuje tylko nowe wiersze, bez stanu
query = (filtered.writeStream 
    .outputMode("append") 
    .format("console") 
    .option("truncate", False) 
    .foreachBatch(process_batch)
    .start()
        )
  1. Skorzystaj z danych z poprzedniego zadania.
  2. Wyfiltruj tylko purchase.
purchases = events....

query = (purchases.writeStream
         .format("console")
         .outputMode("append")
         .foreachBatch(process_batch)
         .start())

🔹 Źródło plikowe (JSON)

✅ Generator danych:

%%file generator.py
# generator.py
import json, os, random, time
from datetime import datetime, timedelta

output_dir = "data/stream"
os.makedirs(output_dir, exist_ok=True)

event_types = ["view", "cart", "purchase"]
categories = ["electronics", "books", "fashion", "home", "sports"]

def generate_event():
    return {
        "user_id": f"u{random.randint(1, 50)}",
        "event_type": random.choices(event_types, weights=[0.6, 0.25, 0.15])[0],
        "timestamp": (datetime.utcnow() - timedelta(seconds=random.randint(0, 300))).isoformat(),
        "product_id": f"p{random.randint(100, 120)}",
        "category": random.choice(categories),
        "price": round(random.uniform(10, 1000), 2)
    }

# Simulate file-based streaming
while True:
    batch = [generate_event() for _ in range(50)]
    filename = f"{output_dir}/events_{int(time.time())}.json"
    with open(filename, "w") as f:
        for e in batch:
            f.write(json.dumps(e) + "\n")
    print(f"Wrote: {filename}")
    time.sleep(5)

✅ Schemat danych:

{
  "user_id": "u123",
  "event_type": "purchase", // albo "view", "cart", "click"
  "timestamp": "2025-05-09T15:24:00Z",
  "product_id": "p456",
  "category": "electronics",
  "price": 299.99
}
  1. Utwórz zmienną schema, która zrealizuje schamat danych naszej ramki. Wykorzystaj StringType(), TimestampType(),DoubleType()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName("RealTimeEcommerce").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# StringType(), TimestampType(), DoubleType()

schema = ...

✅ Odczyt danych z katalogu:

stream = (spark.readStream
          .schema(schema)
          .json("data/stream"))

query = (stream.writeStream
         .format("console")
         .foreachBatch(process_batch)
         .start())

🔹 Bezstanowe zliczanie zdarzen

  1. Przygotuj zmienną agg1 zliczającą zdarzenia należące do danej grupy event_type.
agg1 = (stream....)

# pamietaj, że agregacje wymagają opcji complete
query = (agg1
         .writeStream
         .outputMode("complete")
         .format("console")
         .foreachBatch(process_batch)
         .start()
        )

🔹 Agregacja w oknach czasowych

withWatermark("timestamp", "1 minute")

💡 Do czego służy: Informuje Sparka, że dane przychodzą z opóźnieniem i należy je przetwarzać tylko do określonego limitu wstecz (tutaj: 1 minuta).

🚨 Dlaczego ważne: Bez watermarku Spark trzymałby w pamięci wszystkie dane, by móc je jeszcze pogrupować. Watermark pozwala zwolnić pamięć.

  1. Pogrupuj typy zdarzen w thumbling window, w oknie co 5 minut
  2. dodaj watermark z ustawieniem na 1 minutę.
windowed = (stream...)

query = (
    windowed.writeStream
    .outputMode("append")
    .foreachBatch(process_batch)
    .format("console")
    .start()
)
  1. Zmień thumbling window na sliding window z szerokością okna 5 minut i startem nowego okna co 1 minutę.
windowed = (stream...)

query = (
    windowed.writeStream
    .outputMode("append")
    .foreachBatch(process_batch)
    .format("console")
    .start()
)

🔹 Segmentacja klientow

🧩 Logika segmentacji:

  1. jeśli był purchase → “Buyer”
  2. jeśli był cart, ale nie purchase → “Cart abandoner”
  3. jeśli tylko view → “Lurker”

groupBy(window(...), "user_id")

💡 Do czego służy: Grupujemy dane per użytkownik w konkretnym przedziale czasu (oknie 5-minutowym).

⏱️ window(“timestamp”, “5 minutes”): Funkcja okna czasowego – każda grupa będzie dotyczyć jednego użytkownika w konkretnym 5-minutowym interwale.

agg(collect_set("event_type"))

💡 Do czego służy: Zbiera wszystkie typy zdarzeń (view, cart, purchase) danego użytkownika w danym oknie.

🧠 Dlaczego collect_set a nie collect_list?: collect_set usuwa duplikaty — interesuje nas tylko czy coś się zdarzyło, a nie ile razy.

  1. withColumn(… expr(…))

💡 Do czego służy: Na podstawie zbioru zdarzeń określamy segment użytkownika.

🔎 array_contains: Funkcja sprawdzająca, czy dany typ zdarzenia znajduje się w tablicy.

🧠 Co warto wiedzieć:

  • Segmentacja to klasyczne zastosowanie agregacji i transformacji strumienia.
  • Łączenie window + watermark jest kluczowe do kontroli stanu.
  • collect_set umożliwia prostą analizę zachowań, bez potrzeby przechowywania surowych danych.
  • expr() daje elastyczność, by używać składni SQL wewnątrz kodu DataFrame.

⚙️ Porownanie trybow outputMode

outputMode Opis Kiedy uzywac Wymagania
append Wypisywane sa tylko nowe wiersze Filtrowanie, wzbogacanie bez agregacji Nie dziala z groupBy
update Wypisywane sa tylko zmienione wiersze Agregacje z watermarkami Wymaga zarzadzania stanem
complete Wypisywane jest calosciowe podsumowanie Podsumowania okien, snapshoty Moze byc kosztowne