%%file model.py
from joblib import dump
import numpy as np
from sklearn.ensemble import IsolationForest
# Two Gaussian clusters centred at (2, 2) and (-2, -2) serve as the "normal" data
ran_gen = np.random.RandomState(44)
X = 0.4 * ran_gen.randn(500, 2)
X = np.round(X, 3)
X_train = np.r_[X + 2, X - 2]

# Train an Isolation Forest that treats roughly 1% of the data as anomalous
clf = IsolationForest(n_estimators=50, max_samples=500, random_state=ran_gen, contamination=0.01)
clf.fit(X_train)
dump(clf, './isolation_forest.joblib')
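Once model.py has been executed (the next command), a quick sanity check confirms that the serialized model loads and labels points as expected. A minimal sketch; the coordinates are illustrative (+1 = inlier, -1 = expected outlier):
from joblib import load
clf = load('./isolation_forest.joblib')
# A point near a cluster centre should come back +1; a point far from both clusters -1
print(clf.predict([[2.0, 2.0], [0.0, 0.0]]))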
!python model.py
Generating the data stream
%%file transaction_producer.py
import json
import time
import logging
import socket
from datetime import datetime
from numpy.random import uniform, choice, randn
from random import random as r
import numpy as np
from confluent_kafka import Producer
KAFKA_BROKER = 'broker:9092'
TRANSACTION_TOPIC = 'transactions'
LAG = 0.5
PROBABILITY_OUTLIER = 0.05
def create_producer():
    try:
        producer = Producer({
            "bootstrap.servers": KAFKA_BROKER,
            "client.id": socket.gethostname(),
            "enable.idempotence": True,
            "batch.size": 64000,
            "linger.ms": 10,
            "acks": "all",
            "retries": 5,
            "delivery.timeout.ms": 1000
        })
    except Exception:
        logging.exception("failed to create the producer")
        producer = None
    return producer
_id = 0
producer = create_producer()
if producer is not None:
    while True:
        # With probability PROBABILITY_OUTLIER, draw a point far from both clusters
        if r() <= PROBABILITY_OUTLIER:
            X_test = uniform(low=-4, high=4, size=(1, 2))
        else:
            X = 0.3 * randn(1, 2)
            X_test = X + choice(a=[2, -2], size=1, p=[0.5, 0.5])
        X_test = np.round(X_test, 3).tolist()
        current_time = datetime.utcnow().isoformat()
        record = {
            "id": _id,
            "data": X_test,
            "current_time": current_time
        }
        record = json.dumps(record).encode("utf-8")
        producer.produce(topic=TRANSACTION_TOPIC, value=record)
        producer.flush()
        _id += 1
        time.sleep(LAG)
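A side note: flushing after every single message, as above, forfeits batching. In a higher-throughput setup, confluent_kafka's delivery report callback confirms acknowledgements asynchronously instead. A minimal sketch, reusing the producer above (the callback name is illustrative):
def delivery_report(err, msg):
    # Invoked once per message when the broker acknowledges (or rejects) it
    if err is not None:
        logging.error(f"delivery failed: {err}")

producer.produce(topic=TRANSACTION_TOPIC, value=record, on_delivery=delivery_report)
producer.poll(0)  # serve pending delivery callbacks without blocking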
Open a terminal and run the producer:
python transaction_producer.py
Kafka verification
Open a new terminal and check that the transactions topic exists in Kafka:
cd ~
kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
Then run
cd ~
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic transactions
and check that the transactions are coming through.
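The same check can also be done from Python; a minimal sketch using confluent_kafka's AdminClient, assuming the same broker address:
from confluent_kafka.admin import AdminClient
admin = AdminClient({"bootstrap.servers": "broker:9092"})
metadata = admin.list_topics(timeout=10)
print(list(metadata.topics.keys()))  # should include 'transactions'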
%%file outliers_detection.py
import json
import os
import time
import numpy as np
import socket
import logging
from datetime import datetime
from joblib import load
from confluent_kafka import Producer, Consumer
from multiprocessing import Process
KAFKA_BROKER = 'broker:9092'
TRANSACTION_TOPIC = 'transactions'
TRANSACTION_CG = 'transactions'
ANOMALY_TOPIC = 'anomaly'
NUM_PARTITIONS = 3
MODEL_PATH = os.path.abspath('isolation_forest.joblib')
def create_producer():
    try:
        producer = Producer({
            "bootstrap.servers": KAFKA_BROKER,
            "client.id": socket.gethostname(),
            "enable.idempotence": True,
            "batch.size": 64000,
            "linger.ms": 10,
            "acks": "all",
            "retries": 5,
            "delivery.timeout.ms": 1000
        })
    except Exception:
        logging.exception("failed to create the producer")
        producer = None
    return producer
def create_consumer(topic, group_id):
    try:
        consumer = Consumer({
            "bootstrap.servers": KAFKA_BROKER,
            "group.id": group_id,
            "client.id": socket.gethostname(),
            "isolation.level": "read_committed",
            "auto.offset.reset": "latest",
            "enable.auto.commit": False
        })
        consumer.subscribe([topic])
    except Exception:
        logging.exception("failed to create the consumer")
        consumer = None
    return consumer
def detekcja_anomalii():
    # Consume transactions, score them with the trained model, republish outliers
    consumer = create_consumer(topic=TRANSACTION_TOPIC, group_id=TRANSACTION_CG)
    producer = create_producer()
    clf = load(MODEL_PATH)
    while True:
        message = consumer.poll()
        if message is None:
            continue
        if message.error():
            logging.error(f"CONSUMER error: {message.error()}")
            continue
        record = json.loads(message.value().decode('utf-8'))
        data = record['data']
        prediction = clf.predict(data)
        # IsolationForest labels outliers as -1; forward those to the anomaly topic
        if prediction[0] == -1:
            score = clf.score_samples(data)
            record["score"] = np.round(score, 3).tolist()
            record = json.dumps(record).encode("utf-8")
            producer.produce(topic=ANOMALY_TOPIC, value=record)
            producer.flush()

# One consumer process per partition, so the group can read all partitions in parallel
for _ in range(NUM_PARTITIONS):
    p = Process(target=detekcja_anomalii)
    p.start()
Create the anomaly topic and start a console preview
kafka/bin/kafka-topics.sh --create --topic anomaly --bootstrap-server broker:9092
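Alternatively, the topic can be created programmatically; a minimal sketch with AdminClient, where replication_factor=1 assumes a single-broker setup. Note that for the three detector processes above to actually share work, the transactions topic itself needs at least three partitions, since a consumer group splits partitions among its members:
from confluent_kafka.admin import AdminClient, NewTopic
admin = AdminClient({"bootstrap.servers": "broker:9092"})
futures = admin.create_topics([NewTopic("anomaly", num_partitions=1, replication_factor=1)])
for topic, future in futures.items():
    future.result()  # raises if creation failed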
cd ~
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic anomaly
Reading a stream in Apache Spark - Socket
%%file socket_stream_start.py
import socket
import time

# Read the text file once; each line will be streamed to connected clients
lines = []
with open("MobyDick_full.txt", 'r') as ad:
    for line in ad:
        lines.append(line)

HOST = 'localhost'
PORT = 9999
ADDR = (HOST, PORT)

tcpSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcpSock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # allow quick restarts
tcpSock.bind(ADDR)
tcpSock.listen(5)

while True:
    c, addr = tcpSock.accept()
    print('got connection')
    for line in lines:
        try:
            c.send(line.encode())
            time.sleep(1)
        except (BrokenPipeError, ConnectionResetError):
            break
    c.close()
    print('disconnected')
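To check the server by hand before involving Spark, run it in one terminal and connect with a plain TCP client in another (nc is assumed to be available); lines of the book should arrive once per second:
python socket_stream_start.py
nc localhost 9999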
%%file streamWordCount.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
if __name__ == "__main__":
    spark = SparkSession.builder.appName("Stream_DF").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # Read raw lines from the TCP socket served by socket_stream_start.py
    lines = (spark
        .readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load())

    # Split each line into words and count occurrences across the whole stream
    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    word_counts = words.groupBy("word").count()

    streamingQuery = (word_counts
        .writeStream
        .format("console")
        .outputMode("complete")
        .trigger(processingTime="5 seconds")
        .start())
    streamingQuery.awaitTermination()
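With the socket server running, submit the word-count job in a second terminal; no extra packages are needed for the socket source:
spark-submit streamWordCount.py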
Reading a stream in Apache Spark + Kafka
%%file raw_app.py
## LOAD SPARK SESSION object
SERVER = "broker:9092"

if __name__ == "__main__":
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # Subscribe to the transactions topic; each row carries the raw Kafka key/value
    raw = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", SERVER)
        .option("subscribe", "transactions")
        .load()
    )

    query = (
        raw.writeStream
        .outputMode("append")
        .option("truncate", "false")
        .format("console")
        .start()
    )
    query.awaitTermination()
Submit the job with the Kafka connector package:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 raw_app.py
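The console sink prints the Kafka value as raw bytes. In raw_app.py, the value column could instead be cast to a string and parsed before writing; a minimal sketch, with the schema mirrored from the records built in transaction_producer.py:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (ArrayType, DoubleType, LongType,
                               StringType, StructField, StructType)

# Mirrors the dict produced in transaction_producer.py
schema = StructType([
    StructField("id", LongType()),
    StructField("data", ArrayType(ArrayType(DoubleType()))),
    StructField("current_time", StringType()),
])

parsed = (raw
    .select(from_json(col("value").cast("string"), schema).alias("tx"))
    .select("tx.*"))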