from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Spark DataFrame processing
Data preparation
mkdir data
cd data
curl -L -o donation.zip http://bit.ly/1Aoywaq
unzip donation.zip
unzip 'block_*.zip'
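If you prefer to stay inside the notebook, a minimal Python sketch doing the same download and unpacking (same URL and the same data/ directory as the shell commands above):

# download the donation archive and unpack the inner block_*.zip files
import io, zipfile, urllib.request, pathlib

data_dir = pathlib.Path("data")
data_dir.mkdir(exist_ok=True)
with urllib.request.urlopen("http://bit.ly/1Aoywaq") as r:
    payload = r.read()
with zipfile.ZipFile(io.BytesIO(payload)) as z:
    z.extractall(data_dir)
for inner in data_dir.glob("block_*.zip"):
    with zipfile.ZipFile(inner) as z:
        z.extractall(data_dir)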
# create dataframe
prev = spark.read.csv("data/block*.csv")
prev
prev.show(2)
prev.show()
# additional options: header and null values
parsed = spark.read.option("header", "true")\
    .option("nullValue", "?")\
    .option("inferSchema", "true")\
    .csv("data/block*.csv")
parsed.show(5)
parsed.printSchema()
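The same options can also be passed directly as keyword arguments of csv(); a minimal, equivalent sketch:

# equivalent read using keyword arguments instead of chained .option() calls
parsed_kw = spark.read.csv("data/block*.csv",
                           header=True,
                           nullValue="?",
                           inferSchema=True)
parsed_kw.printSchema()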
Supported formats
- parquet
- orc
- json
- jdbc
- avro
- text
- image
- libsvm
- binary
- xml
format("parquet").save("data/block2.parquet") parsed.write.
= spark.read.format("parquet").load("data/block2.parquet") t
2) t.show(
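The other formats follow the same write/read pattern; a minimal sketch with JSON (the output path is only illustrative):

# save the parsed data as JSON and read it back
parsed.write.format("json").save("data/block2.json")
j = spark.read.format("json").load("data/block2.json")
j.show(2)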
Data schemas
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Open", DoubleType(), True),
    StructField("High", DoubleType(), True),
    StructField("Low", DoubleType(), True),
    StructField("Close", DoubleType(), True),
    StructField("Volume", IntegerType(), True),
    StructField("Name", StringType(), True)
])
= """Date STRING, Open FLOAT, High FLOAT,
ddlSchemaStr Low FLOAT, Close FLOAT, Voulme INT, Name String
"""
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.read.option("header", True)\
    .csv("AAPL_2006-01-01_to_2018-01-01.csv", schema=ddlSchemaStr)
df.show(5)
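The programmatic StructType schema defined earlier can be passed the same way; a minimal sketch using the same file:

# read the same CSV, but with the StructType schema instead of the DDL string
df2 = spark.read.option("header", True)\
    .csv("AAPL_2006-01-01_to_2018-01-01.csv", schema=schema)
df2.printSchema()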
Unstructured data
%%file test.json
{"id": "0001",
"type": "donut",
"name": "Cake",
"ppu": 0.55,
"batters":
    {"batter":
        [
            { "id": "1001", "type": "Regular" },
            { "id": "1002", "type": "Chocolate" },
            { "id": "1003", "type": "Blueberry" }
        ]
    },
"topping":
    [
        { "id": "5001", "type": "None" },
        { "id": "5002", "type": "Glazed" },
        { "id": "5005", "type": "Sugar" },
        { "id": "5007", "type": "Powdered Sugar" },
        { "id": "5006", "type": "Chocolate with Sprinkles" },
        { "id": "5003", "type": "Chocolate" },
        { "id": "5004", "type": "Maple" }
    ]
}
rawDFjson = spark.read.json("test.json", multiLine="true")
rawDFjson.printSchema()
sampleDF = rawDFjson.withColumnRenamed("id", "key")
batDF = sampleDF.select("key", "batters.batter")
batDF.printSchema()
batDF.show(1, False)
from pyspark.sql.functions import explode
bat2DF = batDF.select("key", explode("batter").alias("new_batter"))
bat2DF.show()
bat2DF.printSchema()
bat2DF.select("key", "new_batter.*").show()
finalBatDF = (sampleDF
    .select("key", explode("batters.batter").alias("new_batter"))
    .select("key", "new_batter.*")
    .withColumnRenamed("id", "bat_id")
    .withColumnRenamed("type", "bat_type"))
finalBatDF.show()
topDF = (sampleDF
    .select("key", explode("topping").alias("new_topping"))
    .select("key", "new_topping.*")
    .withColumnRenamed("id", "top_id")
    .withColumnRenamed("type", "top_type")
)
topDF.show(10, False)
Exploratory Data Analysis
# verify the data schema
parsed.printSchema()
# check the values of the first row
parsed.first()
# how many records
parsed.count()
# target "is_match": counts of matching and non-matching records
from pyspark.sql.functions import col
parsed.groupBy("is_match").count().orderBy(col("count").desc()).show()
# other aggregates via agg
from pyspark.sql.functions import avg, stddev, stddev_pop
parsed.agg(avg("cmp_sex"), stddev("cmp_sex"), stddev_pop("cmp_sex")).show()
# SQL queries: register a temporary view name for the SQL engine
parsed.createOrReplaceTempView("dane")
spark.sql("""SELECT is_match, COUNT(*) cnt FROM dane GROUP BY is_match ORDER BY cnt DESC""").show()
# summary statistics
summary = parsed.describe()
summary.show()
summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()
Which variable describes the data better: c1 or c2?
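One quick check is how many non-null values each column actually carries; a minimal sketch using count(), which skips nulls:

# count() on a column counts only non-null values, so this compares completeness
from pyspark.sql.functions import count
parsed.agg(count("cmp_fname_c1").alias("c1_non_null"),
           count("cmp_fname_c2").alias("c2_non_null")).show()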
# statistics per class
# SQL-style filtering
matches = parsed.where("is_match = true")

# PySpark-style filtering
misses = parsed.filter(col("is_match") == False)

match_summary = matches.describe()
miss_summary = misses.describe()
match_summary.show()
miss_summary.show()
Spark pivot tables
summary_p = summary.toPandas()
summary_p.head()
summary_p.shape
summary_p = summary_p.set_index('summary').transpose().reset_index()
summary_p = summary_p.rename(columns={'index':'field'})
summary_p = summary_p.rename_axis(None, axis=1)
summaryT = spark.createDataFrame(summary_p)
summaryT.show()
# are the data types correct?
summaryT.printSchema()
from pyspark.sql.types import DoubleType
for c in summaryT.columns:
    if c == 'field':
        continue
    summaryT = summaryT.withColumn(c, summaryT[c].cast(DoubleType()))
# better now
summaryT.printSchema()
def pivot_summary(desc):
    desc_p = desc.toPandas()
    desc_p = desc_p.set_index('summary').transpose().reset_index()
    desc_p = desc_p.rename(columns={'index':'field'})
    desc_p = desc_p.rename_axis(None, axis=1)
    descT = spark.createDataFrame(desc_p)
    for c in descT.columns:
        if c == 'field':
            continue
        else:
            descT = descT.withColumn(c, descT[c].cast(DoubleType()))
    return descT
match_summaryT = pivot_summary(match_summary)
miss_summaryT = pivot_summary(miss_summary)
match_summaryT.createOrReplaceTempView("match_s")
miss_summaryT.createOrReplaceTempView("miss_s")
"""
spark.sql(Select a.field, a.count + b.count total, a.mean - b.mean delta
from match_s a inner join miss_s b on a.field = b.field
where a.field not in ("id_1", "id_2")
order by delta DESC, total DESC
""").show()
For the model:
- cmp_plz
- cmp_by
- cmp_bd
- cmp_lname_c1
- cmp_bm
## score = sum of the variables
zmienne = ['cmp_plz','cmp_by','cmp_bd','cmp_lname_c1','cmp_bm']
suma = " + ".join(zmienne)
suma  # 'cmp_plz + cmp_by + cmp_bd + cmp_lname_c1 + cmp_bm'
from pyspark.sql.functions import expr
scored = parsed.fillna(0, subset=zmienne)\
    .withColumn('score', expr(suma))\
    .select('score', 'is_match')
scored.show()
# evaluating the threshold value
def crossTabs(scored, t):
    return scored.selectExpr(f"score >= {t} as above", "is_match")\
        .groupBy("above").pivot("is_match", ("true", "false"))\
        .count()

crossTabs(scored, 4.0).show()
crossTabs(scored, 2.0).show()
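In these crosstabs, rows with above = true are the predicted matches and is_match is the ground truth. A rough precision/recall sketch for the 4.0 threshold (the filter expressions are illustrative):

# counts at the 4.0 threshold; is_match is the ground-truth label
tp = scored.filter("score >= 4.0 AND is_match = true").count()   # true positives
fp = scored.filter("score >= 4.0 AND is_match = false").count()  # false positives
fn = scored.filter("score < 4.0 AND is_match = true").count()    # false negatives
print("precision:", tp / (tp + fp), "recall:", tp / (tp + fn))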