Added full support for MQTT. Added Redis as a way to share data between MQTT service and the database update service.

This commit is contained in:
ferdzo
2024-08-29 23:23:49 +02:00
parent f88200f8cc
commit 96a32d6ce6
3 changed files with 66 additions and 76 deletions

View File

@@ -9,7 +9,7 @@ https://docs.djangoproject.com/en/4.2/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/4.2/ref/settings/
"""
import environ
from dotenv import load_dotenv
from pathlib import Path
import os
from huey import SqliteHuey
@@ -18,16 +18,17 @@ from huey import SqliteHuey
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
env = environ.Env(DEBUG=True)
load_dotenv()
environ.Env.read_env()
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/4.2/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = env('SECRET_KEY')
CONNECTION_STRING = env('CONNECTION_STRING')
SECRET_KEY = os.getenv('SECRET_KEY')
CONNECTION_STRING = os.getenv('CONNECTION_STRING')
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
@@ -92,7 +93,7 @@ DATABASES = {
"ENGINE": "django.db.backends.postgresql",
"NAME" : "example",
"USER": "postgres",
"PASSWORD": env('PASSWORD'),
"PASSWORD": os.getenv('PASSWORD'),
"HOST": '10.10.0.1',
"PORT": '5555',
}

View File

@@ -1,37 +1,78 @@
import psycopg2
import requests
import redis
from huey import crontab
from huey.contrib.djhuey import periodic_task
from datetime import datetime
import psycopg2
import datetime
import requests
from django.conf import settings
from .models import Device # Import your Device model
from .models import Device
import json
# Fetch data from the device using REST API
def fetch_data_from_device(device):
data = dict()
data["time"] = datetime.now()
data["device"] = device.name # Use device name
redis_client = redis.StrictRedis(host='10.10.0.1', port=6379, db=0)
def devices_to_redis():
devices = Device.objects.all()
# Convert devices to a list of dictionaries
devices_list = []
for device in devices:
devices_list.append({
'id': device.id,
'name': device.name,
'protocol': device.protocol,
'ip': device.ip,
})
# Store in Redis
redis_client.set('devices', json.dumps(devices_list))
devices_to_redis()
def fetch_data_http(device):
data = {
"time": datetime.datetime.now(),
"device": device.name,
}
r = requests.get(f"http://{device.ip}/sensor/tempreature")
data["temperature"] = r.json()['value']
r = requests.get(f"http://{device.ip}/sensor/humidity")
data["humidity"] = r.json()['value']
return (data["time"], data["device"], data["temperature"], data["humidity"])
return data
# Insert data into the database
def insert_data(device):
data = fetch_data_from_device(device)
with psycopg2.connect(settings.CONNECTION_STRING) as conn: # Use Django's connection string
def fetch_data_mqtt(device):
data = redis_client.get(device).decode('utf-8')
data = json.loads(data).get(device)
if data:
print(data)
mqtt_data = data
# Ensure the data is recent
if datetime.datetime.fromisoformat(mqtt_data["time"]) > datetime.datetime.now() - datetime.timedelta(minutes=2):
return mqtt_data
return None
def insert_data(data):
with psycopg2.connect(settings.CONNECTION_STRING) as conn:
cursor = conn.cursor()
insert_query = """
INSERT INTO conditions (time, device, temperature, humidity)
VALUES (%s, %s, %s, %s)
"""
cursor.execute(insert_query, data)
cursor.execute(insert_query, (data["time"], data["device"], data["temperature"], data["humidity"]))
conn.commit()
# Periodic task to fetch data from all devices every minute
@periodic_task(crontab(minute='*/1'))
def fetch_data_from_all_devices():
devices = Device.objects.all() # Fetch all devices from the database
devices = Device.objects.all()
for device in devices:
insert_data(device)
if device.protocol == 'http':
data = fetch_data_http(device)
insert_data(data)
elif device.protocol == 'mqtt':
# Assume data is already in mqtt_data dictionary
data = fetch_data_mqtt(device.name)
print(data)
if data and datetime.datetime.strptime(data["time"],"%Y-%m-%d %H:%M:%S.%f") > datetime.datetime.now() - datetime.timedelta(minutes=1):
insert_data(data)
else:
print(f"No complete data available for {device.name}. Skipping insertion.")

View File

@@ -1,52 +0,0 @@
import dotenv
import psycopg2
from psycopg2 import sql
from datetime import datetime
import requests
from huey import SqliteHuey, crontab
from dotenv import load_dotenv
from pathlib import Path
# Initialize scheduler
huey = SqliteHuey(filename='demo.db')
dotenv_path = Path("iotDashboard/.env")
load_dotenv(dotenv_path=dotenv_path)
CONNECTION = dotenv.dotenv_values(dotenv_path)["CONNECTION_STRING"]
# Database connection
conn = psycopg2.connect(CONNECTION)
# Devices
devices = {"livingroom": "192.168.244.131"}
# Func for fetching data from device using REST API
def fetch_data_from_device(device):
data = dict()
data["time"] = datetime.now()
data["device"] = device
r = requests.get("http://" + devices[device] + "/sensor/tempreature")
data["temperature"] = r.json()['value']
r = requests.get("http://" + devices[device] + "/sensor/humidity")
data["humidity"] = r.json()['value']
return (data["time"], data["device"], data["temperature"], data["humidity"])
# Func for inserting data to database
def insert_data(conn, device):
data = fetch_data_from_device(device)
cursor = conn.cursor()
insert_query = sql.SQL(
"INSERT INTO conditions (time, device, temperature, humidity) "
"VALUES (%s, %s, %s, %s)"
)
cursor.execute(insert_query, data)
conn.commit()
cursor.close()
print("Done")
@huey.periodic_task(crontab(minute='*/1'))
def test():
for device in devices:
insert_data(conn, device)