%%file model.py
from joblib import dump
import numpy as np
from sklearn.ensemble import IsolationForest
ran_gen = np.random.RandomState(44)

# synthetic 2D training data: two clusters centred at (2, 2) and (-2, -2)
X = 0.4 * ran_gen.randn(500, 2)
X = np.round(X, 3)
X_train = np.r_[X + 2, X - 2]

# train the Isolation Forest and persist it to disk
clf = IsolationForest(n_estimators=50, max_samples=500, random_state=ran_gen, contamination=0.01)
clf.fit(X_train)

dump(clf, './isolation_forest.joblib')
Generating the data stream
!python model.py
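Before wiring the model into the stream, it can be worth a quick sanity check that the saved estimator loads and scores points as expected. A minimal sketch, assuming isolation_forest.joblib now sits in the notebook's working directory:
from joblib import load

clf = load('./isolation_forest.joblib')
# a point near a training cluster vs. a point far from both clusters;
# typically this prints 1 (inlier) and -1 (outlier)
print(clf.predict([[2.0, 2.1], [0.0, 0.0]]))
# anomaly scores: lower means more anomalous
print(clf.score_samples([[2.0, 2.1], [0.0, 0.0]]))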
%%file transaction_producer.py
import json
import time
import logging
import socket
from datetime import datetime
from numpy.random import uniform, choice, randn
from random import random as r
import numpy as np
from confluent_kafka import Producer
KAFKA_BROKER = 'broker:9092'
TRANSACTION_TOPIC = 'transactions'
LAG = 0.5
PROBABILITY_OUTLIER = 0.05

def create_producer():
    try:
        producer = Producer({
            "bootstrap.servers": KAFKA_BROKER,
            "client.id": socket.gethostname(),
            "enable.idempotence": True,
            "batch.size": 64000,
            "linger.ms": 10,
            "acks": "all",
            "retries": 5,
            "delivery.timeout.ms": 1000
        })
    except Exception as e:
        logging.exception("cannot create the producer")
        producer = None
    return producer

_id = 0
producer = create_producer()

if producer is not None:
    while True:
        # with probability PROBABILITY_OUTLIER emit a point far from the training clusters
        if r() <= PROBABILITY_OUTLIER:
            X_test = uniform(low=-4, high=4, size=(1, 2))
        else:
            X = 0.3 * randn(1, 2)
            X_test = X + choice(a=[2, -2], size=1, p=[0.5, 0.5])
        X_test = np.round(X_test, 3).tolist()
        current_time = datetime.utcnow().isoformat()
        record = {
            "id": _id,
            "data": X_test,
            "current_time": current_time
        }
        record = json.dumps(record).encode("utf-8")
        producer.produce(topic=TRANSACTION_TOPIC, value=record)
        producer.flush()
        _id += 1
        time.sleep(LAG)
Open a terminal and run the producer:
python transaction_producer.py
Verifying Kafka
Open a new terminal and check that the transactions topic
exists in Kafka:
cd ~
kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
Then run
cd ~
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic transactions
and check that the transactions are coming through.
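If the transactions topic is not listed (for example when automatic topic creation is disabled on the broker), it can be created explicitly; three partitions is a reasonable choice here, since the detector script below starts three consumer processes. A sketch, adjust the path and flags to your installation:
cd ~
kafka/bin/kafka-topics.sh --create --topic transactions --bootstrap-server broker:9092 --partitions 3 --replication-factor 1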
%%file outliers_detection.py
import json
import os
import time
import numpy as np
import socket
import logging
from datetime import datetime
from joblib import load
from confluent_kafka import Producer, Consumer
from multiprocessing import Process
KAFKA_BROKER = 'broker:9092'
TRANSACTION_TOPIC = 'transactions'
TRANSACTION_CG = 'transactions'
ANOMALY_TOPIC = 'anomaly'
NUM_PARTITIONS = 3

MODEL_PATH = os.path.abspath('isolation_forest.joblib')

def create_producer():
    try:
        producer = Producer({
            "bootstrap.servers": KAFKA_BROKER,
            "client.id": socket.gethostname(),
            "enable.idempotence": True,
            "batch.size": 64000,
            "linger.ms": 10,
            "acks": "all",
            "retries": 5,
            "delivery.timeout.ms": 1000
        })
    except Exception as e:
        logging.exception("cannot create the producer")
        producer = None
    return producer

def create_consumer(topic, group_id):
    try:
        consumer = Consumer({
            "bootstrap.servers": KAFKA_BROKER,
            "group.id": group_id,
            "client.id": socket.gethostname(),
            "isolation.level": "read_committed",
            "default.topic.config": {
                "auto.offset.reset": "latest",
                "enable.auto.commit": False
            }
        })
        consumer.subscribe([topic])
    except Exception as e:
        logging.exception("cannot create the consumer")
        consumer = None
    return consumer

def detekcja_anomalii():
    consumer = create_consumer(topic=TRANSACTION_TOPIC, group_id=TRANSACTION_CG)
    producer = create_producer()
    clf = load(MODEL_PATH)
    while True:
        message = consumer.poll()
        if message is None:
            continue
        if message.error():
            logging.error(f"CONSUMER error: {message.error()}")
            continue
        record = json.loads(message.value().decode('utf-8'))
        data = record['data']
        prediction = clf.predict(data)
        # forward only the points the model flags as anomalies (-1)
        if prediction[0] == -1:
            score = clf.score_samples(data)
            record["score"] = np.round(score, 3).tolist()
            _id = str(record["id"])
            record = json.dumps(record).encode("utf-8")
            producer.produce(topic=ANOMALY_TOPIC, value=record)
            producer.flush()
    consumer.close()

# one detector process per partition
for _ in range(NUM_PARTITIONS):
    p = Process(target=detekcja_anomalii)
    p.start()
Add the anomaly topic and start a console consumer to watch it
kafka/bin/kafka-topics.sh --create --topic anomaly --bootstrap-server broker:9092
cd ~
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic anomaly
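With the anomaly topic and its console consumer in place, start the detector in another terminal (assuming the script and isolation_forest.joblib are in the current directory):
python outliers_detection.py
Only records that the Isolation Forest classifies as -1 are republished to the anomaly topic, together with their anomaly score.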
Reading a stream in Apache Spark - Socket
%%file socket_stream_start.py
from socket import *
import time
# read the whole text file into memory, line by line
rdd = list()
with open("MobyDick_full.txt", 'r') as ad:
    for line in ad:
        rdd.append(line)

HOST = 'localhost'
PORT = 9999
ADDR = (HOST, PORT)

tcpSock = socket(AF_INET, SOCK_STREAM)
tcpSock.bind(ADDR)
tcpSock.listen(5)

# serve the file to every client that connects, one line per second
while True:
    c, addr = tcpSock.accept()
    print('got connection')
    for line in rdd:
        try:
            c.send(line.encode())
            time.sleep(1)
        except:
            break
    c.close()
    print('disconnected')
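Start the socket server in a separate terminal before launching the Spark job; it assumes a MobyDick_full.txt file in the working directory (any plain-text file will do):
python socket_stream_start.py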
%%file streamWordCount.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
if __name__ == "__main__":
    spark = SparkSession.builder.appName("Stream_DF").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # read lines from the TCP socket served by socket_stream_start.py
    lines = (spark
             .readStream
             .format("socket")
             .option("host", "localhost")
             .option("port", 9999)
             .load())

    # split each line into words and count occurrences over the whole stream
    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    word_counts = words.groupBy("word").count()

    streamingQuery = (word_counts
                      .writeStream
                      .format("console")
                      .outputMode("complete")
                      .trigger(processingTime="5 second")
                      .start())

    streamingQuery.awaitTermination()
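With the socket server running, submit the word-count job; the socket source needs no extra packages:
spark-submit streamWordCount.py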
Reading a stream in Apache Spark + Kafka
%%file raw_app.py
## LOAD SPARK SESSION object
= "broker:9092"
SERVER
if __name__ == "__main__":
from pyspark.sql import SparkSession
= SparkSession.builder.getOrCreate()
spark "ERROR")
spark.sparkContext.setLogLevel(
= (
raw
spark.readStreamformat("kafka")
."kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "transactions")
.option(
.load()
)
= (
query
raw.writeStream"append")
.outputMode("truncate", "false")
.option(format("console")
.
.start()
)
query.awaitTermination() query.stop()
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 raw_app.py
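The console sink above prints the Kafka value column as raw bytes. To inspect the JSON emitted by transaction_producer.py, the value can be cast to a string and parsed with an explicit schema. A minimal sketch; the file name parsed_app.py and the schema are assumptions based on the producer's record layout:
%%file parsed_app.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, LongType, StringType, ArrayType, DoubleType

if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # assumed schema: {"id": ..., "data": [[x, y]], "current_time": "..."}
    schema = StructType([
        StructField("id", LongType()),
        StructField("data", ArrayType(ArrayType(DoubleType()))),
        StructField("current_time", StringType()),
    ])

    raw = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("subscribe", "transactions")
        .load()
    )

    # cast the binary value to a string and unpack the JSON fields
    parsed = (
        raw.selectExpr("CAST(value AS STRING) AS json")
        .select(from_json(col("json"), schema).alias("t"))
        .select("t.id", "t.data", "t.current_time")
    )

    query = (
        parsed.writeStream
        .outputMode("append")
        .option("truncate", "false")
        .format("console")
        .start()
    )

    query.awaitTermination()
Submit it the same way, with the Kafka connector package on the classpath:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 parsed_app.py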