- Persist data to PostgreSQL.
 - Monitor changes to data using the Debezium Connector.
 - Stream data from a Kafka topic using PySpark (Spark Streaming).
 - Convert the streaming data to Delta Lake format.
 - Write the Delta Lake data to MinIO (S3-compatible object storage).
 - Query the data with Trino.
 - Display the results in DBeaver.
 
Before running this project, ensure you have the following installed.
Note: the project was set up on Ubuntu 22.04.
- Ubuntu 22.04 (preferred, but Ubuntu 20.04 also works)
 - Python 3.10
 - Apache Spark (installed locally)
 - Apache Airflow
 - Confluent Containers (Zookeeper, Kafka, Schema Registry, Connect, Control Center)
 - Docker
 - MinIO
 - Trino, DBeaver CE
 - Delta Lake
 - Debezium, Debezium UI
 
- Clone the repository
 
$ git clone https://github.com/VuBacktracking/stream-data-processing.git
$ cd stream-data-processing

- Start our data streaming infrastructure
 
$ sudo service docker start
$ docker compose -f storage-docker-compose.yaml -f stream-docker-compose.yaml up -d

- Set up the environment
 
$ python3 -m venv .venv
$ source .venv/bin/activate
$ pip install -r requirements.txt

Create a .env file and paste your MinIO keys and SPARK_HOME into it:
# MinIO
MINIO_ACCESS_KEY='minio_access_key'
MINIO_SECRET_KEY='minio_secret_key'
MINIO_ENDPOINT='http://localhost:9000'
BUCKET_NAME='datalake'

# PostgreSQL
POSTGRES_DB='v9'
POSTGRES_USER='v9'
POSTGRES_PASSWORD='v9'

# Spark
SPARK_HOME=""
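The Python scripts can pick these values up with python-dotenv; a minimal sketch, assuming that package is available (it is a common companion to .env files, but check requirements.txt):

# sketch: load the .env values into Python (assumes python-dotenv)
import os
from dotenv import load_dotenv

load_dotenv()  # reads key=value pairs from .env into the environment

MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY")
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "http://localhost:9000")
BUCKET_NAME = os.getenv("BUCKET_NAME", "datalake")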
 
- Services

- Postgres is accessible on the default port 5432.
 - Debezium UI: http://localhost:8085.
 - Kafka Control Center: http://localhost:9021.
 - Trino: http://localhost:8084.
 - MinIO: http://localhost:9001.
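To confirm the stack came up, you can ping each UI from Python; a small sketch using requests (ports as listed above):

# sketch: quick reachability check for the web UIs listed above
import requests

services = {
    "Debezium UI": "http://localhost:8085",
    "Kafka Control Center": "http://localhost:9021",
    "Trino": "http://localhost:8084",
    "MinIO": "http://localhost:9001",
}
for name, url in services.items():
    try:
        print(f"{name}: HTTP {requests.get(url, timeout=5).status_code}")
    except requests.RequestException as exc:
        print(f"{name}: unreachable ({exc})")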
 
- Step 1. Start the Debezium connector
 
cd debezium
bash run-cdc.sh register_connector conf/products-cdc-config.json

You should see the connector running, as in the image below, at http://localhost:8085.
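Under the hood, registering a connector is a POST to the Kafka Connect REST API. A minimal sketch in Python, assuming Connect listens on localhost:8083; the config fields are illustrative, not the repository's actual conf/products-cdc-config.json:

# sketch: register a Debezium PostgreSQL connector via the Connect REST API
# (illustrative config; the real one lives in conf/products-cdc-config.json)
import requests

connector = {
    "name": "products-cdc",  # hypothetical connector name
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "v9",
        "database.password": "v9",
        "database.dbname": "v9",
        "topic.prefix": "v9",                  # topics become <prefix>.<schema>.<table>
        "table.include.list": "public.products",
        "plugin.name": "pgoutput",             # PostgreSQL logical decoding plugin
    },
}

resp = requests.post("http://localhost:8083/connectors", json=connector)
resp.raise_for_status()
print(resp.json())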
- Step 2. Create the table and insert data into the database
 
python3 database-operations/create_table.py
python3 database-operations/insert_table.py

In the PostgreSQL connection, you should see the database v9 and the products table, as in the image below.
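For a sense of what create_table.py boils down to, here is a minimal sketch with psycopg2 (column list abridged; the full schema matches the Trino DDL in the next step):

# sketch: create the products table in PostgreSQL (assumes psycopg2)
import os
import psycopg2
from dotenv import load_dotenv

load_dotenv()

conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname=os.getenv("POSTGRES_DB"),
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
)
with conn, conn.cursor() as cur:
    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS products (
            id TEXT PRIMARY KEY,
            name TEXT,
            price DOUBLE PRECISION,
            brand TEXT,
            category TEXT
        )
        """
    )
conn.close()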
- Step 3. Start Streaming Data to MinIO
 
python3 stream_processing/delta-to-minio.py

After the data is written to MinIO storage, open http://localhost:9001 and you should see a result like the image below.
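Conceptually, delta-to-minio.py reads the Debezium change events from Kafka and writes them to MinIO in Delta format. A minimal sketch, assuming a CDC topic named v9.public.products, Kafka on localhost:9092, and the delta-spark and hadoop-aws packages on the Spark classpath (all illustrative):

# sketch: stream a Kafka topic to MinIO as Delta Lake (names/paths illustrative)
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("delta-to-minio")
    # point S3A at MinIO instead of AWS
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minio_access_key")
    .config("spark.hadoop.fs.s3a.secret.key", "minio_secret_key")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # enable Delta Lake
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "v9.public.products")  # assumed CDC topic name
    .option("startingOffsets", "earliest")
    .load()
)

# Debezium events arrive as JSON in the Kafka value; schema parsing is omitted
query = (
    stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream.format("delta")
    .option("checkpointLocation", "s3a://datalake/checkpoints/products")
    .start("s3a://datalake/products")
)
query.awaitTermination()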
Create your Trino schema and table in DBeaver:
-- Create the schema if it doesn't exist
CREATE SCHEMA IF NOT EXISTS lakehouse.products
WITH (location = 's3://datalake/');
-- Create the products table
CREATE TABLE IF NOT EXISTS lakehouse.products.products (
    id VARCHAR,
    name VARCHAR,
    original_price DOUBLE,
    price DOUBLE,
    fulfillment_type VARCHAR,
    brand VARCHAR,
    review_count INTEGER,
    rating_average DOUBLE,
    favourite_count INTEGER,
    current_seller VARCHAR,
    number_of_images INTEGER,
    category VARCHAR,
    quantity_sold INTEGER,
    discount DOUBLE
) WITH (
    location = 's3://datalake/products/'
);
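Once the schema and table exist, you can also query Trino from Python; a minimal sketch using the trino client package (any user name works since no authentication is configured):

# sketch: query the Delta table through Trino (assumes the trino package)
from trino.dbapi import connect

conn = connect(
    host="localhost",
    port=8084,            # Trino port from the services list above
    user="admin",         # arbitrary; no authentication configured
    catalog="lakehouse",
    schema="products",
)
cur = conn.cursor()
cur.execute("SELECT id, name, price FROM products LIMIT 10")
for row in cur.fetchall():
    print(row)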