from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Spark DataFrame Processing

Data preparation
mkdir data
cd data
curl -L -o donation.zip http://bit.ly/1Aoywaq
unzip donation.zip
unzip 'block_*.zip'

# create a DataFrame
prev = spark.read.csv("data/block*.csv")
prev
prev.show(2)
prev.show()

# additional options: header and null values
parsed = spark.read.option("header", "true")\
.option("nullValue", "?")\
.option("inferSchema", "true")\
.csv("data/block*.csv")parsed.show(5)
parsed.printSchema()obsługiwane formaty
- parquet
- orc
- json
- jdbc
- avro
- text
- image
- libsvm
- binary
- xml
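All of these formats go through the same generic reader/writer API. As a minimal sketch (the ORC output path here is hypothetical, chosen to mirror the Parquet round trip that follows):

# hypothetical round trip through ORC using the generic format()/load() API
parsed.write.format("orc").mode("overwrite").save("data/block2.orc")
orc_df = spark.read.format("orc").load("data/block2.orc")
orc_df.show(2)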
parsed.write.format("parquet").save("data/block2.parquet")

t = spark.read.format("parquet").load("data/block2.parquet")
t.show(2)

Data schemas
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
schema = StructType([
StructField("Date", StringType(), True),
StructField("Open", DoubleType(), True),
StructField("High", DoubleType(), True),
StructField("Low", DoubleType(), True),
StructField("Close", DoubleType(), True),
StructField("Volume", IntegerType(), True),
StructField("Name", StringType(), True)
])
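The programmatic StructType schema can be passed directly to the reader; a minimal sketch, assuming the same AAPL CSV file that is loaded further below:

# hypothetical usage of the StructType schema defined above
df_struct = spark.read.option("header", True)\
    .csv("AAPL_2006-01-01_to_2018-01-01.csv", schema=schema)
df_struct.printSchema()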
ddlSchemaStr = """Date STRING, Open FLOAT, High FLOAT,
Low FLOAT, Close FLOAT, Volume INT, Name STRING
"""

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.option("header", True)\
.csv("AAPL_2006-01-01_to_2018-01-01.csv", schema=ddlSchemaStr)
df.show(5)

Unstructured data
%%file test.json
{
"id": "0001",
"type": "donut",
"name": "Cake",
"ppu": 0.55,
"batters":
{
"batter":
[
{ "id": "1001", "type": "Regular" },
{ "id": "1002", "type": "Chocolate" },
{ "id": "1003", "type": "Blueberry" }
]
},
"topping":
[
{ "id": "5001", "type": "None" },
{ "id": "5002", "type": "Glazed" },
{ "id": "5005", "type": "Sugar" },
{ "id": "5007", "type": "Powdered Sugar" },
{ "id": "5006", "type": "Chocolate with Sprinkles" },
{ "id": "5003", "type": "Chocolate" },
{ "id": "5004", "type": "Maple" }
]
}

rawDFjson = spark.read.json("test.json", multiLine=True)
rawDFjson.printSchema()

sampleDF = rawDFjson.withColumnRenamed("id", "key")
batDF = sampleDF.select("key", "batters.batter")
batDF.printSchema()
batDF.show(1, False)

from pyspark.sql.functions import explode
bat2DF = batDF.select("key", explode("batter").alias("new_batter"))
bat2DF.show()
bat2DF.printSchema()
bat2DF.select("key", "new_batter.*").show()

finalBatDF = (sampleDF
.select("key",
explode("batters.batter").alias("new_batter"))
.select("key", "new_batter.*")
.withColumnRenamed("id", "bat_id")
.withColumnRenamed("type", "bat_type"))
finalBatDF.show()

topDF = (sampleDF
.select("key", explode("topping").alias("new_topping"))
.select("key", "new_topping.*")
.withColumnRenamed("id", "top_id")
.withColumnRenamed("type", "top_type")
)
topDF.show(10, False)

Exploratory Data Analysis

# verify the data schema
parsed.printSchema()

# inspect the values of the first row
parsed.first()

# how many records
parsed.count()

# target "is_match": counts of matching and non-matching records
from pyspark.sql.functions import col
parsed.groupBy("is_match").count().orderBy(col("count").desc()).show()

# other aggregates with agg
from pyspark.sql.functions import avg, stddev, stddev_pop
parsed.agg(avg("cmp_sex"), stddev("cmp_sex"), stddev_pop("cmp_sex")).show()

# SQL queries - register a name with the SQL engine (temporary view)
parsed.createOrReplaceTempView("dane")

spark.sql("""SELECT is_match, COUNT(*) cnt FROM dane GROUP BY is_match ORDER BY cnt DESC""").show()

# summary statistics
summary = parsed.describe()
summary.show()

summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()

Which variable describes the data better, c1 or c2?

# statistics per class
# SQL-style filtering
matches = parsed.where("is_match = true")
# PySpark-style filtering
misses = parsed.filter(col("is_match") == False)
match_summary = matches.describe()
miss_summary = misses.describe()

match_summary.show()
miss_summary.show()

Pivot tables in Spark
summary_p = summary.toPandas()
summary_p.head()
summary_p.shape

summary_p = summary_p.set_index('summary').transpose().reset_index()
summary_p = summary_p.rename(columns={'index':'field'})
summary_p = summary_p.rename_axis(None, axis=1)

summaryT = spark.createDataFrame(summary_p)
summaryT.show()
summaryT.printSchema()  # are the data types correct?

from pyspark.sql.types import DoubleType

for c in summaryT.columns:
    if c == 'field':
        continue
    summaryT = summaryT.withColumn(c, summaryT[c].cast(DoubleType()))

summaryT.printSchema()  # better now

def pivot_summary(desc):
    desc_p = desc.toPandas()
    desc_p = desc_p.set_index('summary').transpose().reset_index()
    desc_p = desc_p.rename(columns={'index':'field'})
    desc_p = desc_p.rename_axis(None, axis=1)
    descT = spark.createDataFrame(desc_p)
    for c in descT.columns:
        if c == 'field':
            continue
        else:
            descT = descT.withColumn(c, descT[c].cast(DoubleType()))
    return descT

match_summaryT = pivot_summary(match_summary)
miss_summaryT = pivot_summary(miss_summary)

match_summaryT.createOrReplaceTempView("match_s")
miss_summaryT.createOrReplaceTempView("miss_s")

spark.sql("""
SELECT a.field, a.count + b.count total, a.mean - b.mean delta
FROM match_s a INNER JOIN miss_s b ON a.field = b.field
WHERE a.field NOT IN ("id_1", "id_2")
ORDER BY delta DESC, total DESC
""").show()do modelu :
cmp_plz,cmp_by,cmp_bd,cmp_lname_c1,cmp_bm
## score = sum of the variables
zmienne = ['cmp_plz','cmp_by','cmp_bd','cmp_lname_c1','cmp_bm']
suma = " + ".join(zmienne)sumafrom pyspark.sql.functions import exprscored = parsed.fillna(0, subset=zmienne)\
.withColumn('score', expr(suma))\
.select('score','is_match')scored.show()# ocena wartosci progowej
def crossTabs(scored, t):
    return scored.selectExpr(f"score >= {t} as above", "is_match")\
        .groupBy("above").pivot("is_match", ("true", "false"))\
        .count()

crossTabs(scored, 4.0).show()
crossTabs(scored, 2.0).show()
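Beyond the raw cross-tabs, the same scored DataFrame can be reduced to precision and recall for a given threshold; a minimal sketch (the helper below is an assumption, not part of the original notebook):

# hypothetical helper: precision and recall for threshold t
def precision_recall(scored, t):
    tp = scored.filter((col("score") >= t) & (col("is_match") == True)).count()
    fp = scored.filter((col("score") >= t) & (col("is_match") == False)).count()
    fn = scored.filter((col("score") < t) & (col("is_match") == True)).count()
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall

print(precision_recall(scored, 4.0))
print(precision_recall(scored, 2.0))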