Introduction to Apache Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Lab6").getOrCreate()
spark
- in-memory processing
- SparkSession
- SparkContext
- ...
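As a quick, illustrative check (not part of the original lab code), the SparkSession wraps the lower-level SparkContext, and both are reachable once the session exists:
# Quick sanity check on the running session
print(spark.version)        # Spark version of the running session
print(spark.sparkContext)   # the SparkContext wrapped by this SparkSession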
Example 1 - time series as batch processing
temp = ((12.5, "2019-01-02 12:00:00"),
        (17.6, "2019-01-02 12:00:20"),
        (14.6, "2019-01-02 12:00:30"),
        (22.9, "2019-01-02 12:01:15"),
        (17.4, "2019-01-02 12:01:30"),
        (25.8, "2019-01-02 12:03:25"),
        (27.1, "2019-01-02 12:02:40"),
        )
To create a Spark DataFrame you can use the createDataFrame method on the spark object. For a Spark DataFrame we can specify the schema of the data.
Let's define our schema as follows:
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
schema = StructType([
    StructField("temp", DoubleType(), True),
    StructField("time", StringType(), True),
])
As you can see, each StructField takes a column name, a Spark data type given as a Python object (for example DoubleType()), and a nullable flag (True).
df = (spark.createDataFrame(temp, schema=schema)
        .withColumn("time", to_timestamp("time")))
Let's see what it looks like now.
df.printSchema()
root
 |-- temp: double (nullable = true)
 |-- time: timestamp (nullable = true)
Next, let’s see the data:
print("Schema:")
df.show()+-----------+-------------------+
|temperatura| czas|
+-----------+-------------------+
| 12.5|2019-01-02 12:00:00|
| 17.6|2019-01-02 12:00:20|
| 14.6|2019-01-02 12:00:30|
| 22.9|2019-01-02 12:01:15|
| 17.4|2019-01-02 12:01:30|
| 25.8|2019-01-02 12:03:25|
| 27.1|2019-01-02 12:02:40|
+-----------+-------------------+
Spark SQL
In Spark we can use SQL queries to get data from our DataFrame.
"df")
df.createOrReplaceTempView("SELECT * FROM df where temp > 21").show()
spark.sql(+-------------------+-----------+
| czas|temperatura|
+-------------------+-----------+
|2019-01-02 12:01:15| 22.9|
|2019-01-02 12:03:25| 25.8|
|2019-01-02 12:02:40| 27.1|
+-------------------+-----------+
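For comparison (a small sketch, not from the original lab), the same filter can be written with the DataFrame API instead of SQL:
# Equivalent query expressed with the DataFrame API
df.filter("temp > 21").show()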
Data grouping
A standard groupBy on time series data returns the number of rows in each group. Because every timestamp in our data has a different value, the number of groups equals the number of rows in the table.
df2 = df.groupBy("time").count()
df2.show()
You can use the window function to group data by time intervals. For example, we can group the data into 30-second windows.
# Tumbling window
import pyspark.sql.functions as F

df2 = df.groupBy(F.window("time", "30 seconds")).count()
df2.show(truncate=False)
+------------------------------------------+-----+
|window |count|
+------------------------------------------+-----+
|{2019-01-02 12:00:00, 2019-01-02 12:00:30}|2 |
|{2019-01-02 12:00:30, 2019-01-02 12:01:00}|1 |
|{2019-01-02 12:01:00, 2019-01-02 12:01:30}|1 |
|{2019-01-02 12:01:30, 2019-01-02 12:02:00}|1 |
|{2019-01-02 12:03:00, 2019-01-02 12:03:30}|1 |
|{2019-01-02 12:02:30, 2019-01-02 12:03:00}|1 |
+------------------------------------------+-----+
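The groupBy above uses a tumbling window (non-overlapping intervals). For comparison, passing a slide duration to window produces a sliding window; a minimal sketch, not part of the original lab output:
# Sliding window: 30-second windows starting every 10 seconds (overlapping)
df.groupBy(F.window("time", "30 seconds", "10 seconds")).count().show(truncate=False)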
Let’s check the schema of the resulting DataFrame
df2.printSchema()
root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)
One important difference between pandas DataFrames and Spark DataFrames is that Spark DataFrames can hold complex column types, for example struct.
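For example, the nested fields of the window struct can be selected with dot notation (a short sketch, not in the original lab):
# Access the nested struct fields of the window column
df2.select("window.start", "window.end", "count").show(truncate=False)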
🔌 Data sources in Spark Structured Streaming
Spark Streaming can be used to process data in real time from various sources. The most popular sources of streaming data are:
✅ rate — only for testing
- in each micro-batch DataFrame we have the following columns:
  - timestamp
  - value – a running counter (0, 1, 2, …)
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
📡 Stream sources:
- Socket (only for testing: nc -lk 9999): read a stream from a socket.
- Files: connect to a directory and read files as streaming data. Supported file formats: CSV, JSON, ORC or Parquet (e.g. .csv, .json, .parquet); see the sketch after this list.
- Kafka: read a data stream from Apache Kafka.
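A minimal sketch of a files source, assuming a hypothetical directory data/stream_csv/ and reusing the schema defined earlier:
# Watch a directory for new CSV files (the path is an assumption for illustration)
csv_stream = (spark.readStream
                .format("csv")
                .option("header", True)
                .schema(schema)             # streaming file sources need an explicit schema
                .load("data/stream_csv/"))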
Let's create a stream from the rate source:
%%file streamrate.py
## run with: spark-submit streamrate.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = (spark.readStream
        .format("rate")
        .option("rowsPerSecond", 1)
        .load()
      )

query = (df.writeStream
           .format("console")
           .outputMode("append")
           .option("truncate", False)
           .start()
         )

query.awaitTermination()
You can also run it in a Jupyter notebook:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

def process_batch(df, batch_id, tstop=5):
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id == tstop:
        # stop the streaming query after tstop micro-batches
        query.stop()

df = (spark.readStream
        .format("rate")
        .option("rowsPerSecond", 1)
        .load()
      )

query = (df.writeStream
           .format("console")
           .outputMode("append")
           .foreachBatch(process_batch)
           .option("truncate", False)
           .start()
         )
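Once started, the query object can be inspected (and stopped) from another notebook cell; a short sketch:
query.isActive   # True while the stream is running
query.status     # current status of the streaming query
# query.stop()   # stop the stream manually if needed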