From ed7d33e87f85be25692e4e905abafc27c207e2fa Mon Sep 17 00:00:00 2001 From: ferdzo Date: Thu, 10 Oct 2024 00:10:21 +0200 Subject: [PATCH 1/4] Update chart.html, experimenting with MQTT. --- iotDashboard/db_create.py | 54 +++++ iotDashboard/forms.py | 69 +++++- iotDashboard/gpt.py | 41 ++++ iotDashboard/models.py | 30 ++- iotDashboard/mqtt_service.py | 99 +++++--- iotDashboard/tasks.py | 179 +++++++++----- iotDashboard/templates/chart.html | 310 +++++++++--------------- iotDashboard/templates/device_form.html | 21 +- iotDashboard/templates/device_list.html | 42 ++-- iotDashboard/urls.py | 5 +- iotDashboard/views.py | 119 +++++++-- 11 files changed, 620 insertions(+), 349 deletions(-) diff --git a/iotDashboard/db_create.py b/iotDashboard/db_create.py index e69de29..d76df4c 100644 --- a/iotDashboard/db_create.py +++ b/iotDashboard/db_create.py @@ -0,0 +1,54 @@ +import psycopg2 +from psycopg2 import sql +import os +from dotenv import load_dotenv + +# Load environment variables +load_dotenv() + +# Define your database connection parameters +DATABASE_NAME = os.getenv('DB_NAME', 'example') +USER = os.getenv('DB_USER', 'postgres') +PASSWORD = os.getenv('DB_PASSWORD', 'coolermaster') +HOST = os.getenv('DB_HOST', '10.10.0.1') +PORT = os.getenv('DB_PORT', '5555') + +def create_sensor_readings_table(): + """Create the sensor_readings table if it does not exist.""" + try: + # Establish connection to the database + conn = psycopg2.connect( + dbname=DATABASE_NAME, + user=USER, + password=PASSWORD, + host=HOST, + port=PORT + ) + + with conn.cursor() as cursor: + # SQL command to create the sensor_readings table + create_table_query = """ + CREATE TABLE IF NOT EXISTS sensor_readings ( + time TIMESTAMPTZ NOT NULL, + device_name VARCHAR(255) NOT NULL, -- Use device_name as a string + metric VARCHAR(50) NOT NULL, -- Type of sensor + value DOUBLE PRECISION NOT NULL, -- The sensor's value + PRIMARY KEY (time, device_name, metric) -- Composite primary key + ); + """ + cursor.execute(create_table_query) + print("Table 'sensor_readings' created or already exists.") + + # Commit changes + conn.commit() + + except Exception as e: + print(f"Error during database operations: {e}") + + finally: + if conn: + conn.close() + print("Database connection closed.") + +if __name__ == "__main__": + create_sensor_readings_table() \ No newline at end of file diff --git a/iotDashboard/forms.py b/iotDashboard/forms.py index 315987a..8c09f2e 100644 --- a/iotDashboard/forms.py +++ b/iotDashboard/forms.py @@ -1,7 +1,72 @@ from django import forms -from .models import Device +from .models import Device, Sensor, SensorType class DeviceForm(forms.ModelForm): + # Optionally include sensors as choices in the form if relevant + sensors = forms.ModelMultipleChoiceField( + queryset=Sensor.objects.all(), + required=False, + widget=forms.CheckboxSelectMultiple, + label='Sensors' + ) + class Meta: model = Device - fields = ['name', 'ip', 'protocol', 'temperature', 'humidity'] + fields = ['name', 'ip', 'protocol'] + + def __init__(self, *args, **kwargs): + # Optionally pass initial sensors for editing an existing device + if 'instance' in kwargs: + initial_sensors = kwargs['instance'].sensors.all() if kwargs['instance'] else None + initial = kwargs.get('initial', {}) + initial['sensors'] = initial_sensors + kwargs['initial'] = initial + super(DeviceForm, self).__init__(*args, **kwargs) + + def save(self, commit=True): + # Save the device instance + device = super(DeviceForm, self).save(commit=False) + + if commit: + device.save() + self.save_m2m() # Ensure M2M save happens + + return device +class SensorWithTypeForm(forms.ModelForm): + # Add fields for SensorType directly in the form + type_name = forms.CharField(max_length=50, label="Sensor Type Name") + unit = forms.CharField(max_length=20, label="Unit", required=False) + protocol = forms.ChoiceField( + choices=[('mqtt', 'MQTT'), ('http', 'HTTP')], + label="Protocol" + ) + topic = forms.CharField(max_length=100, label="Topic", required=False) + endpoint = forms.CharField(max_length=100, label="Endpoint", required=False) + + class Meta: + model = Sensor + fields = ['device', 'enabled'] + + def save(self, commit=True): + # Create or get the SensorType + try: + sensor_type = SensorType.objects.get(name=self.cleaned_data['type_name']) + except SensorType.DoesNotExist: + sensor_type = SensorType( + name=self.cleaned_data['type_name'], + unit=self.cleaned_data['unit'], + protocol=self.cleaned_data['protocol'], + topic=self.cleaned_data['topic'], + endpoint=self.cleaned_data['endpoint'] + ) + if commit: + sensor_type.save() + + # Create Sensor with the SensorType found or created + sensor = super(SensorWithTypeForm, self).save(commit=False) + sensor.type = sensor_type + + if commit: + sensor.save() + + return sensor \ No newline at end of file diff --git a/iotDashboard/gpt.py b/iotDashboard/gpt.py index e69de29..9654bd9 100644 --- a/iotDashboard/gpt.py +++ b/iotDashboard/gpt.py @@ -0,0 +1,41 @@ +import json + +import redis +from dotenv import load_dotenv +from openai import OpenAI + +load_dotenv() +client = OpenAI() + +redis_client = redis.StrictRedis(host='10.10.0.1', port=6379, db=0) + +data = redis_client.get("last5").decode("utf-8") + + +def analysis(environment_data): + completion = client.chat.completions.create( + model="gpt-4o-mini", + messages=[ + {"role": "system", + "content": "You are an assistant that analyzes environmental data for an office working space and provides " + "concise numerical insights."}, + { + "role": "user", + "content": f"Analyze the following environmental data. The goal is maintaining optimal working " + f"conditions in the office and peak working brain. Focus on any outliers or necessary adjustments. The data is as following: {environment_data}." + f"The output should be only the recommendations in numerical form with postitive and negative " + f"numbers and also provide small summary in a sentence or two of the current conditions and " + f"easily computable in json format. Be consistent with the + and - signs and the summary" + } + ], + response_format={"type": "json_object"} + + ) + output = completion.choices[0].message.content + + return output + +output = analysis(data) +redis_client.set("gpt",json.dumps(output)) + +print(output) diff --git a/iotDashboard/models.py b/iotDashboard/models.py index 5e39517..34249c1 100644 --- a/iotDashboard/models.py +++ b/iotDashboard/models.py @@ -1,10 +1,26 @@ from django.db import models -class Device(models.Model): - name = models.CharField(max_length=50) - ip = models.CharField(max_length=20) - protocol = models.CharField(max_length=20) - temperature = models.BooleanField(default=False) - humidity = models.BooleanField(default=False) + +class SensorType(models.Model): + name = models.CharField(max_length=50, unique=True) # Sensor name, e.g., "CO2", "Noise", etc. + unit = models.CharField(max_length=20) # Unit of measurement, e.g., "ppm", "dB", "lux" + protocol = models.CharField(max_length=20, choices=[('mqtt', 'MQTT'), ('http', 'HTTP')]) # Protocol for communication + topic = models.CharField(max_length=100, null=True, blank=True) # Topic for MQTT communication + endpoint = models.CharField(max_length=100, null=True, blank=True) # Endpoint for HTTP communication def __str__(self): - return self.name \ No newline at end of file + return f"{self.name} ({self.unit})" + +class Device(models.Model): + name = models.CharField(max_length=50) # Device name + ip = models.CharField(max_length=20) # Device IP address + protocol = models.CharField(max_length=20, choices=[('mqtt', 'MQTT'), ('http', 'HTTP')]) + + def __str__(self): + return self.name + +class Sensor(models.Model): + device = models.ForeignKey(Device, related_name='sensors', on_delete=models.CASCADE) + type = models.ForeignKey(SensorType, on_delete=models.CASCADE) + enabled = models.BooleanField(default=True) + def __str__(self): + return f"{self.type.name} Sensor on {self.device.name}" diff --git a/iotDashboard/mqtt_service.py b/iotDashboard/mqtt_service.py index caa959b..b4319eb 100644 --- a/iotDashboard/mqtt_service.py +++ b/iotDashboard/mqtt_service.py @@ -6,83 +6,104 @@ import paho.mqtt.client as mqtt import redis from dotenv import load_dotenv +# Load environment variables load_dotenv() +# Set up Redis client REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0) - print("Connected to Redis Server") +# MQTT broker address MQTT_BROKER = os.getenv('MQTT_BROKER') - mqtt_data = {} -def get_devices(): - """" Simple method to get all devices """ - # Get devices from Redis - devices_json = redis_client.get('devices') +def get_mqtt_devices(): + """Retrieve MQTT devices and sensor details from Redis.""" + devices_json = redis_client.get('mqtt_devices') if devices_json: return json.loads(devices_json) return [] +def build_device_map(): + """Build a mapping of device endpoints to friendly names.""" + devices = get_mqtt_devices() + return {device['topic'].split('/')[0]: device['device_name'] for device in + devices} # Assuming topic starts with device name + + def on_message(client, userdata, msg): - """" Simple method to handle messages """ - topic = msg.topic.split('/') - device_name = topic[0] - sensor = topic[-2] + """Handle incoming messages from MQTT broker.""" + try: + # Parse the incoming message topic + topic_parts = msg.topic.split('/') + device_endpoint = topic_parts[0] # This is the actual endpoint name + sensor_type = topic_parts[2] # Assuming sensor type is in the third part - if device_name not in mqtt_data: - mqtt_data[device_name] = {"time": datetime.now(), - "device": device_name, - "temperature": None, - "humidity": None} + sensor_value = float(msg.payload.decode()) + print(f"Received message from {device_endpoint}, sensor {sensor_type}: {sensor_value}") - if sensor == "tempreature": - mqtt_data[device_name]["temperature"] = float(msg.payload.decode()) - elif sensor == "humidity": - mqtt_data[device_name]["humidity"] = float(msg.payload.decode()) + # Build the device map to get the friendly device name + device_map = build_device_map() + device_name = device_map.get(device_endpoint, device_endpoint) # Fallback to endpoint if not found + + if device_name not in mqtt_data: + mqtt_data[device_name] = { + "time": datetime.utcnow().isoformat(), + "device": device_name, + "sensors": {} + } + + # Update the sensor value in the mqtt_data dictionary + mqtt_data[device_name]["sensors"][sensor_type] = sensor_value + mqtt_data[device_name]["time"] = datetime.utcnow().isoformat() + + # Store the updated data structure in Redis + redis_client.set(device_name, json.dumps(mqtt_data[device_name])) + print(f"Updated data for {device_name}: {mqtt_data[device_name]}") + + except ValueError as e: + print(f"Error processing message payload: {e}") - mqtt_data[device_name]["time"] = str(datetime.now()) - redis_client.set(device_name, json.dumps(mqtt_data)) - print(f"Updated data for {device_name}: {mqtt_data[device_name]}") def on_connect(client, userdata, flags, rc): - """Handle successful connection.""" - print(f"Connected with result code {rc}") - devices = get_devices() - for device in devices: - client.subscribe(f"{device['name']}/sensor/+/state") + """Handle successful MQTT connection.""" + if rc == 0: + print("Connected to MQTT Broker") + devices = get_mqtt_devices() + for device in devices: + client.subscribe(device['topic']) # Subscribing to each device's topic + print(f"Subscribed to topic: {device['topic']}") + else: + print(f"Failed to connect, return code {rc}") + def on_disconnect(client, userdata, rc): - """Handle disconnection.""" - if rc != 0: - print(f"Unexpected disconnection. Result code: {rc}") + """Handle disconnection from MQTT broker.""" + print(f"Disconnected with result code: {rc}") + def start_mqtt_client(): - """ Start the MQTT client """ - devices = get_devices() - + """Start the MQTT client to begin listening to topics.""" client = mqtt.Client() - client.on_message = on_message client.on_connect = on_connect client.on_disconnect = on_disconnect + client.on_message = on_message client.connect(MQTT_BROKER) + client.loop_start() print("MQTT Client Started") - for device in devices: - client.subscribe(f"{device['name']}/sensor/+/state") - # Keep the script running try: while True: time.sleep(10) # Sleep to prevent high CPU usage except KeyboardInterrupt: print("Script interrupted by user") finally: - client.loop_stop() # Stop the loop when exiting + client.loop_stop() if __name__ == "__main__": - start_mqtt_client() \ No newline at end of file + start_mqtt_client() diff --git a/iotDashboard/tasks.py b/iotDashboard/tasks.py index 2ea6aef..459d904 100644 --- a/iotDashboard/tasks.py +++ b/iotDashboard/tasks.py @@ -2,91 +2,154 @@ import json import datetime import requests import psycopg2 +import redis from django.conf import settings from huey import crontab from huey.contrib.djhuey import periodic_task -from .models import Device -import redis - +from .models import Device, Sensor, SensorType # Initialize Redis client redis_client = redis.StrictRedis(host='10.10.0.1', port=6379, db=0) - def devices_to_redis(): - """Fetch all devices from Django and store them in Redis.""" + """Fetch devices and their sensors' topics from Django and store them in Redis.""" devices = Device.objects.all() - devices_list = [ - { - 'id': device.id, - 'name': device.name, - 'protocol': device.protocol, - 'ip': device.ip, - } - for device in devices - ] - redis_client.set('devices', json.dumps(devices_list)) + devices_list = [] + for device in devices: + for sensor in device.sensors.all(): + sensor_data = { + 'device_name': device.name, + 'sensor_name': sensor.type.name, + 'topic': sensor.type.topic # Assuming the topic is stored in SensorType + } + devices_list.append(sensor_data) + redis_client.set('mqtt_devices', json.dumps(devices_list)) + print("Devices with sensors stored in Redis.") - -def fetch_data_http(device): - """Fetch temperature and humidity data from an HTTP sensor.""" - data = { - "time": datetime.datetime.now().isoformat(), - "device": device.name, - } +def fetch_data_http(device, sensor): + """Fetch data from an HTTP sensor.""" + sensor_type_name = sensor.type.name.lower() try: - temperature_response = requests.get(f"http://{device.ip}/sensor/tempreature") - humidity_response = requests.get(f"http://{device.ip}/sensor/humidity") - data["temperature"] = temperature_response.json().get('value') - data["humidity"] = humidity_response.json().get('value') + # Make the request to the device's HTTP endpoint + response = requests.get(f"http://{device.ip}/sensor/{sensor_type_name}", timeout=5) + response.raise_for_status() # Raise an exception for any non-200 status codes + sensor_value = response.json().get('value') # Assuming the JSON response structure + if sensor_value is not None: + return { + "time": datetime.datetime.utcnow().isoformat(), + "device": device.name, + "sensor": sensor_type_name, + "sensor_value": sensor_value + } + else: + print(f"No value returned from {device.name} for {sensor_type_name}") except requests.RequestException as e: - print(f"HTTP request failed: {e}") - return data + print(f"HTTP request failed for {device.name}: {e}") + return None - -def fetch_data_mqtt(device_name): - """Fetch data from Redis for a specific MQTT device.""" - data = redis_client.get(device_name) +def fetch_data_mqtt(device, sensor): + """Fetch data from Redis for a specific MQTT device and sensor.""" + # Get the data for the specific device from Redis + data = redis_client.get(device.name) # Assumes device.name is the Redis key if data: - data = json.loads(data.decode('utf-8')).get(device_name) - if data and datetime.datetime.fromisoformat(data["time"]) > datetime.datetime.now() - datetime.timedelta( - minutes=2): - return data + data = json.loads(data.decode('utf-8')) + + # Normalize the sensor name to lowercase for lookup + sensor_name = sensor.type.name.lower() + sensor_value = data['sensors'].get(sensor_name) + + if sensor_value is not None and is_recent_data(data['time']): + return { + "time": data['time'], + "device": device.name, + "sensor_value": sensor_value + } + + print(data) return None -def insert_data(data): - """Insert data into the PostgreSQL database.""" - with psycopg2.connect(settings.CONNECTION_STRING) as conn: - with conn.cursor() as cursor: - insert_query = """ - INSERT INTO conditions (time, device, temperature, humidity) - VALUES (%s, %s, %s, %s) - """ - cursor.execute(insert_query, (data["time"], data["device"], data["temperature"], data["humidity"])) - conn.commit() +def is_recent_data(timestamp): + """Check if data is within a 2-minute freshness window.""" + data_time = datetime.datetime.fromisoformat(timestamp) + return data_time > datetime.datetime.utcnow() - datetime.timedelta(minutes=2) +def insert_data(data, sensor_type): + """Insert parsed data into the PostgreSQL database.""" + if 'sensor_value' not in data: + print(f"Missing 'sensor_value' in data: {data}. Skipping insertion.") + return + + insert_data_dict = { + "time": data['time'], + "device": data['device'], + "metric": sensor_type.lower(), + "value": data['sensor_value'], + } + + try: + with psycopg2.connect(settings.CONNECTION_STRING) as conn: + with conn.cursor() as cursor: + insert_query = """ + INSERT INTO sensor_readings (time, device_name, metric, value) + VALUES (%s, %s, %s, %s); + """ + cursor.execute(insert_query, ( + insert_data_dict["time"], + insert_data_dict["device"], + insert_data_dict["metric"], + insert_data_dict["value"] + )) + conn.commit() + print(f"Data inserted successfully for {insert_data_dict['device']}: {insert_data_dict}") + except Exception as e: + print(f"Failed to insert data: {e}") @periodic_task(crontab(minute='*/1')) def fetch_data_from_all_devices(): """Fetch and insert data for all devices based on their protocol.""" devices = Device.objects.all() for device in devices: - data = None - if device.protocol == 'http': - data = fetch_data_http(device) - elif device.protocol == 'mqtt': - data = fetch_data_mqtt(device.name) + for sensor in device.sensors.all(): + data = None - if data: - data_time = datetime.datetime.fromisoformat(data["time"]) - if data_time > datetime.datetime.now() - datetime.timedelta(minutes=1): - insert_data(data) + if device.protocol == 'http': + data = fetch_data_http(device, sensor) + elif device.protocol == 'mqtt': + data = fetch_data_mqtt(device, sensor) + + if data and is_recent_data(data['time']): + insert_data(data, sensor.type.name) else: - print(f"No recent data available for {device.name}. Skipping insertion.") - else: - print(f"No data available for {device.name}. Skipping insertion.") + print(f"No recent or valid data for {device.name}. Skipping.") +@periodic_task(crontab(minute='*/5')) +def last_5_minutes(): + """Fetch the last 5 readings from TimescaleDB and store them in Redis.""" + try: + with psycopg2.connect(settings.CONNECTION_STRING) as conn: + with conn.cursor() as cursor: + cursor.execute(""" + SELECT time, device_name, metric, value + FROM sensor_readings + ORDER BY time DESC + LIMIT 5; + """) + results = cursor.fetchall() + + data = [ + { + "time": reading[0].isoformat(), + "device": reading[1], + "metric": reading[2], + "value": reading[3] + } + for reading in results + ] + redis_client.set("last5", json.dumps(data)) + print("Last 5 readings:", data) + except Exception as e: + print(f"Error fetching or storing the last 5 readings: {e}") # Initialize device data in Redis devices_to_redis() diff --git a/iotDashboard/templates/chart.html b/iotDashboard/templates/chart.html index 09b3707..2079096 100644 --- a/iotDashboard/templates/chart.html +++ b/iotDashboard/templates/chart.html @@ -1,222 +1,144 @@ - - Conditions Chart with Chart.js - - + IoT Sensor Dashboard + + + +

{{ gpt }}

- - +

IoT Sensor Data Dashboard

-
-

Temperature and Humidity Over Time

+ +
+ +
- -
-

Current Conditions

-
Loading...
-
Loading...
-
- - -
-
- -
-
- - -
-
-
- - -
-
- - -
-
-
- - -
-
-
- -
-
-
+ +
+
- - - - diff --git a/iotDashboard/templates/device_form.html b/iotDashboard/templates/device_form.html index ff9c424..e296695 100644 --- a/iotDashboard/templates/device_form.html +++ b/iotDashboard/templates/device_form.html @@ -5,9 +5,11 @@ {% if form.instance.pk %}Edit{% else %}Add{% endif %} Device + + -