A simple example with the Spark DataFrame API
from pyspark.sql import SparkSession
spark = SparkSession.builder\
    .appName("testing")\
    .getOrCreate()
# load the CSV with a header row, inferring column types
df = spark.read\
    .option('header', 'True')\
    .option('inferSchema', 'True')\
    .option('sep', ',')\
    .csv("diabetes.csv")
# write the DataFrame out as snappy-compressed Parquet
df.write.format('parquet')\
    .mode("overwrite")\
    .option("compression", "snappy")\
    .save("diab_parquet")
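To confirm the write, the Parquet output can be read straight back; a minimal check against the diab_parquet path used above:
# read the Parquet files back and verify the row count matches
df_parq = spark.read.parquet("diab_parquet")
print(df_parq.count() == df.count())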
df.columns
df.count()
df.schema
df.show(5)
df.printSchema()
numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']
df.select(numeric_features).describe().toPandas().transpose()
df.groupby("Outcome").count().show()
from pyspark.sql.functions import *
# check if there are NaN values
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).toPandas().head()
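Note that isnan only flags floating-point NaN values, not SQL NULLs; a complementary check with isNull (count, when and col all come from the wildcard import above):
# count NULLs per column, which isnan does not catch
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas().head()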
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
y_udf = udf(lambda y: "No" if y == 0 else "Yes", StringType())
df = df.withColumn('HasDiab', y_udf('Outcome')).drop("Outcome")
df.show(2, vertical=True)
df.printSchema()
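For reference, the same recoding could be written with the built-in when/otherwise instead of a Python UDF, which avoids serialization overhead (this sketch assumes Outcome has not been dropped yet):
# equivalent to y_udf, but executed natively in the JVM
df = df.withColumn('HasDiab', when(col('Outcome') == 0, 'No').otherwise('Yes')).drop('Outcome')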
# feature binning
def udf_multiple(age):
    if age < 25:
        return 'Under 25'
    elif 25 <= age <= 35:
        return 'Between 25 and 35'
    elif 35 < age < 50:
        return 'Between 36 and 49'
    elif age >= 50:
        return 'Over 50'
    else:
        return 'N/A'
age_bin_udf = udf(udf_multiple)
df = df.withColumn("Age_udf", age_bin_udf("Age"))
df.show(2, vertical=True)
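For purely numeric binning, pyspark.ml.feature.Bucketizer is a UDF-free alternative; a sketch with edges mirroring the buckets above (the exact split points are an assumption):
from pyspark.ml.feature import Bucketizer
# bin edges: [-inf,25), [25,36), [36,50), [50,inf)
bucketizer = Bucketizer(splits=[float('-inf'), 25, 36, 50, float('inf')],
                        inputCol="Age", outputCol="Age_bucket")
bucketizer.transform(df).select("Age", "Age_udf", "Age_bucket").show(5)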
# load a second dataset for a logistic regression example
df = spark.read.option("header", "true").option('inferSchema', "true").option("sep", ",").csv("scores.csv")
df.show()
from pyspark.ml.feature import VectorAssembler
cols = ['score1', 'score2']
assembler = VectorAssembler(inputCols=cols, outputCol="features")
featureDF = assembler.transform(df).drop(*cols)
featureDF.show()
featureDF.printSchema()
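A quick sanity check that the assembler packed both score columns into a single vector:
# inspect the first assembled feature vector
featureDF.select("features").first()["features"]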
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="result", outputCol="label")
labelDF = indexer.fit(featureDF).transform(featureDF)
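StringIndexer assigns indices by descending label frequency, so it is worth confirming which class got which index:
# show the result -> label mapping produced by the indexer
labelDF.groupBy("result", "label").count().show()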
train, test = labelDF.randomSplit([0.8, 0.2], seed=24)
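A quick check that the split came out roughly 80/20:
# sanity-check the split sizes
print(train.count(), test.count())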
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=5)
model = lr.fit(train)
pred_train = model.transform(train)
pred_test = model.transform(test)
pred_train.toPandas()
pred_train.show(2, vertical=True)
pred_train.select(['probability', 'prediction']).toPandas()
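On Spark 3.0+ the ML vector in the probability column can be unpacked with vector_to_array (the Spark version here is an assumption):
from pyspark.ml.functions import vector_to_array
# pull out the probability of the positive class as a plain double
pred_train.select(vector_to_array("probability")[1].alias("p_positive"), "prediction").show(5)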
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# default metric is areaUnderROC over the rawPrediction column
evaluator = BinaryClassificationEvaluator()
model.summary.roc.show(3)
import matplotlib.pyplot as plt
train_summ = model.summary
roc = train_summ.roc.toPandas()
plt.plot(roc['FPR'], roc['TPR'])
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.title('ROC Curve')
plt.show()
print(f"gini: {2*evaluator.evaluate(pred_test)-1}")
# Spark Structured Streaming on a DataFrame
spark.stop()
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
spark = SparkSession\
    .builder\
    .master("local[2]")\
    .appName("Stream_DF")\
    .getOrCreate()
# internal database for Spark
spark.sql("CREATE DATABASE IF NOT EXISTS test")
spark.catalog.listDatabases()
spark.sparkContext.setLogLevel("ERROR")
# load data
lines = spark\
    .readStream\
    .format("socket")\
    .option("host", "localhost")\
    .option("port", 9999)\
    .load()
# transform data
words = lines\
    .select(f.explode(f.split(lines.value, " "))
            .alias("word"))
wordCounts = words.groupBy("word").count()
# save results
query = (wordCounts
    .writeStream
    .outputMode("complete")
    .format("console")
    .start())
query.awaitTermination()
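The socket source expects a server already listening on localhost:9999; in another terminal, netcat works for this (nc -lk 9999), and every line typed there feeds the word count.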
%%file spark2.py
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
if __name__ == "__main__":
    spark = SparkSession.builder.master("local[2]").appName("Stream_DF").getOrCreate()
    print("=" * 50)
    print("Starting the DataFrame stream")
    print("=" * 50)
    spark.sparkContext.setLogLevel("ERROR")
    lines = spark.readStream\
        .format("socket")\
        .option("host", "localhost")\
        .option("port", 9999)\
        .load()
    words = lines.select(f.explode(f.split(lines.value, " ")).alias("word"))
    wordCounts = words.groupBy("word").count()
    query = wordCounts.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()
    # query.stop()
Run the script with spark-submit:
! spark-submit spark2.py