Wykład 5 — Apache Spark i Structured Streaming

Analiza danych w czasie rzeczywistym

Apache Spark jako silnik rozproszonego przetwarzania, Structured Streaming, integracja Spark–Kafka, kompletny pipeline real-time analytics.
Note{{< fa clock >}} Czas trwania: 1,5h

Cel wykładu: Poznanie Apache Spark jako silnika do rozproszonego przetwarzania danych. Wprowadzenie do Structured Streaming i integracji Spark–Kafka. Przegląd kompletnego pipeline’u real-time analytics.


1 Dlaczego Spark?

Na poprzednich wykładach poznaliśmy Kafkę — system, który transportuje strumienie danych. Ale Kafka sama w sobie nie wykonuje złożonych analiz. Do tego potrzebujemy silnika przetwarzania — i tu wchodzi Apache Spark.

Spark to silnik do rozproszonego przetwarzania danych, który obsługuje zarówno tryb wsadowy (batch), jak i strumieniowy (streaming). Powstał na UC Berkeley w 2009 roku jako odpowiedź na ograniczenia Hadoop MapReduce — głównie na jego powolność wynikającą z ciągłego zapisu na dysk.

Tip{{< fa memory >}} Przetwarzanie w pamięci

Dane trzymane w RAM, nie na dysku. Nawet 100x szybciej niż MapReduce.

Tip{{< fa cubes >}} Jeden silnik, wiele trybów

Batch, streaming, SQL, ML, grafy — wszystko w jednym frameworku.

Tip{{< fa python >}} API w wielu językach

PySpark, Scala, Java, R.

Tip{{< fa wand-magic-sparkles >}} Lazy evaluation

Spark buduje plan wykonania i optymalizuje go zanim cokolwiek obliczy.

flowchart TB
    subgraph "Apache Spark"
        CORE["Spark Core\n(RDD, zarządzanie pamięcią)"]
        SQL["Spark SQL\n& DataFrames"]
        SS["Structured\nStreaming"]
        ML["MLlib\n(Machine Learning)"]
        GR["GraphX\n(Grafy)"]
    end
    SQL --> CORE
    SS --> CORE
    ML --> CORE
    GR --> CORE

    style CORE fill:#FF9800,color:#fff
    style SQL fill:#2196F3,color:#fff
    style SS fill:#F44336,color:#fff
    style ML fill:#4CAF50,color:#fff
    style GR fill:#9C27B0,color:#fff

Ekosystem Apache Spark


2 PySpark — podstawy

2.1 SparkSession — punkt wejścia

Każdy program PySpark zaczyna się od utworzenia SparkSession:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MojaPierwszaAplikacja") \
    .master("local[*]") \
    .getOrCreate()

local[*] oznacza: uruchom Sparka lokalnie, używając wszystkich dostępnych rdzeni procesora. W produkcji zamiast tego podaje się adres klastra.

2.2 DataFrame API

Spark DataFrame to odpowiednik tabeli SQL lub DataFrame z Pandas — ale rozproszony na wielu maszynach.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, count

spark = SparkSession.builder.appName("Demo").master("local[*]").getOrCreate()

# Tworzenie DataFrame
dane = [
    ("Warszawa", "Elektronika", 4299.00),
    ("Kraków", "Odzież", 189.99),
    ("Warszawa", "Żywność", 87.50),
    ("Gdańsk", "Elektronika", 2599.00),
    ("Kraków", "Elektronika", 1299.00),
    ("Warszawa", "Odzież", 349.99),
    ("Gdańsk", "Żywność", 156.00),
]

df = spark.createDataFrame(dane, ["miasto", "kategoria", "kwota"])
df.show()

2.3 Transformacje i akcje

Spark rozróżnia dwa typy operacji:

Definiują co chcesz zrobić, ale nie wykonują obliczeń. Są leniwe (lazy).

Przykłady: filter, groupBy, select, join.

# Transformacje (lazy — nic się jeszcze nie liczy)
wynik = df \
    .filter(col("kwota") > 100) \
    .groupBy("miasto") \
    .agg(
        sum("kwota").alias("suma"),
        count("kwota").alias("liczba"),
        avg("kwota").alias("srednia")
    )

Wyzwalają faktyczne obliczenia.

Przykłady: show, count, collect, write.

# Akcja (tu Spark faktycznie wykonuje obliczenia)
wynik.show()

spark.stop()

Możesz też używać zwykłego SQL-a:

df.createOrReplaceTempView("sprzedaz")

spark.sql("""
    SELECT miasto,
           SUM(kwota) as suma,
           COUNT(*) as liczba
    FROM sprzedaz
    WHERE kwota > 100
    GROUP BY miasto
    ORDER BY suma DESC
""").show()

3 Structured Streaming — strumienie jako tabele

ImportantKluczowa koncepcja

Structured Streaming traktuje strumień danych jak tabelę, do której ciągle dopisywane są nowe wiersze. Dzięki temu piszesz kod strumieniowy prawie identycznie jak batch — używasz tego samego DataFrame API.

flowchart TB
    subgraph "Strumień wejściowy"
        T1["Batch t1"] --> TAB["Nieograniczona\ntabela wejściowa\n(nowe wiersze\ndopisywane ciągle)"]
        T2["Batch t2"] --> TAB
        T3["Batch t3"] --> TAB
        T4["Batch t4..."] --> TAB
    end
    TAB -->|"Zapytanie\n(query)"| RES["Tabela wynikowa\n(aktualizowana\nz każdym triggerem)"]
    RES --> OUT["Wyjście:\nkonsola / Kafka / pliki / baza"]

    style TAB fill:#2196F3,color:#fff
    style RES fill:#4CAF50,color:#fff
    style OUT fill:#FF9800,color:#fff

Structured Streaming: strumień jako nieograniczona tabela

3.1 Tryby wyjścia (Output Modes)

Note{{< fa plus >}} Append

Do wyniku dopisywane są tylko nowe wiersze. Domyślny tryb.

Warning{{< fa arrows-rotate >}} Complete

Cała tabela wynikowa jest nadpisywana (np. po agregacji).

Tip{{< fa pen >}} Update

Zapisywane są tylko zmienione wiersze.

3.2 Przykład: streaming z plików CSV

Najprostszy przykład — Spark monitoruje katalog i przetwarza nowe pliki CSV w miarę ich pojawiania się:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, sum
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

spark = SparkSession.builder.appName("StreamingDemo").master("local[*]").getOrCreate()

# Schemat danych
schema = StructType() \
    .add("transaction_id", StringType()) \
    .add("amount", DoubleType()) \
    .add("store", StringType()) \
    .add("timestamp", TimestampType())

# Czytanie strumienia z katalogu CSV
streaming_df = spark.readStream \
    .schema(schema) \
    .option("header", True) \
    .csv("/data/incoming/")

# Agregacja w oknach 5-minutowych
result = streaming_df \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("store")
    ) \
    .agg(sum("amount").alias("total"))

# Zapis wyniku do konsoli
query = result.writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

# query.awaitTermination()

4 Integracja Spark + Kafka

ImportantSerce kursu

Kafka transportuje dane, Spark je przetwarza. To fundamentalna para w architekturze real-time analytics.

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, avg, count
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("KafkaStreaming") \
    .master("local[*]") \
    .getOrCreate()

# Odczyt strumienia z Kafki
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transakcje") \
    .option("startingOffsets", "latest") \
    .load()

# Kafka zwraca dane jako bajty — trzeba je sparsować
schema = StructType() \
    .add("id", StringType()) \
    .add("kwota", DoubleType()) \
    .add("sklep", StringType()) \
    .add("czas", TimestampType())

parsed_df = kafka_df \
    .selectExpr("CAST(value AS STRING) as json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")
# Agregacja: średnia kwota i liczba transakcji
# w oknach 5-minutowych per sklep
wynik = parsed_df \
    .withWatermark("czas", "2 minutes") \
    .groupBy(
        window(col("czas"), "5 minutes"),
        col("sklep")
    ) \
    .agg(
        avg("kwota").alias("srednia_kwota"),
        count("*").alias("liczba_transakcji")
    )

# Zapis do konsoli (na lab: do bazy danych lub dashboardu)
query = wynik.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .trigger(processingTime="10 seconds") \
    .start()

# query.awaitTermination()

Zwróć uwagę na .withWatermark("czas", "2 minutes") — to mechanizm watermarkingu, o którym mówiliśmy na Wykładzie 2. Spark toleruje opóźnienie zdarzeń do 2 minut i automatycznie odrzuca te, które przyszły później.


5 Kompletny pipeline — podsumowanie kursu

Łączmy wszystko, co poznaliśmy na wykładach:

flowchart LR
    subgraph "Źródła danych"
        WEB["Aplikacja\nwebowa"]
        PAY["System\npłatności"]
        IOT["Sensory\nIoT"]
    end
    subgraph "Transport"
        K["Apache Kafka\n(W4)"]
    end
    subgraph "Przetwarzanie"
        SS["Spark Streaming\n(W5)"]
        ML["FastAPI + ML\n(W3)"]
    end
    subgraph "Wyniki"
        DASH["Dashboard\n(near RT)"]
        ALERT["Alerty\n(real-time)"]
        REP["Raporty\n(batch)"]
    end

    WEB --> K
    PAY --> K
    IOT --> K
    K --> SS --> DASH
    K --> ML --> ALERT
    SS --> REP

    style K fill:#FF9800,color:#fff
    style SS fill:#2196F3,color:#fff
    style ML fill:#4CAF50,color:#fff
    style DASH fill:#E3F2FD,stroke:#2196F3
    style ALERT fill:#FFEBEE,stroke:#F44336
    style REP fill:#F3E5F5,stroke:#9C27B0

Kompletny pipeline real-time analytics — podsumowanie kursu

Mapa kursu
Komponent Rola Wykład
Typy danych, OLTP/OLAP, Data Lake Kontekst i historia W1
Lambda/Kappa, okna czasowe Architektura W2
ML batch vs online, SGD, anomalie Modele i algorytmy W3
Apache Kafka Transport danych W4
Apache Spark + Structured Streaming Przetwarzanie W5

6 Złożoność algorytmów — uwaga praktyczna

CautionProjektując pipeline real-time, rozważ wymiar obliczeniowy
  • Duże dane, proste obliczenia — np. filtrowanie logów, agregacje. Spark radzi sobie świetnie.
  • Małe dane, ciężkie obliczenia — np. trenowanie modelu deep learning. Lepiej offline na GPU.
  • Duże dane, ciężkie obliczenia — np. analiza wideo w czasie rzeczywistym. Wymaga specjalistycznej architektury (GPU cluster, edge computing).

Dla systemów real-time kluczowe jest, żeby czas przetwarzania jednego zdarzenia był krótszy niż interwał między zdarzeniami. Inaczej kolejka rośnie bez końca.


7 Co dalej — laboratoria

Na laboratoriach zbudujecie kompletny system krok po kroku:

Plan laboratoriów
Lab Temat
1–2 Środowisko (Git, Docker, Python), analiza eksploracyjna danych
3–4 Model ML (scikit-learn), przygotowanie do wdrożenia
5–6 Kafka w Dockerze — producent i konsument w Pythonie
7–8 PySpark + Structured Streaming + Kafka
9–10 Kompletny pipeline + projekt grupowy

8 Podsumowanie

Apache Spark to silnik, który zamienia surowe strumienie danych w wartość biznesową. W połączeniu z Kafką tworzy potężną platformę real-time analytics. Structured Streaming upraszcza programowanie strumieniowe — piszesz kod jak dla batch, a Spark zajmuje się resztą.

Important{{< fa flag-checkered >}} To był ostatni wykład

Na laboratoriach przełożycie tę wiedzę na praktykę — od pierwszego docker compose up po kompletny system przetwarzania danych w czasie rzeczywistym.

Tip{{< fa laptop-code >}} Przed Lab 1

Upewnij się, że masz zainstalowanego Dockera i Gita (patrz strona Narzędzia). Sklonuj repozytorium kursu i uruchom docker compose up — jeśli działa, jesteś gotowy.