This commit is contained in:
ferdzo
2024-10-14 16:18:41 +02:00
parent 4f9c7e8feb
commit 175f54e14e
8 changed files with 107 additions and 88 deletions

3
.gitignore vendored
View File

@@ -1,9 +1,10 @@
/iotDashboard/.env /iotDashboard/.env
.env
/.idea /.idea
db.sqlite3 db.sqlite3
demo.db demo.db
demo.db-shm demo.db-shm
demo.db-wal demo.db-wal
/iotDashboard/demo1.db /iotDashboard/demo1.db
__pycache__/ /__pycache__/
iotDashboard/db_create.py iotDashboard/db_create.py

View File

@@ -1,5 +1,5 @@
import json import json
import os
import redis import redis
from dotenv import load_dotenv from dotenv import load_dotenv
from openai import OpenAI from openai import OpenAI
@@ -7,7 +7,8 @@ from openai import OpenAI
load_dotenv() load_dotenv()
client = OpenAI() client = OpenAI()
redis_client = redis.StrictRedis(host='10.10.0.1', port=6379, db=0) REDIS_HOST=os.getenv('REDIS_HOST')
redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0)
data = redis_client.get("last5").decode("utf-8") data = redis_client.get("last5").decode("utf-8")

View File

@@ -1,54 +0,0 @@
import psycopg2
from psycopg2 import sql
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Define your database connection parameters
DATABASE_NAME = os.getenv('DB_NAME', 'example')
USER = os.getenv('DB_USER', 'postgres')
PASSWORD = os.getenv('DB_PASSWORD', 'coolermaster')
HOST = os.getenv('DB_HOST', '10.10.0.1')
PORT = os.getenv('DB_PORT', '5555')
def create_sensor_readings_table():
"""Create the sensor_readings table if it does not exist."""
try:
# Establish connection to the database
conn = psycopg2.connect(
dbname=DATABASE_NAME,
user=USER,
password=PASSWORD,
host=HOST,
port=PORT
)
with conn.cursor() as cursor:
# SQL command to create the sensor_readings table
create_table_query = """
CREATE TABLE IF NOT EXISTS sensor_readings (
time TIMESTAMPTZ NOT NULL,
device_name VARCHAR(255) NOT NULL, -- Use device_name as a string
metric VARCHAR(50) NOT NULL, -- Type of sensor
value DOUBLE PRECISION NOT NULL, -- The sensor's value
PRIMARY KEY (time, device_name, metric) -- Composite primary key
);
"""
cursor.execute(create_table_query)
print("Table 'sensor_readings' created or already exists.")
# Commit changes
conn.commit()
except Exception as e:
print(f"Error during database operations: {e}")
finally:
if conn:
conn.close()
print("Database connection closed.")
if __name__ == "__main__":
create_sensor_readings_table()

View File

@@ -1,5 +1,5 @@
from django import forms from django import forms
from .models import Device, Sensor, SensorType from iotDashboard.models import Device, Sensor, SensorType
class DeviceForm(forms.ModelForm): class DeviceForm(forms.ModelForm):

View File

@@ -1,15 +1,29 @@
import json import json
import datetime import datetime
import os
import requests import requests
import psycopg2 import psycopg2
import redis import redis
from django.conf import settings from django.conf import settings
from huey import crontab from huey import crontab
from huey.contrib.djhuey import periodic_task from huey.contrib.djhuey import periodic_task
from .models import Device, Sensor, SensorType from .models import Device
from dotenv import load_dotenv
load_dotenv()
# Initialize Redis client # Initialize Redis client
redis_client = redis.StrictRedis(host='10.10.0.1', port=6379, db=0) REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') # Default to localhost if not set
try:
redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0)
print(redis_client)
redis_client.ping()
print('Connected!')
except Exception as ex:
print
'Error:', ex
exit('Failed to connect, terminating.')
def devices_to_redis(): def devices_to_redis():
@@ -50,25 +64,27 @@ def fetch_data_http(device, sensor):
return None return None
def fetch_data_mqtt(device, sensor): def fetch_data_mqtt_stream(device, sensor):
"""Fetch data from Redis for a specific MQTT device and sensor.""" """Fetch data from Redis Stream for a specific MQTT device and sensor."""
# Get the data for the specific device from Redis sensor_name = sensor.type.name.lower()
data = redis_client.get(device.name) # Assumes device.name is the Redis key stream_key = f"mqtt_stream:{device.name}:{sensor_name}" # Key format for the stream
if data: try:
data = json.loads(data.decode('utf-8')) # Read from the Redis stream, waiting for new data with blocking if necessary
stream_data = redis_client.xread({stream_key: '0-0'}, block=1000, count=1)
if stream_data:
_, entries = stream_data[0] # Get the entries from the stream
for entry_id, entry_data in entries:
sensor_value = entry_data.get(b'value')
timestamp = entry_data.get(b'time')
# Normalize the sensor name to lowercase for lookup if sensor_value and timestamp:
sensor_name = sensor.type.name.lower() return {
sensor_value = data['sensors'].get(sensor_name) "time": timestamp.decode('utf-8'),
"device": device.name,
if sensor_value is not None and is_recent_data(data['time']): "sensor_value": float(sensor_value.decode('utf-8'))
return { }
"time": data['time'], except Exception as e:
"device": device.name, print(f"Error fetching data from stream {stream_key}: {e}")
"sensor_value": sensor_value
}
print(data)
return None return None
@@ -121,7 +137,7 @@ def fetch_data_from_all_devices():
if device.protocol == 'http': if device.protocol == 'http':
data = fetch_data_http(device, sensor) data = fetch_data_http(device, sensor)
elif device.protocol == 'mqtt': elif device.protocol == 'mqtt':
data = fetch_data_mqtt(device, sensor) data = fetch_data_mqtt_stream(device, sensor)
if data and is_recent_data(data['time']): if data and is_recent_data(data['time']):
insert_data(data, sensor.type.name) insert_data(data, sensor.type.name)

View File

@@ -5,7 +5,7 @@ from django.http import JsonResponse, HttpResponse
from django.shortcuts import render, redirect, get_object_or_404 from django.shortcuts import render, redirect, get_object_or_404
from .forms import DeviceForm, SensorWithTypeForm from .forms import DeviceForm, SensorWithTypeForm
from .models import Device, Sensor from iotDashboard.models import Device, Sensor
redis_client = redis.StrictRedis(host='10.10.0.1', port=6379, db=0) redis_client = redis.StrictRedis(host='10.10.0.1', port=6379, db=0)

View File

@@ -10,12 +10,20 @@ from dotenv import load_dotenv
load_dotenv() load_dotenv()
# Set up Redis client # Set up Redis client
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') # Default to localhost if not set
redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0) try:
print("Connected to Redis Server") redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0)
print(redis_client)
redis_client.ping()
print('Connected!')
except Exception as ex:
print
'Error:', ex
exit('Failed to connect, terminating.')
# MQTT broker address # MQTT broker address
MQTT_BROKER = os.getenv('MQTT_BROKER') MQTT_BROKER = os.getenv('MQTT_BROKER', 'localhost') # Default to localhost if not set
mqtt_data = {} mqtt_data = {}
@@ -30,8 +38,17 @@ def get_mqtt_devices():
def build_device_map(): def build_device_map():
"""Build a mapping of device endpoints to friendly names.""" """Build a mapping of device endpoints to friendly names."""
devices = get_mqtt_devices() devices = get_mqtt_devices()
return {device['topic'].split('/')[0]: device['device_name'] for device in return {device['topic'].split('/')[0]: device['device_name'] for device in devices} # Assuming topic starts with
devices} # Assuming topic starts with device name # device name
def publish_to_stream(stream_name, data):
"""Append a message to Redis Stream."""
try:
redis_client.xadd(stream_name, data)
print(f"Published to Redis Stream '{stream_name}': {data}")
except redis.RedisError as e:
print(f"Error writing to Redis Stream: {e}")
def on_message(client, userdata, msg): def on_message(client, userdata, msg):
@@ -49,6 +66,7 @@ def on_message(client, userdata, msg):
device_map = build_device_map() device_map = build_device_map()
device_name = device_map.get(device_endpoint, device_endpoint) # Fallback to endpoint if not found device_name = device_map.get(device_endpoint, device_endpoint) # Fallback to endpoint if not found
# Initialize device data if it's the first sensor reading
if device_name not in mqtt_data: if device_name not in mqtt_data:
mqtt_data[device_name] = { mqtt_data[device_name] = {
"time": datetime.utcnow().isoformat(), "time": datetime.utcnow().isoformat(),
@@ -60,8 +78,8 @@ def on_message(client, userdata, msg):
mqtt_data[device_name]["sensors"][sensor_type] = sensor_value mqtt_data[device_name]["sensors"][sensor_type] = sensor_value
mqtt_data[device_name]["time"] = datetime.utcnow().isoformat() mqtt_data[device_name]["time"] = datetime.utcnow().isoformat()
# Store the updated data structure in Redis # Publish to Redis Stream (adjust as needed to reflect the correct stream name)
redis_client.set(device_name, json.dumps(mqtt_data[device_name])) publish_to_stream(device_name, mqtt_data[device_name])
print(f"Updated data for {device_name}: {mqtt_data[device_name]}") print(f"Updated data for {device_name}: {mqtt_data[device_name]}")
except ValueError as e: except ValueError as e:

37
proba_publish.py Normal file
View File

@@ -0,0 +1,37 @@
import os
from datetime import datetime
import redis
from dotenv import load_dotenv
load_dotenv()
REDIS_HOST = os.getenv('REDIS_HOST')
try:
redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0)
except redis.RedisError as e:
raise e
STREAM_NAME = 'sensor_data_stream'
def publish_to_stream(stream_name, data):
"""Publish a message to the Redis Stream."""
try:
redis_client.xadd(stream_name, data)
print(f"Published to Redis Stream '{stream_name}': {data}")
except redis.RedisError as e:
print(f"Error writing to Redis Stream: {e}")
if __name__ == "__main__":
# Example sensor data to publish
mqtt_data = {
"time": datetime.utcnow().isoformat(),
"device": "Livingroom",
"metric": "temperature",
"value": 25.6
}
# Publish to the stream
publish_to_stream(STREAM_NAME, mqtt_data)