Refactored and updated tasks.py

This commit is contained in:
ferdzo
2024-08-29 23:40:01 +02:00
parent a3b61d108c
commit 3e74468dee

View File

@@ -1,78 +1,91 @@
import json
import redis
from huey import crontab
from huey.contrib.djhuey import periodic_task
import requests
import psycopg2
import datetime
import requests
from django.conf import settings
from huey import crontab
from huey.contrib.djhuey import periodic_task
from .models import Device
import json
# Initialize Redis client
redis_client = redis.StrictRedis(host='10.10.0.1', port=6379, db=0)
def devices_to_redis():
devices = Device.objects.all()
# Convert devices to a list of dictionaries
devices_list = []
for device in devices:
devices_list.append({
def devices_to_redis():
"""Fetch all devices from Django and store them in Redis."""
devices = Device.objects.all()
devices_list = [
{
'id': device.id,
'name': device.name,
'protocol': device.protocol,
'ip': device.ip,
})
# Store in Redis
}
for device in devices
]
redis_client.set('devices', json.dumps(devices_list))
devices_to_redis()
def fetch_data_http(device):
"""Fetch temperature and humidity data from an HTTP sensor."""
data = {
"time": datetime.datetime.now(),
"time": datetime.datetime.now().isoformat(),
"device": device.name,
}
r = requests.get(f"http://{device.ip}/sensor/tempreature")
data["temperature"] = r.json()['value']
r = requests.get(f"http://{device.ip}/sensor/humidity")
data["humidity"] = r.json()['value']
try:
temperature_response = requests.get(f"http://{device.ip}/sensor/temperature")
humidity_response = requests.get(f"http://{device.ip}/sensor/humidity")
data["temperature"] = temperature_response.json().get('value')
data["humidity"] = humidity_response.json().get('value')
except requests.RequestException as e:
print(f"HTTP request failed: {e}")
return data
def fetch_data_mqtt(device):
data = redis_client.get(device).decode('utf-8')
data = json.loads(data).get(device)
def fetch_data_mqtt(device_name):
"""Fetch data from Redis for a specific MQTT device."""
data = redis_client.get(device_name)
if data:
print(data)
mqtt_data = data
# Ensure the data is recent
if datetime.datetime.fromisoformat(mqtt_data["time"]) > datetime.datetime.now() - datetime.timedelta(minutes=2):
return mqtt_data
data = json.loads(data.decode('utf-8')).get(device_name)
if data and datetime.datetime.fromisoformat(data["time"]) > datetime.datetime.now() - datetime.timedelta(
minutes=2):
return data
return None
def insert_data(data):
"""Insert data into the PostgreSQL database."""
with psycopg2.connect(settings.CONNECTION_STRING) as conn:
cursor = conn.cursor()
insert_query = """
INSERT INTO conditions (time, device, temperature, humidity)
VALUES (%s, %s, %s, %s)
"""
cursor.execute(insert_query, (data["time"], data["device"], data["temperature"], data["humidity"]))
conn.commit()
with conn.cursor() as cursor:
insert_query = """
INSERT INTO conditions (time, device, temperature, humidity)
VALUES (%s, %s, %s, %s)
"""
cursor.execute(insert_query, (data["time"], data["device"], data["temperature"], data["humidity"]))
conn.commit()
@periodic_task(crontab(minute='*/1'))
def fetch_data_from_all_devices():
"""Fetch and insert data for all devices based on their protocol."""
devices = Device.objects.all()
for device in devices:
data = None
if device.protocol == 'http':
data = fetch_data_http(device)
insert_data(data)
elif device.protocol == 'mqtt':
# Assume data is already in mqtt_data dictionary
data = fetch_data_mqtt(device.name)
print(data)
if data and datetime.datetime.strptime(data["time"],"%Y-%m-%d %H:%M:%S.%f") > datetime.datetime.now() - datetime.timedelta(minutes=1):
if data:
data_time = datetime.datetime.fromisoformat(data["time"])
if data_time > datetime.datetime.now() - datetime.timedelta(minutes=1):
insert_data(data)
else:
print(f"No complete data available for {device.name}. Skipping insertion.")
print(f"No recent data available for {device.name}. Skipping insertion.")
else:
print(f"No data available for {device.name}. Skipping insertion.")
# Initialize device data in Redis
devices_to_redis()