Data preparation

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark
mkdir data
cd data
curl -L -o donation.zip http://bit.ly/1Aoywaq
unzip donation.zip
unzip 'block_*.zip'
# create dataframe
prev = spark.read.csv("data/block*.csv")
prev
prev.show(2)
prev.show()
# additional options: header and null values
parsed = spark.read.option("header", "true")\
    .option("nullValue", "?")\
    .option("inferSchema", "true")\
    .csv("data/block*.csv")
parsed.show(5)
parsed.printSchema()
Other formats (a short sketch of the generic reader/writer API follows the list):
- parquet
- orc
- json
- jdbc
- avro
- text
- image
- libsvm
- binary
- xml
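The same generic DataFrameReader/DataFrameWriter API covers these sources as well. A minimal sketch, reusing the parsed DataFrame from above; the output paths are made up for illustration:
# write and read back two of the other formats via format()/save()/load()
parsed.write.format("orc").mode("overwrite").save("data/block.orc")
parsed.write.format("json").mode("overwrite").save("data/block.json")
orc_df = spark.read.format("orc").load("data/block.orc")
json_df = spark.read.format("json").load("data/block.json")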
parsed.write.format("parquet").save("data/block2.parquet")
t = spark.read.format("parquet").load("data/block2.parquet")
t.show(2)
spark.stop()
Data schemas
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Open", DoubleType(), True),
    StructField("High", DoubleType(), True),
    StructField("Low", DoubleType(), True),
    StructField("Close", DoubleType(), True),
    StructField("Volume", IntegerType(), True),
    StructField("Name", StringType(), True)
])
ddlSchemaStr = """Date STRING, Open FLOAT, High FLOAT,
Low FLOAT, Close FLOAT, Volume INT, Name STRING
"""
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.option("header", True)\
    .csv("data/stocks/AAPL_2006-01-01_to_2018-01-01.csv", schema=ddlSchemaStr)
df.show(5)
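As a quick check (not in the original material), the applied DDL schema can be inspected with printSchema(); the Date column stays a STRING here and could be cast to a date type if needed. The yyyy-MM-dd format below is an assumption about the stocks CSV:
df.printSchema()
from pyspark.sql.functions import to_date
# cast the Date string to a proper date column (format is assumed)
df.withColumn("Date", to_date("Date", "yyyy-MM-dd")).printSchema()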
Unstructured data
%%file test.json
{"id": "0001",
 "type": "donut",
 "name": "Cake",
 "ppu": 0.55,
 "batters":
    {"batter":
        [
            { "id": "1001", "type": "Regular" },
            { "id": "1002", "type": "Chocolate" },
            { "id": "1003", "type": "Blueberry" }
        ]
    },
 "topping":
    [
        { "id": "5001", "type": "None" },
        { "id": "5002", "type": "Glazed" },
        { "id": "5005", "type": "Sugar" },
        { "id": "5007", "type": "Powdered Sugar" },
        { "id": "5006", "type": "Chocolate with Sprinkles" },
        { "id": "5003", "type": "Chocolate" },
        { "id": "5004", "type": "Maple" }
    ]
}
Overwriting test.json
rawDFjson = spark.read.json("test.json", multiLine=True)
rawDFjson.printSchema()
root
|-- batters: struct (nullable = true)
| |-- batter: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- id: string (nullable = true)
| | | |-- type: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- ppu: double (nullable = true)
|-- topping: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- type: string (nullable = true)
|-- type: string (nullable = true)
sampleDF = rawDFjson.withColumnRenamed("id", "key")
sampleDF.printSchema()
root
|-- batters: struct (nullable = true)
| |-- batter: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- id: string (nullable = true)
| | | |-- type: string (nullable = true)
|-- key: string (nullable = true)
|-- name: string (nullable = true)
|-- ppu: double (nullable = true)
|-- topping: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- type: string (nullable = true)
|-- type: string (nullable = true)
batDF = sampleDF.select("key", "batters.batter")
batDF.printSchema()
root
|-- key: string (nullable = true)
|-- batter: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- type: string (nullable = true)
batDF.show(1, False)
+----+-------------------------------------------------------+
|key |batter |
+----+-------------------------------------------------------+
|0001|[{1001, Regular}, {1002, Chocolate}, {1003, Blueberry}]|
+----+-------------------------------------------------------+
from pyspark.sql.functions import explode
bat2DF = batDF.select("key", explode("batter").alias("new_batter"))
bat2DF.show()
+----+-----------------+
| key| new_batter|
+----+-----------------+
|0001| {1001, Regular}|
|0001|{1002, Chocolate}|
|0001|{1003, Blueberry}|
+----+-----------------+
bat2DF.printSchema()
root
|-- key: string (nullable = true)
|-- new_batter: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- type: string (nullable = true)
bat2DF.select("key", "new_batter.*").show()
+----+----+---------+
| key| id| type|
+----+----+---------+
|0001|1001| Regular|
|0001|1002|Chocolate|
|0001|1003|Blueberry|
+----+----+---------+
finalBatDF = (sampleDF
    .select("key", explode("batters.batter").alias("new_batter"))
    .select("key", "new_batter.*")
    .withColumnRenamed("id", "bat_id")
    .withColumnRenamed("type", "bat_type"))
finalBatDF.show()
topDF = (sampleDF
    .select("key", explode("topping").alias("new_topping"))
    .select("key", "new_topping.*")
    .withColumnRenamed("id", "top_id")
    .withColumnRenamed("type", "top_type")
)
topDF.show(10, False)
Exploratory Data Analysis
# verify the data schema
parsed.printSchema()
# check the values of the first row
parsed.first()
# how many records are there
parsed.count()
# cache in memory on the cluster (a single machine here)
parsed.cache()
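As a small aside (not in the original notebook), the cache flag and storage level can be checked directly on the DataFrame:
# confirm the DataFrame is marked as cached and with which storage level
print(parsed.is_cached)
print(parsed.storageLevel)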
# the target "is_match": counts of matching and non-matching records
from pyspark.sql.functions import col
parsed.groupBy("is_match").count().orderBy(col("count").desc()).show()
# other aggregates with agg
from pyspark.sql.functions import avg, stddev, stddev_pop
parsed.agg(avg("cmp_sex"), stddev("cmp_sex"), stddev_pop("cmp_sex")).show()
# SQL queries - register a name for the SQL engine - a temporary view
parsed.createOrReplaceTempView("dane")
spark.sql("""SELECT is_match, COUNT(*) cnt FROM dane GROUP BY is_match ORDER BY cnt DESC""").show()
# aggregate statistics
summary = parsed.describe()
summary.show()
summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()
Which variable describes the data better: c1 or c2?
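One way to answer this (a small sketch added here, not part of the original analysis) is to compare how many non-null values each column actually has; count() inside an aggregation counts only non-null entries, matching the count row of describe():
from pyspark.sql.functions import count
# non-null counts of the two candidate columns
parsed.agg(count("cmp_fname_c1").alias("c1_non_null"),
           count("cmp_fname_c2").alias("c2_non_null")).show()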
# statistics per class
# SQL-style filtering
matches = parsed.where("is_match = true")
# PySpark-style filtering
misses = parsed.filter(col("is_match") == False)

match_summary = matches.describe()
miss_summary = misses.describe()
match_summary.show()
miss_summary.show()
Spark pivot tables
summary_p = summary.toPandas()
summary_p.head()
summary_p.shape
summary_p = summary_p.set_index('summary').transpose().reset_index()
summary_p = summary_p.rename(columns={'index': 'field'})
summary_p = summary_p.rename_axis(None, axis=1)
summary_p
summaryT = spark.createDataFrame(summary_p)
summaryT.show()
# are the data types right?
summaryT.printSchema()
from pyspark.sql.types import DoubleType
for c in summaryT.columns:
    if c == 'field':
        continue
    summaryT = summaryT.withColumn(c, summaryT[c].cast(DoubleType()))
# better now
summaryT.printSchema()
# do the same for the match and miss tables
def pivot_summary(desc):
    desc_p = desc.toPandas()
    desc_p = desc_p.set_index('summary').transpose().reset_index()
    desc_p = desc_p.rename(columns={'index': 'field'})
    desc_p = desc_p.rename_axis(None, axis=1)
    descT = spark.createDataFrame(desc_p)
    for c in descT.columns:
        if c == 'field':
            continue
        else:
            descT = descT.withColumn(c, descT[c].cast(DoubleType()))
    return descT

match_summaryT = pivot_summary(match_summary)
miss_summaryT = pivot_summary(miss_summary)
Joins

match_summaryT.createOrReplaceTempView("match_s")
miss_summaryT.createOrReplaceTempView("miss_s")

spark.sql("""
SELECT a.field, a.count + b.count total, a.mean - b.mean delta
FROM match_s a INNER JOIN miss_s b ON a.field = b.field
WHERE a.field NOT IN ("id_1", "id_2")
ORDER BY delta DESC, total DESC
""").show()
Variables for the model:
- cmp_plz
- cmp_by
- cmp_bd
- cmp_lname_c1
- cmp_bm

## score = sum of the variables
zmienne = ['cmp_plz', 'cmp_by', 'cmp_bd', 'cmp_lname_c1', 'cmp_bm']
suma = " + ".join(zmienne)
suma
from pyspark.sql.functions import expr
scored = parsed.fillna(0, subset=zmienne)\
    .withColumn('score', expr(suma))\
    .select('score', 'is_match')
scored.show()
# evaluate the threshold value
def crossTabs(scored, t):
    return scored.selectExpr(f"score >= {t} as above", "is_match")\
        .groupBy("above").pivot("is_match", ("true", "false"))\
        .count()

crossTabs(scored, 4.0).show()
crossTabs(scored, 2.0).show()
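As a rough extension of the threshold evaluation (a sketch with a helper name of my own, assuming both classes occur at the chosen threshold), precision and recall can be computed directly from the scored DataFrame:
from pyspark.sql.functions import col

def precision_recall(scored, t):
    predicted = scored.where(col("score") >= t)                                 # pairs predicted as matches
    tp = predicted.where(col("is_match") == True).count()                       # true positives
    fp = predicted.where(col("is_match") == False).count()                      # false positives
    fn = scored.where((col("score") < t) & (col("is_match") == True)).count()   # missed matches
    return tp / (tp + fp), tp / (tp + fn)

print(precision_recall(scored, 4.0))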