Wprowadzenie do Apache Spark

Apache Spark to ogólnego przeznaczenia, przetwarzający dane w pamięci silnik obliczeniowy. Spark może być używany wraz z Hadoopem, Yarnem i innymi komponentami Big Data, aby w pełni wykorzystać jego możliwości oraz poprawić wydajność aplikacji. Oferuje wysokopoziomowe interfejsy API w językach Scala, Java, Python, R i SQL.

Architektura Spark

Apache Spark działa w architekturze mistrz-podwładny (master-slave), gdzie główny węzeł nazywany jest „Driver”, a węzły podrzędne to „Workers”. Punktem startowym aplikacji Spark jest sc, czyli instancja klasy SparkContext, która działa wewnątrz Drivera.

Architektura Spark 1

Architektura Spark 2

Główne komponenty Apache Spark

Spark Core

Często nazywany również samym „Spark”. Podstawowym elementem Sparka jest RDD (Resilient Distributed Dataset) — odporna na błędy, rozproszona kolekcja danych, przetwarzana równolegle na wielu węzłach klastra.

Spark SQL

W tej bibliotece dane reprezentowane są jako DataFrame, czyli struktura danych podobna do tabeli relacyjnej. Spark SQL umożliwia analizę danych z użyciem składni podobnej do SQL oraz funkcji przetwarzających dane.

Spark Streaming

Biblioteka ta wprowadza pojęcie D-Stream (Discretized Stream) — strumienia danych dzielonego na małe porcje (mikropartie), które można przetwarzać niemal w czasie rzeczywistym.

MLlib

Biblioteka do uczenia maszynowego, zawierająca popularne algorytmy, takie jak filtrowanie kolaboracyjne, klasyfikacja, klasteryzacja czy regresja.

GraphX

Biblioteka służąca do przetwarzania grafów. Umożliwia rozwiązywanie problemów z zakresu teorii grafów, takich jak PageRank, komponenty spójne i inne.

Uruchomienie Apache Spark

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Lab6").getOrCreate()
spark

SparkSession - in-memory

SparkContext

...

Przykład 1 - dane ralizujące szereg czasowy

czujnik_temperatury = ((12.5, "2019-01-02 12:00:00"),
(17.6, "2019-01-02 12:00:20"),
(14.6,  "2019-01-02 12:00:30"),
(22.9,  "2019-01-02 12:01:15"),
(17.4,  "2019-01-02 12:01:30"),
(25.8,  "2019-01-02 12:03:25"),
(27.1,  "2019-01-02 12:02:40"),
)

Dane realizujące pomiar temperatury w czasie.

Aby wygenerować DataFrame należy użyć metod createDataFrame. Jednak należy pamiętać aby zdefiniować typy danych.

W następnych laboratoriach szerzej opiszemy typy danych w Sparku.

Zdefiuniujmy schemat naszych danych.

from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType([
    StructField("temperatura", DoubleType(), True),
    StructField("czas", StringType(), True),
])

Jak widać praktycznie wszystkie elementy poza nazwą kolumny oraz parametrem True są przedstawione jako obiekty.

df = (spark.createDataFrame(czujnik_temperatury, schema=schema)
      .withColumn("czas", to_timestamp("czas")))

Sprawdźmy jak wygląda schemat utworzonej tabeli.

df.printSchema()

root
 |-- temperatura: double (nullable = true)
 |-- czas: timestamp (nullable = true)

Następnie możemy sprawdzić jak przedstawia się sama tabela.

df.show()
+-----------+-------------------+
|temperatura|               czas|
+-----------+-------------------+
|       12.5|2019-01-02 12:00:00|
|       17.6|2019-01-02 12:00:20|
|       14.6|2019-01-02 12:00:30|
|       22.9|2019-01-02 12:01:15|
|       17.4|2019-01-02 12:01:30|
|       25.8|2019-01-02 12:03:25|
|       27.1|2019-01-02 12:02:40|
+-----------+-------------------+

Spark jako SQL

Ramki danych w sparku pozwalają wykorzystać język sql:

df.createOrReplaceTempView("czujnik_temperatury")
spark.sql("SELECT * FROM czujnik_temperatury where temperatura > 21").show()
+-------------------+-----------+
|               czas|temperatura|
+-------------------+-----------+
|2019-01-02 12:01:15|       22.9|
|2019-01-02 12:03:25|       25.8|
|2019-01-02 12:02:40|       27.1|
+-------------------+-----------+

Grupowanie danych

Standardowy grupowanie danych w sparku po zmiennej “czas” wygeneruje nam liczbę wierszy w każdym grupie. Ze względu, iż zmienne czasowe mają różne wartości, ilość otrzymanych grup będzie równa ilości wierszy w tabeli.

df2 = df.groupBy("czas").count()
df2.show()

Wykorzystując funkcję window możemy wygenerować grupy czasowe w zależności od wybranego okna czasowego.

# Thumbling window

import pyspark.sql.functions as F

df2 = df.groupBy(F.window("czas","30 seconds")).count()
df2.show(truncate=False)

+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|{2019-01-02 12:00:00, 2019-01-02 12:00:30}|2    |
|{2019-01-02 12:00:30, 2019-01-02 12:01:00}|1    |
|{2019-01-02 12:01:00, 2019-01-02 12:01:30}|1    |
|{2019-01-02 12:01:30, 2019-01-02 12:02:00}|1    |
|{2019-01-02 12:03:00, 2019-01-02 12:03:30}|1    |
|{2019-01-02 12:02:30, 2019-01-02 12:03:00}|1    |
+------------------------------------------+-----+

Sprawdźmy schemat

df2.printSchema()
root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)

Podstawowa różnica między pandasowymi ramkami danych i sparkowymi jest taka, że w komórce danych sparkowych można używać typów złożonych - np struct.

🔌 Źródła danych w Spark Structured Streaming

Spark Structured Streaming pozwala na przetwarzanie danych w czasie rzeczywistym z różnych źródeł strumieniowych. Najpopularniejsze z nich to:

✅ rate — źródło testowe

  • Automatycznie generuje dane: co sekundę dodaje wiersz.
  • Każdy wiersz zawiera:
  • timestamp – znacznik czasu,
  • value – licznik rosnący (0, 1, 2, …).
  • Używane do testowania logiki strumieniowania bez konieczności podpinania zewnętrznych źródeł.
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

📡 Inne źródła strumieni:

Socket (do testów np. nc -lk 9999): To źródło nasłuchuje na wskazanym porcie gniazda (socket) i wczytuje dowolne dane do Spark Streaming. Również służy wyłącznie do celów testowych.

Plik (File): Nasłuchuje określonego katalogu i traktuje pojawiające się tam pliki jako dane strumieniowe. Obsługuje formaty takie jak CSV, JSON, ORC oraz Parquet (np. .csv, .json, .parquet).

Kafka: Odczytuje dane z Apache Kafka® i jest kompatybilne z brokerami w wersji 0.10.0 lub wyższej.

📤 Output Modes – tryby wypisywania wyników

outputMode określa jak Spark wypisuje dane po każdej mikroserii (micro-batch). Dostępne tryby to:

append Wypisuje tylko nowe wiersze, które zostały dodane w tej mikroserii. Najczęściej używany.

update Wypisuje zmienione wiersze - czyli zaktualizowane agregaty.

complete Wypisuje całą tabelę agregacji po każdej mikroserii. Wymaga pełnej agregacji (np. groupBy).

Utwórzmy nasz pierwszy strumieniowy DataFrame w Sparku, korzystając ze źródła danych typu rate.

Możemy zrealizować kod skryptu

%%file streamrate.py
## uruchom przez spark-submit streamrate.py

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = (spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
)


query = (df.writeStream 
    .format("console") 
    .outputMode("append") 
    .option("truncate", False) 
    .start()
) 

query.awaitTermination()

Albo uruchomić kod w notatniku

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

def process_batch(df, batch_id, tstop=5):
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id == tstop:
        df.stop()


df = (spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
)

query = (df.writeStream 
    .format("console") 
    .outputMode("append")
    .foreachBatch(process_batch)
    .option("truncate", False) 
    .start()
)