Spark DataFrame Processing

Data preparation

mkdir data
cd data
curl -L -o donation.zip http://bit.ly/1Aoywaq
unzip donation.zip
unzip 'block_*.zip'
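
A quick sanity check that the archives unpacked correctly (a minimal sketch using the standard library; it assumes the notebook runs one level above the data directory):
import glob
sorted(glob.glob("data/block*.csv"))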
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# create dataframe 
prev = spark.read.csv("data/block*.csv")
prev
prev.show(2)

prev.show()
# additional options: header and null values
parsed = spark.read.option("header", "true")\
.option("nullValue", "?")\
.option("inferSchema", "true")\
.csv("data/block*.csv")
parsed.show(5)

parsed.printSchema()

Supported formats

  • parquet
  • orc
  • json
  • jdbc
  • avro
  • text
  • image
  • libsvm
  • binary
  • xml
parsed.write.format("parquet").save("data/block2.parquet")
t = spark.read.format("parquet").load("data/block2.parquet")
t.show(2)
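
The same reader/writer API handles the other formats from the list above; a minimal sketch for JSON and ORC (the output paths are just example names):
parsed.write.format("json").save("data/block2.json")
parsed.write.format("orc").save("data/block2.orc")
spark.read.format("orc").load("data/block2.orc").show(2)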

Data schemas

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

schema = StructType([
  StructField("Date", StringType(), True),
  StructField("Open", DoubleType(), True),
  StructField("High", DoubleType(), True),
  StructField("Low", DoubleType(), True),
  StructField("Close", DoubleType(), True),
  StructField("Volume", IntegerType(), True),
  StructField("Name", StringType(), True)
])
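
A StructType defined this way can be passed directly to the reader instead of using inferSchema; a minimal sketch, assuming the AAPL CSV file used below is available (df_s is just an example name):
df_s = spark.read.option("header", True)\
.csv("AAPL_2006-01-01_to_2018-01-01.csv", schema=schema)
df_s.printSchema()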


ddlSchemaStr = """Date STRING, Open FLOAT, High FLOAT, 
Low FLOAT, Close FLOAT, Volume INT, Name STRING 
"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.option("header", True)\
.csv("AAPL_2006-01-01_to_2018-01-01.csv", schema=ddlSchemaStr)

df.show(5)
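
To confirm that the DDL string was parsed into the intended column types:
df.printSchema()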

Unstructured data

%%file test.json

{
 "id": "0001",
 "type": "donut",
 "name": "Cake",
 "ppu": 0.55,
 "batters":
  {
   "batter":
    [
     { "id": "1001", "type": "Regular" },
     { "id": "1002", "type": "Chocolate" },
     { "id": "1003", "type": "Blueberry" }
    ]
  },
 "topping":
  [
   { "id": "5001", "type": "None" },
   { "id": "5002", "type": "Glazed" },
   { "id": "5005", "type": "Sugar" },
   { "id": "5007", "type": "Powdered Sugar" },
   { "id": "5006", "type": "Chocolate with Sprinkles" },
   { "id": "5003", "type": "Chocolate" },
   { "id": "5004", "type": "Maple" }
  ]
}
rawDFjson = spark.read.json("test.json", multiLine=True)
rawDFjson.printSchema()
sampleDF = rawDFjson.withColumnRenamed("id", "key")
batDF = sampleDF.select("key", "batters.batter")
batDF.printSchema()
batDF.show(1, False)
from pyspark.sql.functions import explode
bat2DF = batDF.select("key", explode("batter").alias("new_batter"))
bat2DF.show()
bat2DF.printSchema()
bat2DF.select("key", "new_batter.*").show()
finalBatDF = (sampleDF
        .select("key", explode("batters.batter").alias("new_batter"))
        .select("key", "new_batter.*")
        .withColumnRenamed("id", "bat_id")
        .withColumnRenamed("type", "bat_type"))
finalBatDF.show()
topDF = (sampleDF
        .select("key", explode("topping").alias("new_topping"))
        .select("key", "new_topping.*")
        .withColumnRenamed("id", "top_id")
        .withColumnRenamed("type", "top_type")
        )
topDF.show(10, False)

Exploratory Data Analysis

# verify the data schema
parsed.printSchema()
# check the values of the first row
parsed.first()
# how many records
parsed.count()
# target "is_match": counts of matching and non-matching records
from pyspark.sql.functions import col

parsed.groupBy("is_match").count().orderBy(col("count").desc()).show()
# other aggregates via agg
from pyspark.sql.functions import avg, stddev, stddev_pop

parsed.agg(avg("cmp_sex"), stddev("cmp_sex"), stddev_pop("cmp_sex")).show()
# SQL queries - register the DataFrame under a name for the SQL engine (temporary view)
parsed.createOrReplaceTempView("dane")
spark.sql(""" SELECT is_match, COUNT(*) cnt FROM dane group by is_match order by cnt DESC""").show()
# summary statistics
summary = parsed.describe()
summary.show()
summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()

Which variable describes the data better: c1 or c2?
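
One way to answer this is to compare how many non-null values each column holds; a minimal sketch (count() skips nulls):
from pyspark.sql.functions import count
parsed.agg(count("cmp_fname_c1"), count("cmp_fname_c2")).show()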

# statistics for each class

# filtering with a SQL expression
matches = parsed.where("is_match = true")


# filtering with the PySpark API
misses = parsed.filter(col("is_match") == False)

match_summary = matches.describe()
miss_summary = misses.describe()
match_summary.show()
miss_summary.show()

Pivot tables in Spark

summary_p = summary.toPandas()
summary_p.head()
summary_p.shape
summary_p = summary_p.set_index('summary').transpose().reset_index()
summary_p = summary_p.rename(columns={'index':'field'})
summary_p = summary_p.rename_axis(None, axis=1)
summaryT = spark.createDataFrame(summary_p)
summaryT.show()
summaryT.printSchema()  # are the data types correct?
from pyspark.sql.types import DoubleType

for c in summaryT.columns:
    if c == 'field':
        continue
    summaryT = summaryT.withColumn(c, summaryT[c].cast(DoubleType()))
summaryT.printSchema()  # better now
def pivot_summary(desc):
    desc_p = desc.toPandas()
    desc_p = desc_p.set_index('summary').transpose().reset_index()
    desc_p = desc_p.rename(columns={'index':'field'})
    desc_p = desc_p.rename_axis(None, axis=1)
    descT = spark.createDataFrame(desc_p)
    for c in descT.columns:
        if c == 'field':
            continue
        else:
            descT = descT.withColumn(c, descT[c].cast(DoubleType()))
    return descT
match_summaryT = pivot_summary(match_summary)
miss_summaryT = pivot_summary(miss_summary)
match_summaryT.createOrReplaceTempView("match_s")
miss_summaryT.createOrReplaceTempView("miss_s")
spark.sql("""
Select a.field, a.count + b.count total, a.mean - b.mean delta
from match_s a inner join miss_s b on a.field = b.field 
where a.field not in ("id_1", "id_2")
order by delta DESC, total DESC
""").show()

Variables for the model: cmp_plz, cmp_by, cmp_bd, cmp_lname_c1, cmp_bm

## score = sum of the selected variables
zmienne = ['cmp_plz','cmp_by','cmp_bd','cmp_lname_c1','cmp_bm']
suma = " + ".join(zmienne)
suma
from pyspark.sql.functions import expr
scored = parsed.fillna(0, subset=zmienne)\
.withColumn('score', expr(suma))\
.select('score','is_match')
scored.show()
# evaluating the threshold value
def crossTabs(scored, t):
    return scored.selectExpr(f"score >= {t} as above", "is_match")\
    .groupBy("above").pivot("is_match",("true","false"))\
    .count()
crossTabs(scored, 4.0).show()
crossTabs(scored, 2.0).show()
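
The cross-tab counts translate directly into simple quality measures for a given threshold; a minimal sketch with a hypothetical helper that computes precision and recall from the scored DataFrame:
def precision_recall(scored, t):
    # true positives, false positives and false negatives for threshold t
    tp = scored.where(f"score >= {t} AND is_match = true").count()
    fp = scored.where(f"score >= {t} AND is_match = false").count()
    fn = scored.where(f"score < {t} AND is_match = true").count()
    return tp / (tp + fp), tp / (tp + fn)

precision_recall(scored, 4.0)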