Real-Time Analytics Lectures for SGH students

Laboratory 4 - Apache Spark Batch processing

Batch processing - a simple example with a Spark DataFrame

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .appName("testowanie")\
        .getOrCreate()


df = spark.read\
     .option('header','True')\
     .option('inferSchema','True')\
     .option('sep',',')\
     .csv("diabetes.csv")

df.write.format('parquet')\
.mode("overwrite")\
.option("compression", "snappy")\
.save("diab_parquet")
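As a quick sanity check, the Parquet directory written above can be read back into a new DataFrame (a minimal sketch; df_parquet is just an illustrative name):

df_parquet = spark.read.parquet("diab_parquet")
df_parquet.printSchema()
df_parquet.count()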

df.columns

df.count()

df.schema

df.show(5)

df.printSchema()
# summary statistics for the integer columns only
numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']
df.select(numeric_features).describe().toPandas().transpose()
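If the inferred schema also contains floating-point columns (the diabetes file may include double columns such as BMI), they can be included by widening the dtype filter - a sketch:

numeric_features = [t[0] for t in df.dtypes if t[1] in ('int', 'double')]
df.select(numeric_features).describe().toPandas().transpose()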

df.groupby("Outcome").count().show()

from pyspark.sql.functions import *
# check if there are NaN values
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).toPandas().head()
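isnan only catches floating-point NaN values; missing values stored as NULL can be counted with an analogous expression using isNull (a sketch, using the functions already imported above):

df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas().head()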

UDF

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# map the numeric Outcome column to a Yes/No label
y_udf = udf(lambda y: "No" if y == 0 else "Yes", StringType())
df = df.withColumn('HasDiab', y_udf('Outcome')).drop("Outcome")
df.show(2, vertical=True)
df.printSchema()
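The same mapping can also be expressed with the built-in when/otherwise functions, which the optimizer can handle natively instead of calling into Python. A sketch, applied to the original DataFrame (i.e. before Outcome was dropped above); df_no_udf is an illustrative name:

from pyspark.sql.functions import when, col

# same mapping without a Python UDF, using built-in expressions
df_no_udf = df.withColumn('HasDiab', when(col('Outcome') == 0, 'No').otherwise('Yes'))\
              .drop('Outcome')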

# feature binning
def udf_multiple(age):
    if age <= 25:
        return 'Under 25'
    elif 25 < age <= 35:
        return 'Between 26 and 35'
    elif 35 < age < 50:
        return 'Between 36 and 49'
    elif age >= 50:
        return 'Over 50'
    else:
        return 'N/A'

# udf() returns StringType by default
age_udf = udf(udf_multiple)

df = df.withColumn("Age_udf", age_udf("Age"))

df.show(2, vertical=True)
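An alternative way to bin a numeric column is the built-in Bucketizer from pyspark.ml. A sketch: the AgeD and Age_bucket column names are illustrative, and the splits mirror the UDF boundaries above (each bucket is left-closed, right-open):

from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import col

splits = [float('-inf'), 26.0, 36.0, 50.0, float('inf')]
bucketizer = Bucketizer(splits=splits, inputCol="AgeD", outputCol="Age_bucket")
df_buck = bucketizer.transform(df.withColumn("AgeD", col("Age").cast("double")))
df_buck.select("Age", "Age_bucket").show(5)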

Scores data

df = spark.read\
     .option("header", "true")\
     .option('inferSchema', "true")\
     .option("sep", ",")\
     .csv("scores.csv")
df.show()

from pyspark.ml.feature import VectorAssembler

# combine the two score columns into a single feature vector
cols = ['score1', 'score2']
assembler = VectorAssembler(inputCols=cols, outputCol="features")
featureDF = assembler.transform(df).drop(*cols)

featureDF.show()
featureDF.printSchema()
from pyspark.ml.feature import StringIndexer

# encode the text result column as a numeric label
indexer = StringIndexer(inputCol="result", outputCol="label")
labelDF = indexer.fit(featureDF).transform(featureDF)
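To see which numeric label StringIndexer assigned to each text value, the mapping can be inspected with a simple aggregation (a quick check using the columns defined above):

labelDF.groupBy("result", "label").count().show()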


train, test = labelDF.randomSplit([0.8, 0.2], seed=24)

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=5)
model = lr.fit(train)

pred_train = model.transform(train)
pred_test = model.transform(test)

pred_train.toPandas()
pred_train.show(2, vertical=True)


pred_train.select(['probability', 'prediction']).toPandas()

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
model.summary.roc.show(3)
import matplotlib.pyplot as plt

train_summ = model.summary
roc = train_summ.roc.toPandas()
plt.plot(roc['FPR'], roc['TPR'])
plt.title('ROC Curve')
plt.show()

print(f"gini:  {2*evaluator.evaluate(pred_test)-1}")

# stop the batch session before starting the streaming example
spark.stop()

Stream DataFrame

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession\
.builder\
.master("local[2]")\
.appName("Stream_DF")\
.getOrCreate()


# create an internal database in the Spark catalog
spark.sql("CREATE DATABASE IF NOT EXISTS test")
spark.catalog.listDatabases()

spark.sparkContext.setLogLevel("ERROR")

# load data 
lines = spark\
.readStream\
.format("socket")\
.option("host", "localhost")\
.option("port", 9999)\
.load()
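The socket source only connects to an existing server, so before starting the query a simple text server has to be listening on port 9999, for example netcat in a separate terminal (each typed line then becomes one row of the stream):

nc -lk 9999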

# transform data 
words = lines\
.select(f.explode(f.split(lines.value, " "))\
        .alias("word"))
wordCounts = words.groupBy("word").count()

# save results 
query = (wordCounts\
.writeStream\
.outputMode("complete")\
.format("console")\
.start())
query.awaitTermination()

New file - the same streaming word count saved as a standalone script

%%file spark2.py

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

if __name__ == "__main__":
    spark = SparkSession.builder.master("local[2]").appName("Stream_DF").getOrCreate()
    print("="*50)
    print("Zaczynamy DataFrame")
    print("="*50)
    spark.sparkContext.setLogLevel("ERROR")
    lines = spark.readStream\
        .format("socket")\
        .option("host", "localhost")\
        .option("port", 9999)\
        .load()
    words = lines.select(f.explode(f.split(lines.value, " ")).alias("word"))
    wordCounts = words.groupBy("word").count()
    query = wordCounts.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()
    #query.stop()

run it

! spark-submit spark2.py