Generating a data stream

%%file model.py

from joblib import dump
import numpy as np
from sklearn.ensemble import IsolationForest
 
ran_gen = np.random.RandomState(44)

# two Gaussian clusters around (2, 2) and (-2, -2) serve as the "normal" training data
X = 0.4 * ran_gen.randn(500, 2)
X = np.round(X, 3)
X_train = np.r_[X + 2, X - 2]

# train an Isolation Forest that expects ~1% outliers and persist it to disk
clf = IsolationForest(n_estimators=50, max_samples=500, random_state=ran_gen, contamination=0.01)
clf.fit(X_train)

dump(clf, './isolation_forest.joblib')

!python model.py
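As a quick sanity check (a sketch, not part of the original notebook; it assumes model.py has already been run in the same directory), you can load the serialized model and score a point from one of the training clusters together with an obvious outlier:

from joblib import load
import numpy as np

clf = load('./isolation_forest.joblib')

# a point near the (2, 2) cluster should come back as an inlier (1),
# a point far from both clusters will typically be flagged as an outlier (-1)
print(clf.predict(np.array([[2.1, 1.9], [4.0, -4.0]])))
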
%%file transaction_producer.py
 
import json
import time 
import logging
import socket
from datetime import datetime
from numpy.random import uniform, choice, randn
 
from random import random as r
 
import numpy as np
from confluent_kafka import Producer
 
 
KAFKA_BROKER = 'broker:9092'
TRANSACTION_TOPIC = 'transactions'
LAG = 0.5
PROBABILITY_OUTLIER = 0.05
 
def create_producer():
    # idempotent producer with acks=all, so records are neither duplicated nor lost
    try:
        producer = Producer({
            "bootstrap.servers": KAFKA_BROKER,
            "client.id": socket.gethostname(),
            "enable.idempotence": True,
            "batch.size": 64000,
            "linger.ms": 10,
            "acks": "all",
            "retries": 5,
            "delivery.timeout.ms": 1000
        })
    except Exception as e:
        logging.exception("could not create the producer")
        producer = None
    return producer
 
 
_id = 0
producer = create_producer()

if producer is not None:
    while True:
        # ~5% of the records come from a wide uniform box (outliers),
        # the rest from the two "normal" clusters used to train the model
        if r() <= PROBABILITY_OUTLIER:
            X_test = uniform(low=-4, high=4, size=(1, 2))
        else:
            X = 0.3 * randn(1, 2)
            X_test = X + choice(a=[2, -2], size=1, p=[0.5, 0.5])
        X_test = np.round(X_test, 3).tolist()
        current_time = datetime.utcnow().isoformat()
        record = {
            "id": _id,
            "data": X_test,
            "current_time": current_time
        }
        record = json.dumps(record).encode("utf-8")
        producer.produce(topic=TRANSACTION_TOPIC, value=record)
        producer.flush()
        _id += 1
        time.sleep(LAG)

Open a terminal and run the producer code.

python transaction_producer.py
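The producer above only flushes after each send; if you want explicit confirmation that records reach the broker, here is a hedged sketch (not in the original script) of a delivery callback that confluent_kafka can invoke per message:

# hypothetical helper: report the fate of every produced record
def delivery_report(err, msg):
    if err is not None:
        print(f"delivery failed: {err}")
    else:
        print(f"delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

# usage inside the produce loop of transaction_producer.py:
# producer.produce(topic=TRANSACTION_TOPIC, value=record, callback=delivery_report)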

Verifying Kafka

Open a new terminal and verify that the transactions topic exists in Kafka:

cd ~
kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092

Then run:

cd ~
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic transactions

and check that the transactions are coming through.
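The same check can be done from Python; a minimal sketch (it assumes the broker address and topic used in the scripts above, and a throwaway consumer group id) that reads a handful of records and prints their raw JSON payloads:

import socket
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "broker:9092",
    "group.id": "transactions-check",   # throwaway group id, an assumption
    "client.id": socket.gethostname(),
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["transactions"])

# read a few records and print them
for _ in range(10):
    msg = consumer.poll(timeout=5.0)
    if msg is None or msg.error():
        continue
    print(msg.value().decode("utf-8"))

consumer.close()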

%%file outliers_detection.py
 
import json
import os
import time 
import numpy as np
import socket
import logging
from datetime import datetime
from joblib import load
from confluent_kafka import Producer, Consumer
from multiprocessing import Process
 
KAFKA_BROKER = 'broker:9092'
TRANSACTION_TOPIC = 'transactions'
TRANSACTION_CG = 'transactions'
ANOMALY_TOPIC = 'anomaly'
NUM_PARTITIONS = 3
 
MODEL_PATH = os.path.abspath('isolation_forest.joblib')
 
def create_producer():
    # idempotent producer with acks=all, so records are neither duplicated nor lost
    try:
        producer = Producer({
            "bootstrap.servers": KAFKA_BROKER,
            "client.id": socket.gethostname(),
            "enable.idempotence": True,
            "batch.size": 64000,
            "linger.ms": 10,
            "acks": "all",
            "retries": 5,
            "delivery.timeout.ms": 1000
        })
    except Exception as e:
        logging.exception("could not create the producer")
        producer = None
    return producer
 
def create_consumer(topic, group_id):
    try:
        consumer = Consumer({
            "bootstrap.servers": KAFKA_BROKER,
            "group.id": group_id,
            "client.id": socket.gethostname(),
            "isolation.level": "read_committed",
            # auto.offset.reset is a topic-level setting, enable.auto.commit a global one;
            # both can be passed at the top level of the config dict
            "auto.offset.reset": "latest",
            "enable.auto.commit": False
        })
        consumer.subscribe([topic])
    except Exception as e:
        logging.exception("could not create the consumer")
        consumer = None
    return consumer

def detekcja_anomalii():
    consumer = create_consumer(topic=TRANSACTION_TOPIC, group_id=TRANSACTION_CG)
    producer = create_producer()
    clf = load(MODEL_PATH)
    while True:
        message = consumer.poll()
        if message is None:
            continue
        if message.error():
            logging.error(f"CONSUMER error: {message.error()}")
            continue
        record = json.loads(message.value().decode('utf-8'))
        data = record['data']
        # score the incoming point; forward it only if the model flags it as an outlier (-1)
        prediction = clf.predict(data)
        if prediction[0] == -1:
            score = clf.score_samples(data)
            record["score"] = np.round(score, 3).tolist()
            _id = str(record["id"])
            record = json.dumps(record).encode("utf-8")
            producer.produce(topic=ANOMALY_TOPIC, value=record)
            producer.flush()
    consumer.close()

 
# start several detector processes; they share one consumer group,
# so Kafka splits the topic's partitions among them
for _ in range(NUM_PARTITIONS):
    p = Process(target=detekcja_anomalii)
    p.start()

Run the detector (python outliers_detection.py) in a terminal, then add the anomaly topic and start a console consumer to watch it:

kafka/bin/kafka-topics.sh --create --topic anomaly --bootstrap-server broker:9092
cd ~
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic anomaly
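Note that the detector processes only work in parallel if the transactions topic actually has at least NUM_PARTITIONS partitions. A sketch of creating the topic that way with confluent_kafka's AdminClient (the partition and replication numbers below are assumptions):

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "broker:9092"})

# create 'transactions' with 3 partitions so the 3 detector processes share the load
futures = admin.create_topics([NewTopic("transactions", num_partitions=3, replication_factor=1)])
for topic, future in futures.items():
    try:
        future.result()
        print(f"created topic {topic}")
    except Exception as e:
        print(f"topic {topic} not created: {e}")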

Reading the stream in Apache Spark - Socket

%%file socket_stream_start.py

from socket import socket, AF_INET, SOCK_STREAM
import time

# read the whole text file into memory; each line becomes one record of the stream
rdd = list()
with open("MobyDick_full.txt", 'r') as ad:
    for line in ad:
        rdd.append(line)

HOST = 'localhost'
PORT = 9999
ADDR = (HOST, PORT)
tcpSock = socket(AF_INET, SOCK_STREAM)
tcpSock.bind(ADDR)
tcpSock.listen(5)

# serve the file line by line, one line per second, to every client that connects
while True:
    c, addr = tcpSock.accept()
    print('got connection')
    for line in rdd:
        try:
            c.send(line.encode())
            time.sleep(1)
        except OSError:
            # client disconnected mid-stream
            break
    c.close()
    print('disconnected')
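
Before pointing Spark at it, you can verify the socket server with a plain Python client (a sketch; it assumes socket_stream_start.py is already running and serving on localhost:9999):

from socket import socket, AF_INET, SOCK_STREAM

# connect to the toy stream server and print the first few lines it sends
with socket(AF_INET, SOCK_STREAM) as s:
    s.connect(("localhost", 9999))
    for _ in range(5):
        chunk = s.recv(1024)
        if not chunk:
            break
        print(chunk.decode(errors="replace"), end="")
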
%%file streamWordCount.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    spark = SparkSession.builder.appName("Stream_DF").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    
    # read raw lines from the TCP socket served by socket_stream_start.py
    lines = (spark
         .readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

    # split each line into words and count occurrences over the whole stream
    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    word_counts = words.groupBy("word").count()

    # print the complete counts table to the console every 5 seconds
    streamingQuery = (word_counts
         .writeStream
         .format("console")
         .outputMode("complete")
         .trigger(processingTime="5 seconds")
         .start())

    streamingQuery.awaitTermination()
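
To run the word count, start the socket server in one terminal and then submit the Spark job in another:

python socket_stream_start.py
spark-submit streamWordCount.py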
         
         

Reading the stream in Apache Spark + Kafka

%%file raw_app.py

## LOAD SPARK SESSION object

SERVER = "broker:9092"

if __name__ == "__main__":
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # subscribe to the transactions topic and keep the raw Kafka columns
    # (key, value, topic, partition, offset, timestamp, ...)
    raw = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", SERVER)
        .option("subscribe", "transactions")
        .load()
    )

    # dump every incoming record to the console as it arrives
    query = (
        raw.writeStream
        .outputMode("append")
        .option("truncate", "false")
        .format("console")
        .start()
    )

    query.awaitTermination()
    query.stop()

Run the application with the Spark Kafka connector package:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 raw_app.py
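
raw_app.py only prints the raw Kafka columns, with value still as bytes. A sketch of how the payload could be parsed back into the producer's fields; the schema below is inferred from the record built in transaction_producer.py and is an assumption:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (ArrayType, DoubleType, LongType,
                               StringType, StructField, StructType)

# shape of the JSON produced by transaction_producer.py (an assumption)
schema = StructType([
    StructField("id", LongType()),
    StructField("data", ArrayType(ArrayType(DoubleType()))),
    StructField("current_time", StringType()),
])

spark = SparkSession.builder.getOrCreate()

parsed = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "transactions")
    .load()
    # Kafka delivers the value as binary; cast to string before JSON parsing
    .select(from_json(col("value").cast("string"), schema).alias("r"))
    .select("r.*")
)

query = (
    parsed.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

query.awaitTermination()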