from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark
Apache Spark EDA
Load datasets
Open new terminal
mkdir data
cd data
curl -L -o donation.zip http://bit.ly/1Aoywaq
unzip donation.zip
unzip 'block_*.zip'
rm donation.zip block_*.zip
Now we can load not only a single file (as in pandas) but all files matching a glob pattern at once.
prev = spark.read.csv("data/block*.csv")
prev
prev.show(2)
prev.show()
parsed = (
    spark.read
    .option("header", "true")
    .option("nullValue", "?")
    .option("inferSchema", "true")
    .csv("data/block*.csv")
)
Find info about the .format() method in Apache Spark.
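As a quick illustration (a sketch reusing the block*.csv files loaded above; parsed_fmt is just an illustrative name), the generic .format()/.load() reader interface is equivalent to the .csv() shorthand:

# Equivalent to spark.read.csv(...): name the source format explicitly,
# then point .load() at the files.
parsed_fmt = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("nullValue", "?")
    .option("inferSchema", "true")
    .load("data/block*.csv")
)
parsed_fmt.show(2)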
parsed.show(5)
parsed.printSchema()
Other formats:
- parquet
- orc
- json
- jdbc
- avro
- text
- image
- libsvm
- binary
- xml
 
parsed.write.format("parquet").save("data/block.parquet")
t = spark.read.format("parquet").load("data/block.parquet")
t.show(2)
spark.stop()
Data schema
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
schema = StructType([
  StructField("Date", StringType(), True),
  StructField("Open", DoubleType(), True),
  StructField("High", DoubleType(), True),
  StructField("Low", DoubleType(), True),
  StructField("Close", DoubleType(), True),
  StructField("Volume", IntegerType(), True),
  StructField("Name", StringType(), True)
])
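The StructType above can be passed straight to the reader; a minimal sketch (assuming the stocks CSV used further below has already been downloaded; df_typed is just an illustrative name):

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Sketch: pass the explicit StructType instead of relying on inferSchema.
df_typed = (
    spark.read
    .option("header", True)
    .csv("data/stocks/AAPL_2006-01-01_to_2018-01-01.csv", schema=schema)
)
df_typed.printSchema()

The same schema can also be expressed as a DDL string: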
ddlSchemaStr = """Date STRING, Open FLOAT, High FLOAT, 
Low FLOAT, Close FLOAT, Voulme INT, Name String 
"""from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.option("header", True)\
.csv("data/stocks/AAPL_2006-01-01_to_2018-01-01.csv", schema=ddlSchemaStr)
df.show(5)
Unstructured data
%%file test.json
{
 "id": "0001",
 "type": "donut",
 "name": "Cake",
 "ppu": 0.55,
 "batters":
  {
   "batter":
    [
     { "id": "1001", "type": "Regular" },
     { "id": "1002", "type": "Chocolate" },
     { "id": "1003", "type": "Blueberry" }
    ]
  },
 "topping":
  [
   { "id": "5001", "type": "None" },
   { "id": "5002", "type": "Glazed" },
   { "id": "5005", "type": "Sugar" },
   { "id": "5007", "type": "Powdered Sugar" },
   { "id": "5006", "type": "Chocolate with Sprinkles" },
   { "id": "5003", "type": "Chocolate" },
   { "id": "5004", "type": "Maple" }
  ]
}
Overwriting test.json
rawDFjson = spark.read.json("test.json", multiLine=True)
rawDFjson.printSchema()
root
 |-- batters: struct (nullable = true)
 |    |-- batter: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- ppu: double (nullable = true)
 |-- topping: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- type: string (nullable = true)
sampleDF = rawDFjson.withColumnRenamed("id", "key")
sampleDF.printSchema()
root
 |-- batters: struct (nullable = true)
 |    |-- batter: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |-- key: string (nullable = true)
 |-- name: string (nullable = true)
 |-- ppu: double (nullable = true)
 |-- topping: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- type: string (nullable = true)
batDF = sampleDF.select("key", "batters.batter")
batDF.printSchema()
root
 |-- key: string (nullable = true)
 |-- batter: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- type: string (nullable = true)
batDF.show(1, False)
+----+-------------------------------------------------------+
|key |batter                                                 |
+----+-------------------------------------------------------+
|0001|[{1001, Regular}, {1002, Chocolate}, {1003, Blueberry}]|
+----+-------------------------------------------------------+
from pyspark.sql.functions import explode
bat2DF = batDF.select("key", explode("batter").alias("new_batter"))
bat2DF.show()
+----+-----------------+
| key|       new_batter|
+----+-----------------+
|0001|  {1001, Regular}|
|0001|{1002, Chocolate}|
|0001|{1003, Blueberry}|
+----+-----------------+
bat2DF.printSchema()
root
 |-- key: string (nullable = true)
 |-- new_batter: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- type: string (nullable = true)
bat2DF.select("key", "new_batter.*").show()
+----+----+---------+
| key|  id|     type|
+----+----+---------+
|0001|1001|  Regular|
|0001|1002|Chocolate|
|0001|1003|Blueberry|
+----+----+---------+
finalBatDF = (sampleDF
        .select("key",  
explode("batters.batter").alias("new_batter"))
        .select("key", "new_batter.*")
        .withColumnRenamed("id", "bat_id")
        .withColumnRenamed("type", "bat_type"))
finalBatDF.show()
topDF = (sampleDF
        .select("key", explode("topping").alias("new_topping"))
        .select("key", "new_topping.*")
        .withColumnRenamed("id", "top_id")
        .withColumnRenamed("type", "top_type")
        )
topDF.show(10, False)
EDA
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
parsed = (
    spark.read
    .option("header", "true")
    .option("nullValue", "?")
    .option("inferSchema", "true")
    .csv("data/block*.csv")
)
parsed.printSchema()
parsed.first()
What is the Row() object? What is the basic object for a pandas DataFrame?
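As a hint (a minimal sketch, not part of the original notes): parsed.first() returns a pyspark.sql.Row, which behaves like a named tuple, while a single row of a pandas DataFrame is returned as a pandas Series.

from pyspark.sql import Row

row = parsed.first()
print(row.is_match)            # access a field by name
print(row["cmp_sex"])          # or like a dict
print(row.asDict())            # convert the whole Row to a Python dict
custom = Row(id_1=1, id_2=2)   # Rows can also be constructed directly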
parsed.count()
Let's check info about our target:
from pyspark.sql.functions import col
parsed.groupBy("is_match").count().orderBy(col("count").desc()).show()You can find aggregations about other features
from pyspark.sql.functions import avg, stddev, stddev_pop
parsed.agg(avg("cmp_sex"), stddev("cmp_sex"), stddev_pop("cmp_sex")).show()
You can also do all of this in SQL:
parsed.createOrReplaceTempView("dane")
spark.sql("""SELECT is_match, COUNT(*) cnt FROM dane GROUP BY is_match ORDER BY cnt DESC""").show()
summary = parsed.describe()
summary.show()
summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()
How can we filter our data by rows and columns?
# SQL-style predicate string
matches = parsed.where("is_match = true")
# DataFrame API with a column expression
misses = parsed.filter(col("is_match") == False)
match_summary = matches.describe()
miss_summary = misses.describe()
match_summary.show()
miss_summary.show()
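The where/filter calls above handle row filtering; for columns, select and drop do the job. A small sketch (column names taken from the dataset; subset and no_ids are just illustrative names):

subset = parsed.select("id_1", "id_2", "cmp_sex", "is_match")   # keep only a few columns
subset.show(3)
no_ids = parsed.drop("id_1", "id_2")                            # or drop the ones you don't need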
Go to a pandas DataFrame
summary_p = summary.toPandas()
summary_p.head()
summary_p.shape
summary_p = summary_p.set_index('summary').transpose().reset_index()
summary_p = summary_p.rename(columns={'index':'field'})
summary_p = summary_p.rename_axis(None, axis=1)
summaryT = spark.createDataFrame(summary_p)
summaryT.show()
Check our schema
summaryT.printSchema()
from pyspark.sql.types import DoubleType
for c in summaryT.columns:
    if c == 'field':
        continue
    summaryT = summaryT.withColumn(c, summaryT[c].cast(DoubleType()))
summaryT.printSchema()
def pivot_summary(desc):
    desc_p = desc.toPandas()
    desc_p = desc_p.set_index('summary').transpose().reset_index()
    desc_p = desc_p.rename(columns={'index':'field'})
    desc_p = desc_p.rename_axis(None, axis=1)
    descT = spark.createDataFrame(desc_p)
    for c in descT.columns:
        if c == 'field':
            continue
        else:
            descT = descT.withColumn(c, descT[c].cast(DoubleType()))
    return descT
match_summaryT = pivot_summary(match_summary)
miss_summaryT = pivot_summary(miss_summary)
Joins
match_summaryT.createOrReplaceTempView("match_s")
miss_summaryT.createOrReplaceTempView("miss_s")
spark.sql("""
SELECT a.field, a.count + b.count total, a.mean - b.mean delta
FROM match_s a INNER JOIN miss_s b ON a.field = b.field
WHERE a.field NOT IN ("id_1", "id_2")
ORDER BY delta DESC, total DESC
""").show()do modelu :
cmp_plz,cmp_by,cmp_bd,cmp_lname_c1,cmp_bm
## score = sum of features
features = ['cmp_plz','cmp_by','cmp_bd','cmp_lname_c1','cmp_bm']
score = " + ".join(features)scorefrom pyspark.sql.functions import exprscored = parsed.fillna(0, subset=features)\
.withColumn('score', expr(score))\
.select('score', 'is_match')
scored.show()
# evaluate the score threshold
def crossTabs(scored, t):
    return scored.selectExpr(f"score >= {t} as above", "is_match")\
    .groupBy("above").pivot("is_match",("true","false"))\
    .count()
crossTabs(scored, 4.0).show()
crossTabs(scored, 2.0).show()
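One way to turn the crosstab into a single quality measure (a sketch, not part of the original notes; precision_recall is a hypothetical helper) is to compute precision and recall from the counts at a given threshold:

def precision_recall(scored, t):
    # Count (above, is_match) pairs for the chosen threshold t.
    counts = (scored
              .selectExpr(f"score >= {t} as above", "is_match")
              .groupBy("above", "is_match")
              .count()
              .collect())
    c = {(r["above"], r["is_match"]): r["count"] for r in counts}
    tp = c.get((True, True), 0)    # predicted match, actually a match
    fp = c.get((True, False), 0)   # predicted match, actually not
    fn = c.get((False, True), 0)   # missed match
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall

print(precision_recall(scored, 4.0))
print(precision_recall(scored, 2.0))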