Wprowadzenie do Apache Spark
Apache Spark to ogólnego przeznaczenia, przetwarzający dane w pamięci silnik obliczeniowy. Spark może być używany wraz z Hadoopem, Yarnem i innymi komponentami Big Data, aby w pełni wykorzystać jego możliwości oraz poprawić wydajność aplikacji. Oferuje wysokopoziomowe interfejsy API w językach Scala, Java, Python, R i SQL.
Architektura Spark
Apache Spark działa w architekturze mistrz-podwładny (master-slave), gdzie główny węzeł nazywany jest „Driver”, a węzły podrzędne to „Workers”. Punktem startowym aplikacji Spark jest sc, czyli instancja klasy SparkContext, która działa wewnątrz Drivera.


Główne komponenty Apache Spark
Spark Core
Często nazywany również samym „Spark”. Podstawowym elementem Sparka jest RDD (Resilient Distributed Dataset) — odporna na błędy, rozproszona kolekcja danych, przetwarzana równolegle na wielu węzłach klastra.
Spark SQL
W tej bibliotece dane reprezentowane są jako DataFrame, czyli struktura danych podobna do tabeli relacyjnej. Spark SQL umożliwia analizę danych z użyciem składni podobnej do SQL oraz funkcji przetwarzających dane.
Spark Streaming
Biblioteka ta wprowadza pojęcie D-Stream (Discretized Stream) — strumienia danych dzielonego na małe porcje (mikropartie), które można przetwarzać niemal w czasie rzeczywistym.
MLlib
Biblioteka do uczenia maszynowego, zawierająca popularne algorytmy, takie jak filtrowanie kolaboracyjne, klasyfikacja, klasteryzacja czy regresja.
GraphX
Biblioteka służąca do przetwarzania grafów. Umożliwia rozwiązywanie problemów z zakresu teorii grafów, takich jak PageRank, komponenty spójne i inne.
Uruchomienie Apache Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Lab6").getOrCreate()spark
SparkSession - in-memory
SparkContext
...Przykład 1 - dane ralizujące szereg czasowy
czujnik_temperatury = ((12.5, "2019-01-02 12:00:00"),
(17.6, "2019-01-02 12:00:20"),
(14.6, "2019-01-02 12:00:30"),
(22.9, "2019-01-02 12:01:15"),
(17.4, "2019-01-02 12:01:30"),
(25.8, "2019-01-02 12:03:25"),
(27.1, "2019-01-02 12:02:40"),
)Dane realizujące pomiar temperatury w czasie.
Aby wygenerować DataFrame należy użyć metod createDataFrame. Jednak należy pamiętać aby zdefiniować typy danych.
W następnych laboratoriach szerzej opiszemy typy danych w Sparku.
Zdefiuniujmy schemat naszych danych.
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
schema = StructType([
StructField("temperatura", DoubleType(), True),
StructField("czas", StringType(), True),
])Jak widać praktycznie wszystkie elementy poza nazwą kolumny oraz parametrem True są przedstawione jako obiekty.
df = (spark.createDataFrame(czujnik_temperatury, schema=schema)
.withColumn("czas", to_timestamp("czas")))Sprawdźmy jak wygląda schemat utworzonej tabeli.
df.printSchema()
root
|-- temperatura: double (nullable = true)
|-- czas: timestamp (nullable = true)
Następnie możemy sprawdzić jak przedstawia się sama tabela.
df.show()
+-----------+-------------------+
|temperatura| czas|
+-----------+-------------------+
| 12.5|2019-01-02 12:00:00|
| 17.6|2019-01-02 12:00:20|
| 14.6|2019-01-02 12:00:30|
| 22.9|2019-01-02 12:01:15|
| 17.4|2019-01-02 12:01:30|
| 25.8|2019-01-02 12:03:25|
| 27.1|2019-01-02 12:02:40|
+-----------+-------------------+Spark jako SQL
Ramki danych w sparku pozwalają wykorzystać język sql:
df.createOrReplaceTempView("czujnik_temperatury")
spark.sql("SELECT * FROM czujnik_temperatury where temperatura > 21").show()
+-------------------+-----------+
| czas|temperatura|
+-------------------+-----------+
|2019-01-02 12:01:15| 22.9|
|2019-01-02 12:03:25| 25.8|
|2019-01-02 12:02:40| 27.1|
+-------------------+-----------+Grupowanie danych
Standardowy grupowanie danych w sparku po zmiennej “czas” wygeneruje nam liczbę wierszy w każdym grupie. Ze względu, iż zmienne czasowe mają różne wartości, ilość otrzymanych grup będzie równa ilości wierszy w tabeli.
df2 = df.groupBy("czas").count()
df2.show()Wykorzystując funkcję window możemy wygenerować grupy czasowe w zależności od wybranego okna czasowego.
# Thumbling window
import pyspark.sql.functions as F
df2 = df.groupBy(F.window("czas","30 seconds")).count()
df2.show(truncate=False)
+------------------------------------------+-----+
|window |count|
+------------------------------------------+-----+
|{2019-01-02 12:00:00, 2019-01-02 12:00:30}|2 |
|{2019-01-02 12:00:30, 2019-01-02 12:01:00}|1 |
|{2019-01-02 12:01:00, 2019-01-02 12:01:30}|1 |
|{2019-01-02 12:01:30, 2019-01-02 12:02:00}|1 |
|{2019-01-02 12:03:00, 2019-01-02 12:03:30}|1 |
|{2019-01-02 12:02:30, 2019-01-02 12:03:00}|1 |
+------------------------------------------+-----+Sprawdźmy schemat
df2.printSchema()
root
|-- window: struct (nullable = false)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- count: long (nullable = false)Podstawowa różnica między pandasowymi ramkami danych i sparkowymi jest taka, że w komórce danych sparkowych można używać typów złożonych - np struct.
🔌 Źródła danych w Spark Structured Streaming
Spark Structured Streaming pozwala na przetwarzanie danych w czasie rzeczywistym z różnych źródeł strumieniowych. Najpopularniejsze z nich to:
✅ rate — źródło testowe
- Automatycznie generuje dane: co sekundę dodaje wiersz.
- Każdy wiersz zawiera:
- timestamp – znacznik czasu,
- value – licznik rosnący (0, 1, 2, …).
- Używane do testowania logiki strumieniowania bez konieczności podpinania zewnętrznych źródeł.
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()📡 Inne źródła strumieni:
Socket (do testów np. nc -lk 9999): To źródło nasłuchuje na wskazanym porcie gniazda (socket) i wczytuje dowolne dane do Spark Streaming. Również służy wyłącznie do celów testowych.
Plik (File): Nasłuchuje określonego katalogu i traktuje pojawiające się tam pliki jako dane strumieniowe. Obsługuje formaty takie jak CSV, JSON, ORC oraz Parquet (np. .csv, .json, .parquet).
Kafka: Odczytuje dane z Apache Kafka® i jest kompatybilne z brokerami w wersji 0.10.0 lub wyższej.
📤 Output Modes – tryby wypisywania wyników
outputMode określa jak Spark wypisuje dane po każdej mikroserii (micro-batch). Dostępne tryby to:
append Wypisuje tylko nowe wiersze, które zostały dodane w tej mikroserii. Najczęściej używany.
update Wypisuje zmienione wiersze - czyli zaktualizowane agregaty.
complete Wypisuje całą tabelę agregacji po każdej mikroserii. Wymaga pełnej agregacji (np. groupBy).
Utwórzmy nasz pierwszy strumieniowy DataFrame w Sparku, korzystając ze źródła danych typu rate.
Możemy zrealizować kod skryptu
%%file streamrate.py
## uruchom przez spark-submit streamrate.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
df = (spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
)
query = (df.writeStream
.format("console")
.outputMode("append")
.option("truncate", False)
.start()
)
query.awaitTermination()Albo uruchomić kod w notatniku
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
def process_batch(df, batch_id, tstop=5):
print(f"Batch ID: {batch_id}")
df.show(truncate=False)
if batch_id == tstop:
df.stop()
df = (spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
)
query = (df.writeStream
.format("console")
.outputMode("append")
.foreachBatch(process_batch)
.option("truncate", False)
.start()
)