Analiza Danych w czasie rzeczywistym kurs dla studentów SGH

Ćwiczenia 5 - Apache Spark DataFrame

RDD -> DataFrame

Obiekt SparkContext

sc
dir(sc)
sc.__dir__()

Tworzenie obiektu RDD i transformacje -> akcje

rdd = sc.parallelize(range(20))
rdd
rdd2 = rdd.map(lambda x: x*x+2)
rdd2

rdd2.collect()

Funkcje

def zwroc_liste(x: float) -> list:
    return [x,x]

rdd3 = rdd.map(zwroc_liste)
rdd3.collect()

rdd4 = rdd.flatMap(zwroc_liste).take(8)
rdd4

Ramki z Rdd

df = rdd3.toDF(['col1','col2'])
df.show()

rdd = sc.parallelize([(1,2,3,'a ba c'), (4,3,5,'tekst')])
df = rdd.toDF(['col1','col3','col2','col4'])
df.show()
df.printSchema()

Nieustrukturyzowane dane w kolumnach DF

import requests
import json

def pobierz_info_slowa(slowo):
    language = 'en-gb'
    headers = {"app_id":"c7f6d128",  "app_key":"73ea2ed8109721300050137e74044fa6"}
    url_1 = "https://od-api.oxforddictionaries.com:443/api/v2/entries/"  
    url = f"{url_1}{language}/{slowo.lower()}"
    return requests.get(url, headers=headers)


odp = pobierz_info_slowa('streaming')

One Word

odp.text
odp.headers
odp.json()
# ======== RDD ================
odp_rdd = sc.parallelize([odp.text])
# ========= DF ==============
df = spark.read.json(odp_rdd)
df.show(truncate=False)
words = ['cogent','digress','dog','cat','diligent','obscure']
lista_odp = [pobierz_info_slowa(word).text for word in words ]
json_rdd = sc.parallelize(lista_odp)
json_df = spark.read.json(json_rdd)
json_df.show()
# for databricks
display(json_df)
json_df.printSchema()

Pola struct i array.

  1. Struct
    json_df.select('metadata.*').printSchema()
    
  2. array
json = """
{
"num": [1,2,3,4]
}
"""
df1 = spark.read.json(sc.parallelize([json]))
df1.printSchema()
display(df1)

from pyspark.sql.functions import *

df1.select(explode('num').alias('number')).show()

json_df.select(explode('results').alias('wynik')).show()

json_df.createOrReplaceTempView('slownik')

flat df

flat_df = spark.sql("""
select id , b
from slownik
lateral view outer explode(results)tmp1 as a
lateral view outer explode(a.lexicalEntries)tmp2 as b
""")

flat_df.printSchema()


flat_df3 = spark.sql("""select id as word,
a.language, definitions as definition,
examples.text as example
from slownik
lateral view outer explode(results)tmp1 as a
lateral view outer explode(a.lexicalEntries)tmp2 as b
lateral view outer explode(b.entries)tmp3 as c
lateral view outer explode(c.senses)tmp4 as d
lateral view outer explode(d.definitions)tmp5 as definitions
lateral view outer explode(d.examples)tmp6 as examples
""")

flat_df3.show()