from pyspark import SparkContext
Apache Spark intro
Apache Spark is a general-purpose, in-memory computing engine. Spark can be used with Hadoop, Yarn and other Big Data components to harness the power of Spark and improve the performance of your applications. It provides high-level APIs in Scala, Java, Python, R, and SQL.
Spark Architecture
Apache Spark works in a master-slave architecture where the master is called “Driver” and slaves are called “Workers”. The starting point of your Spark application is sc
, a Spark Context Class instance. It runs inside the driver.
Apache Spark: Sometimes also called Spark Core. The Spark Core implementation is a RDD (Resilient Distributed Dataset) which is a collection of distributed data across different nodes of the cluster that are processed in parallel.
Spark SQL: The implementation here is DataFrame, which is a relational representation of the data. It provides functions with SQL like capabilities. Also, we can write SQL like queries for our data analysis.
Spark Streaming: The implementation provided by this library is D-stream, also called Discretized Stream. This library provides capabilities to process/transform data in near real-time.
MLlib: This is a Machine Learning library with commonly used algorithms including collaborative filtering, classification, clustering, and regression.
GraphX: This library helps us to process Graphs, solving various problems (like Page Rank, Connected Components, etc) using Graph Theory.
Let’s dig a little deeper into Apache Spark (Spark Core), starting with RDD.
= SparkContext(appName="myAppName") sc
sc
RDD
- Resilient Distributed Dataset
- Podstawowa abstrakcja oraz rdzeń Sparka
- Obsługiwane przez dwa rodzaje operacji:
- Akcje:
- operacje uruchamiająceegzekucję transformacji na RDD
- przyjmują RDD jako input i zwracają wynik NIE będący RDD
- Transformacje:
- leniwe operacje
- przyjmują RDD i zwracają RDD
- Akcje:
- In-Memory - dane RDD przechowywane w pamięci
- Immutable
- Lazy evaluated
- Parallel - przetwarzane równolegle
- Partitioned - rozproszone
WAŻNE informacje !
Ważne do zrozumienia działania SPARKA:
Term | Definition |
---|---|
RDD | Resilient Distributed Dataset |
Transformation | Spark operation that produces an RDD |
Action | Spark operation that produces a local object |
Spark Job | Sequence of transformations on data with a final action |
Dwie podstawowe metody tworzenia RDD:
Method | Result |
---|---|
sc.parallelize(array) |
Create RDD of elements of array (or list) |
sc.textFile(path/to/file) |
Create RDD of lines from file |
Podstawowe transformacje
Transformation Example | Result |
---|---|
filter(lambda x: x % 2 == 0) |
Discard non-even elements |
map(lambda x: x * 2) |
Multiply each RDD element by 2 |
map(lambda x: x.split()) |
Split each string into words |
flatMap(lambda x: x.split()) |
Split each string into words and flatten sequence |
sample(withReplacement=True,0.25) |
Create sample of 25% of elements with replacement |
union(rdd) |
Append rdd to existing RDD |
distinct() |
Remove duplicates in RDD |
sortBy(lambda x: x, ascending=False) |
Sort elements in descending order |
Podstawowe akcje
Action | Result |
---|---|
collect() |
Convert RDD to in-memory list |
take(3) |
First 3 elements of RDD |
top(3) |
Top 3 elements of RDD |
takeSample(withReplacement=True,3) |
Create sample of 3 elements with replacement |
sum() |
Find element sum (assumes numeric elements) |
mean() |
Find element mean (assumes numeric elements) |
stdev() |
Find element deviation (assumes numeric elements) |
= ['Books', 'DVD', 'CD', 'PenDrive']
keywords = sc.parallelize(keywords)
key_rdd key_rdd.collect()
= key_rdd.map(lambda x: x.lower()) # transformacja key_small
# akcja key_small.collect()
sc.stop()
Spark’s core data structure
✅: A low level object that lets Spark work its magic by splitting data across multiple nodes in the cluster.
❌: However, RDDs are hard to work with directly, so we’ll be using the Spark DataFrame abstraction built on top of RDDs.
MAP REDUCE
from pyspark.sql import SparkSession
= SparkSession.builder.appName("new").getOrCreate()
spark
# otrzymanie obiektu SparkContext
= spark.sparkContext sc
import re
# Word Count on RDD
"MobyDick.txt")\
sc.textFile(map(lambda x: re.findall(r"[a-z']+", x.lower())) \
.lambda x: [(y, 1) for y in x]).reduceByKey(lambda x,y: x + y)\
.flatMap(5) .take(
SPARK STREAMING
Część Sparka odpowiedzialna za przetwarzanie danych w czasie rzeczywistym.
Dane mogą pochodzić z różnych źródeł np. sokety TCP, Kafka, etc. Korzystając z poznanych już metod map, reduce, join, oraz window
można w łatwy sposób generować przetwarzanie strumienia tak jaby był to nieskończony ciąg RDD. Ponadto nie ma problemu aby wywołać na strumieniu operacje ML czy wykresy.
Cała procedura przedstawia się następująco:
SPARK STREAMING w tej wersji wprowadza abstrakcje zwaną discretized stream
DStream (reprezentuje sekwencję RDD).
Operacje na DStream można wykonywać w API JAVA, SCALA, Python, R (nie wszystkie możliwości są dostępne dla Pythona).
Spark Streaming potrzebuje minium 2 rdzenie.
- StreamingContext(sparkContext, batchDuration) - reprezentuje połączenie z klastrem i służy do tworzenia DStreamów,
batchDuration
wskazuje na granularność batch’y (w sekundach) - socketTextStream(hostname, port) - tworzy DStream na podstawie danych napływających ze wskazanego źródła TCP
- flatMap(f), map(f), reduceByKey(f) - działają analogicznie jak w przypadku RDD z tym że tworzą nowe DStream’y
- pprint(n) - printuje pierwsze
n
(domyślnie 10) elementów z każdego RDD wygenerowanego w DStream’ie - StreamingContext.start() - rozpoczyna działania na strumieniach
- StreamingContext.awaitTermination(timeout) - oczekuje na zakończenie działań na strumieniach
- StreamingContext.stop(stopSparkContext, stopGraceFully) - kończy działania na strumieniach
Obiekt StreamingContext można wygenerować za pomocą obiektu SparkContext.
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working thread
# and batch interval of 1 second
= SparkContext("local[2]", "NetworkWordCount2")
sc = StreamingContext(sc, 2)
ssc
# DStream
= ssc.socketTextStream("localhost", 9998)
lines
# podziel każdą linię na wyrazy
# DStream jest mapowany na kolejny DStream
# words = lines.flatMap(lambda line: line.split(" "))
= lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))
words
# zliczmy każdy wyraz w każdym batchu
# DStream jest mapowany na kolejny DStream
# pairs = words.map(lambda word: (word, 1))
# DStream jest mapowany na kolejny DStream
# wordCounts = pairs.reduceByKey(lambda x, y: x + y)
= words.map(lambda word: (word,1)).reduceByKey(lambda x,y: x+y)
wordCounts # wydrukuj pierwszy element
wordCounts.pprint()
# before start run a stream data
# Start the computation
ssc.start()
ssc.awaitTermination()
ssc.stop() sc.stop()
# w konsoli linuxowej netcat Nmap for windows
!nc -lk 9998
%%file start_stream.py
from socket import *
import time
= list()
rdd with open("MobyDick_full.txt", 'r') as ad:
for line in ad:
rdd.append(line)
= 'localhost'
HOST = 9998
PORT = (HOST, PORT)
ADDR = socket(AF_INET, SOCK_STREAM)
tcpSock
tcpSock.bind(ADDR)5)
tcpSock.listen(
while True:
= tcpSock.accept()
c, addr print('got connection')
for line in rdd:
try:
c.send(line.encode())1)
time.sleep(except:
break
c.close()print('disconnected')