From 175f54e14e7139a11e460d8ee05c65dcfd3d2a36 Mon Sep 17 00:00:00 2001 From: ferdzo Date: Mon, 14 Oct 2024 16:18:41 +0200 Subject: [PATCH] Updates --- .gitignore | 3 +- iotDashboard/gpt.py => gpt.py | 5 +- iotDashboard/db_create.py | 54 ----------------- iotDashboard/forms.py | 2 +- iotDashboard/tasks.py | 58 ++++++++++++------- iotDashboard/views.py | 2 +- .../mqtt_service.py => mqtt_service.py | 34 ++++++++--- proba_publish.py | 37 ++++++++++++ 8 files changed, 107 insertions(+), 88 deletions(-) rename iotDashboard/gpt.py => gpt.py (92%) delete mode 100644 iotDashboard/db_create.py rename iotDashboard/mqtt_service.py => mqtt_service.py (75%) create mode 100644 proba_publish.py diff --git a/.gitignore b/.gitignore index 7ee212e..75d39b9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,10 @@ /iotDashboard/.env +.env /.idea db.sqlite3 demo.db demo.db-shm demo.db-wal /iotDashboard/demo1.db -__pycache__/ +/__pycache__/ iotDashboard/db_create.py diff --git a/iotDashboard/gpt.py b/gpt.py similarity index 92% rename from iotDashboard/gpt.py rename to gpt.py index 9654bd9..6d95063 100644 --- a/iotDashboard/gpt.py +++ b/gpt.py @@ -1,5 +1,5 @@ import json - +import os import redis from dotenv import load_dotenv from openai import OpenAI @@ -7,7 +7,8 @@ from openai import OpenAI load_dotenv() client = OpenAI() -redis_client = redis.StrictRedis(host='10.10.0.1', port=6379, db=0) +REDIS_HOST=os.getenv('REDIS_HOST') +redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0) data = redis_client.get("last5").decode("utf-8") diff --git a/iotDashboard/db_create.py b/iotDashboard/db_create.py deleted file mode 100644 index d76df4c..0000000 --- a/iotDashboard/db_create.py +++ /dev/null @@ -1,54 +0,0 @@ -import psycopg2 -from psycopg2 import sql -import os -from dotenv import load_dotenv - -# Load environment variables -load_dotenv() - -# Define your database connection parameters -DATABASE_NAME = os.getenv('DB_NAME', 'example') -USER = os.getenv('DB_USER', 'postgres') -PASSWORD = os.getenv('DB_PASSWORD', 'coolermaster') -HOST = os.getenv('DB_HOST', '10.10.0.1') -PORT = os.getenv('DB_PORT', '5555') - -def create_sensor_readings_table(): - """Create the sensor_readings table if it does not exist.""" - try: - # Establish connection to the database - conn = psycopg2.connect( - dbname=DATABASE_NAME, - user=USER, - password=PASSWORD, - host=HOST, - port=PORT - ) - - with conn.cursor() as cursor: - # SQL command to create the sensor_readings table - create_table_query = """ - CREATE TABLE IF NOT EXISTS sensor_readings ( - time TIMESTAMPTZ NOT NULL, - device_name VARCHAR(255) NOT NULL, -- Use device_name as a string - metric VARCHAR(50) NOT NULL, -- Type of sensor - value DOUBLE PRECISION NOT NULL, -- The sensor's value - PRIMARY KEY (time, device_name, metric) -- Composite primary key - ); - """ - cursor.execute(create_table_query) - print("Table 'sensor_readings' created or already exists.") - - # Commit changes - conn.commit() - - except Exception as e: - print(f"Error during database operations: {e}") - - finally: - if conn: - conn.close() - print("Database connection closed.") - -if __name__ == "__main__": - create_sensor_readings_table() \ No newline at end of file diff --git a/iotDashboard/forms.py b/iotDashboard/forms.py index 9648423..b661dad 100644 --- a/iotDashboard/forms.py +++ b/iotDashboard/forms.py @@ -1,5 +1,5 @@ from django import forms -from .models import Device, Sensor, SensorType +from iotDashboard.models import Device, Sensor, SensorType class DeviceForm(forms.ModelForm): diff --git a/iotDashboard/tasks.py b/iotDashboard/tasks.py index 874451f..3efb3d3 100644 --- a/iotDashboard/tasks.py +++ b/iotDashboard/tasks.py @@ -1,15 +1,29 @@ import json import datetime +import os import requests import psycopg2 import redis from django.conf import settings from huey import crontab from huey.contrib.djhuey import periodic_task -from .models import Device, Sensor, SensorType +from .models import Device +from dotenv import load_dotenv + +load_dotenv() # Initialize Redis client -redis_client = redis.StrictRedis(host='10.10.0.1', port=6379, db=0) +REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') # Default to localhost if not set +try: + redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0) + print(redis_client) + redis_client.ping() + print('Connected!') +except Exception as ex: + print + 'Error:', ex + exit('Failed to connect, terminating.') + def devices_to_redis(): @@ -50,25 +64,27 @@ def fetch_data_http(device, sensor): return None -def fetch_data_mqtt(device, sensor): - """Fetch data from Redis for a specific MQTT device and sensor.""" - # Get the data for the specific device from Redis - data = redis_client.get(device.name) # Assumes device.name is the Redis key - if data: - data = json.loads(data.decode('utf-8')) +def fetch_data_mqtt_stream(device, sensor): + """Fetch data from Redis Stream for a specific MQTT device and sensor.""" + sensor_name = sensor.type.name.lower() + stream_key = f"mqtt_stream:{device.name}:{sensor_name}" # Key format for the stream + try: + # Read from the Redis stream, waiting for new data with blocking if necessary + stream_data = redis_client.xread({stream_key: '0-0'}, block=1000, count=1) + if stream_data: + _, entries = stream_data[0] # Get the entries from the stream + for entry_id, entry_data in entries: + sensor_value = entry_data.get(b'value') + timestamp = entry_data.get(b'time') - # Normalize the sensor name to lowercase for lookup - sensor_name = sensor.type.name.lower() - sensor_value = data['sensors'].get(sensor_name) - - if sensor_value is not None and is_recent_data(data['time']): - return { - "time": data['time'], - "device": device.name, - "sensor_value": sensor_value - } - - print(data) + if sensor_value and timestamp: + return { + "time": timestamp.decode('utf-8'), + "device": device.name, + "sensor_value": float(sensor_value.decode('utf-8')) + } + except Exception as e: + print(f"Error fetching data from stream {stream_key}: {e}") return None @@ -121,7 +137,7 @@ def fetch_data_from_all_devices(): if device.protocol == 'http': data = fetch_data_http(device, sensor) elif device.protocol == 'mqtt': - data = fetch_data_mqtt(device, sensor) + data = fetch_data_mqtt_stream(device, sensor) if data and is_recent_data(data['time']): insert_data(data, sensor.type.name) diff --git a/iotDashboard/views.py b/iotDashboard/views.py index 8a7ad9d..e29c6ae 100644 --- a/iotDashboard/views.py +++ b/iotDashboard/views.py @@ -5,7 +5,7 @@ from django.http import JsonResponse, HttpResponse from django.shortcuts import render, redirect, get_object_or_404 from .forms import DeviceForm, SensorWithTypeForm -from .models import Device, Sensor +from iotDashboard.models import Device, Sensor redis_client = redis.StrictRedis(host='10.10.0.1', port=6379, db=0) diff --git a/iotDashboard/mqtt_service.py b/mqtt_service.py similarity index 75% rename from iotDashboard/mqtt_service.py rename to mqtt_service.py index b4319eb..51f7745 100644 --- a/iotDashboard/mqtt_service.py +++ b/mqtt_service.py @@ -10,12 +10,20 @@ from dotenv import load_dotenv load_dotenv() # Set up Redis client -REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') -redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0) -print("Connected to Redis Server") +REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') # Default to localhost if not set +try: + redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0) + print(redis_client) + redis_client.ping() + print('Connected!') +except Exception as ex: + print + 'Error:', ex + exit('Failed to connect, terminating.') + # MQTT broker address -MQTT_BROKER = os.getenv('MQTT_BROKER') +MQTT_BROKER = os.getenv('MQTT_BROKER', 'localhost') # Default to localhost if not set mqtt_data = {} @@ -30,8 +38,17 @@ def get_mqtt_devices(): def build_device_map(): """Build a mapping of device endpoints to friendly names.""" devices = get_mqtt_devices() - return {device['topic'].split('/')[0]: device['device_name'] for device in - devices} # Assuming topic starts with device name + return {device['topic'].split('/')[0]: device['device_name'] for device in devices} # Assuming topic starts with + # device name + + +def publish_to_stream(stream_name, data): + """Append a message to Redis Stream.""" + try: + redis_client.xadd(stream_name, data) + print(f"Published to Redis Stream '{stream_name}': {data}") + except redis.RedisError as e: + print(f"Error writing to Redis Stream: {e}") def on_message(client, userdata, msg): @@ -49,6 +66,7 @@ def on_message(client, userdata, msg): device_map = build_device_map() device_name = device_map.get(device_endpoint, device_endpoint) # Fallback to endpoint if not found + # Initialize device data if it's the first sensor reading if device_name not in mqtt_data: mqtt_data[device_name] = { "time": datetime.utcnow().isoformat(), @@ -60,8 +78,8 @@ def on_message(client, userdata, msg): mqtt_data[device_name]["sensors"][sensor_type] = sensor_value mqtt_data[device_name]["time"] = datetime.utcnow().isoformat() - # Store the updated data structure in Redis - redis_client.set(device_name, json.dumps(mqtt_data[device_name])) + # Publish to Redis Stream (adjust as needed to reflect the correct stream name) + publish_to_stream(device_name, mqtt_data[device_name]) print(f"Updated data for {device_name}: {mqtt_data[device_name]}") except ValueError as e: diff --git a/proba_publish.py b/proba_publish.py new file mode 100644 index 0000000..18c14f7 --- /dev/null +++ b/proba_publish.py @@ -0,0 +1,37 @@ +import os +from datetime import datetime + +import redis +from dotenv import load_dotenv + +load_dotenv() + +REDIS_HOST = os.getenv('REDIS_HOST') +try: + redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0) +except redis.RedisError as e: + raise e +STREAM_NAME = 'sensor_data_stream' + + +def publish_to_stream(stream_name, data): + """Publish a message to the Redis Stream.""" + try: + redis_client.xadd(stream_name, data) + print(f"Published to Redis Stream '{stream_name}': {data}") + except redis.RedisError as e: + print(f"Error writing to Redis Stream: {e}") + + +if __name__ == "__main__": + # Example sensor data to publish + mqtt_data = { + "time": datetime.utcnow().isoformat(), + "device": "Livingroom", + "metric": "temperature", + "value": 25.6 + } + + # Publish to the stream + publish_to_stream(STREAM_NAME, mqtt_data) +