Introduction to Apache Spark

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Lab6").getOrCreate()
spark

Evaluating spark in the notebook displays a summary of the session: the in-memory SparkSession and its underlying SparkContext.

Example 1 - time series as batch processing

temp = ((12.5, "2019-01-02 12:00:00"),
        (17.6, "2019-01-02 12:00:20"),
        (14.6, "2019-01-02 12:00:30"),
        (22.9, "2019-01-02 12:01:15"),
        (17.4, "2019-01-02 12:01:30"),
        (25.8, "2019-01-02 12:03:25"),
        (27.1, "2019-01-02 12:02:40"),
)

To create a Spark DataFrame you can use the createDataFrame method on the spark object. When creating the DataFrame we can specify the schema of the data.

Let’s define our schema as follows:

from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType([
    StructField("temp", DoubleType(), True),
    StructField("time", StringType(), True),
])

As you can see, apart from the column name and the nullable flag (True), each StructField takes a Spark type object, which is an ordinary Python instance such as DoubleType() or StringType().

df = (spark.createDataFrame(temp, schema=schema)
      .withColumn("time", to_timestamp("time")))
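
As an aside, PySpark also accepts a schema written as a datatype string; a minimal sketch of the same DataFrame built that way (df_alt is just an illustrative name, the rest of the lab keeps using df):

# equivalent construction with a datatype string instead of a StructType
df_alt = (spark.createDataFrame(temp, schema="temp: double, time: string")
          .withColumn("time", to_timestamp("time")))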

Let’s see what it looks like now.

df.printSchema()

root
 |-- temp: double (nullable = true)
 |-- time: timestamp (nullable = true)

Next, let’s see the data:

print("Schema:")
df.show()
+-----------+-------------------+
|temperatura|               czas|
+-----------+-------------------+
|       12.5|2019-01-02 12:00:00|
|       17.6|2019-01-02 12:00:20|
|       14.6|2019-01-02 12:00:30|
|       22.9|2019-01-02 12:01:15|
|       17.4|2019-01-02 12:01:30|
|       25.8|2019-01-02 12:03:25|
|       27.1|2019-01-02 12:02:40|
+-----------+-------------------+

Spark SQL

In Spark we can use SQL queries to retrieve data from our DataFrame. To do this, we first register the DataFrame as a temporary view.

df.createOrReplaceTempView("df")
spark.sql("SELECT * FROM df where temp > 21").show()
+-------------------+-----------+
|               czas|temperatura|
+-------------------+-----------+
|2019-01-02 12:01:15|       22.9|
|2019-01-02 12:03:25|       25.8|
|2019-01-02 12:02:40|       27.1|
+-------------------+-----------+
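
The same view can be used for aggregations as well; a small sketch (the column aliases are just illustrative):

spark.sql("""
    SELECT min(temp) AS min_temp,
           max(temp) AS max_temp,
           round(avg(temp), 2) AS avg_temp
    FROM df
""").show()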

Data grouping

A standard groupBy on time-series data returns the number of rows in each group. Because the timestamps all have different values, the number of groups obtained will be equal to the number of rows in the table.

df2 = df.groupBy("time").count()
df2.show()

You can use the window function to group data by time intervals. For example, we can group data by 30-second intervals.

# Tumbling window

import pyspark.sql.functions as F

df2 = df.groupBy(F.window("time","30 seconds")).count()
df2.show(truncate=False)

+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|{2019-01-02 12:00:00, 2019-01-02 12:00:30}|2    |
|{2019-01-02 12:00:30, 2019-01-02 12:01:00}|1    |
|{2019-01-02 12:01:00, 2019-01-02 12:01:30}|1    |
|{2019-01-02 12:01:30, 2019-01-02 12:02:00}|1    |
|{2019-01-02 12:03:00, 2019-01-02 12:03:30}|1    |
|{2019-01-02 12:02:30, 2019-01-02 12:03:00}|1    |
+------------------------------------------+-----+

Let’s check the schema of the resulting DataFrame:

df2.printSchema()
root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)

One important difference between pandas DataFrames and Spark DataFrames is that Spark DataFrames can hold complex (nested) types, for example struct. Here the window column is a struct with start and end fields, which can be accessed with dot notation.
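
A minimal sketch of flattening the struct (the aliases window_start and window_end are just illustrative names):

# flatten the nested window struct into ordinary columns using dot notation
flat = df2.select(
    F.col("window.start").alias("window_start"),
    F.col("window.end").alias("window_end"),
    "count",
)
flat.show(truncate=False)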

🔌 Data sources in Spark Structured Streaming

Spark Streaming can be used to process data in real time from various sources. The most popular sources of streaming data are:

✅ rate — only for testing

In each micro-batch DataFrame we have the following columns:
  • timestamp
  • value – a running counter (0, 1, 2, …)

df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

📡 Stream sources:

Socket (only for testing: nc -lk 9999): read a stream from a TCP socket.

Files: connect to a directory and read new files as streaming data. Can be used with the following file formats: CSV, JSON, ORC or Parquet (e.g. .csv, .json, .parquet).

Kafka: read a data stream from Apache Kafka. A sketch of how each of these sources is typically wired up follows below.
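
Here is a hedged sketch of reading from each source; the host, port, directory path, Kafka brokers, topic name and schema are placeholder values, not part of this lab:

# socket source (for testing; pair it with: nc -lk 9999)
socket_df = (spark.readStream
    .format("socket")
    .option("host", "localhost")   # placeholder host
    .option("port", 9999)          # placeholder port
    .load())

# file source: watch a directory for new CSV files (streaming reads require a schema)
files_df = (spark.readStream
    .schema("temp: double, time: string")   # placeholder schema
    .format("csv")
    .load("data/incoming/"))                # placeholder directory

# Kafka source (requires the spark-sql-kafka package on the classpath)
kafka_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder brokers
    .option("subscribe", "readings")                      # placeholder topic
    .load())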

Let’s create a stream from the rate source:

%%file streamrate.py
## run with spark-submit streamrate.py

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = (spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
)


query = (df.writeStream 
    .format("console") 
    .outputMode("append") 
    .option("truncate", False) 
    .start()
) 

query.awaitTermination()

You can also run it in a Jupyter notebook:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

def process_batch(batch_df, batch_id, tstop=5):
    print(f"Batch ID: {batch_id}")
    batch_df.show(truncate=False)
    if batch_id == tstop:
        # a batch DataFrame cannot stop the query itself; raising here
        # makes the streaming query terminate after batch number tstop
        raise RuntimeError(f"Stopping the demo stream after batch {batch_id}")


df = (spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
)

query = (df.writeStream
    .outputMode("append")
    .foreachBatch(process_batch)   # foreachBatch replaces the console sink, so no format() is needed
    .start()
)
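
Because awaitTermination() is not called here, the query runs in the background of the notebook session. It ends on its own after batch 5 (the exception raised in process_batch terminates it), but you can also inspect it and stop it from another cell at any time:

query.status      # current status of the streaming query
query.isActive    # True while the stream is running
query.stop()      # stop it manually from the notebook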