mirror of
https://github.com/ferdzo/iotDashboard.git
synced 2026-04-04 16:56:25 +00:00
Small changes, removed comments, added env sample
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -6,5 +6,5 @@ demo.db
|
|||||||
demo.db-shm
|
demo.db-shm
|
||||||
demo.db-wal
|
demo.db-wal
|
||||||
/iotDashboard/demo1.db
|
/iotDashboard/demo1.db
|
||||||
/__pycache__/
|
**/__pycache__/
|
||||||
iotDashboard/db_create.py
|
iotDashboard/db_create.py
|
||||||
|
|||||||
6
iotDashboard/.env.sample
Normal file
6
iotDashboard/.env.sample
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
PASSWORD='postgres'
|
||||||
|
SECRET_KEY='django_key'
|
||||||
|
CONNECTION_STRING='postgres://postgres:postgres@localhost:5555/example'
|
||||||
|
REDIS_HOST="localhost"
|
||||||
|
MQTT_BROKER="localhost"
|
||||||
|
OPENAI_API_KEY="example"
|
||||||
@@ -12,7 +12,6 @@ from dotenv import load_dotenv
|
|||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
# Initialize Redis client
|
|
||||||
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') # Default to localhost if not set
|
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') # Default to localhost if not set
|
||||||
try:
|
try:
|
||||||
redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0)
|
redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0)
|
||||||
@@ -46,10 +45,9 @@ def fetch_data_http(device, sensor):
|
|||||||
"""Fetch data from an HTTP sensor."""
|
"""Fetch data from an HTTP sensor."""
|
||||||
sensor_type_name = sensor.type.name.lower()
|
sensor_type_name = sensor.type.name.lower()
|
||||||
try:
|
try:
|
||||||
# Make the request to the device's HTTP endpoint
|
|
||||||
response = requests.get(f"http://{device.ip}/sensor/{sensor_type_name}", timeout=5)
|
response = requests.get(f"http://{device.ip}/sensor/{sensor_type_name}", timeout=5)
|
||||||
response.raise_for_status() # Raise an exception for any non-200 status codes
|
response.raise_for_status()
|
||||||
sensor_value = response.json().get('value') # Assuming the JSON response structure
|
sensor_value = response.json().get('value')
|
||||||
if sensor_value is not None:
|
if sensor_value is not None:
|
||||||
return {
|
return {
|
||||||
"time": datetime.datetime.utcnow().isoformat(),
|
"time": datetime.datetime.utcnow().isoformat(),
|
||||||
@@ -67,12 +65,11 @@ def fetch_data_http(device, sensor):
|
|||||||
def fetch_data_mqtt_stream(device, sensor):
|
def fetch_data_mqtt_stream(device, sensor):
|
||||||
"""Fetch data from Redis Stream for a specific MQTT device and sensor."""
|
"""Fetch data from Redis Stream for a specific MQTT device and sensor."""
|
||||||
sensor_name = sensor.type.name.lower()
|
sensor_name = sensor.type.name.lower()
|
||||||
stream_key = f"mqtt_stream:{device.name}:{sensor_name}" # Key format for the stream
|
stream_key = f"mqtt_stream:{device.name}:{sensor_name}"
|
||||||
try:
|
try:
|
||||||
# Read from the Redis stream, waiting for new data with blocking if necessary
|
|
||||||
stream_data = redis_client.xread({stream_key: '0-0'}, block=1000, count=1)
|
stream_data = redis_client.xread({stream_key: '0-0'}, block=1000, count=1)
|
||||||
if stream_data:
|
if stream_data:
|
||||||
_, entries = stream_data[0] # Get the entries from the stream
|
_, entries = stream_data[0]
|
||||||
for entry_id, entry_data in entries:
|
for entry_id, entry_data in entries:
|
||||||
sensor_value = entry_data.get(b'value')
|
sensor_value = entry_data.get(b'value')
|
||||||
timestamp = entry_data.get(b'time')
|
timestamp = entry_data.get(b'time')
|
||||||
@@ -174,5 +171,4 @@ def last_5_minutes():
|
|||||||
print(f"Error fetching or storing the last 5 readings: {e}")
|
print(f"Error fetching or storing the last 5 readings: {e}")
|
||||||
|
|
||||||
|
|
||||||
# Initialize device data in Redis
|
|
||||||
devices_to_redis()
|
devices_to_redis()
|
||||||
|
|||||||
@@ -10,7 +10,10 @@ from dotenv import load_dotenv
|
|||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
# Set up Redis client
|
# Set up Redis client
|
||||||
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') # Default to localhost if not set
|
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
|
||||||
|
|
||||||
|
MQTT_PASS=os.getenv("MQTT_PASS")
|
||||||
|
MQTT_USER=os.getenv("MQTT_USER")
|
||||||
try:
|
try:
|
||||||
redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0)
|
redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0)
|
||||||
print(redis_client)
|
print(redis_client)
|
||||||
@@ -22,8 +25,7 @@ except Exception as ex:
|
|||||||
exit('Failed to connect, terminating.')
|
exit('Failed to connect, terminating.')
|
||||||
|
|
||||||
|
|
||||||
# MQTT broker address
|
MQTT_BROKER = os.getenv('MQTT_BROKER', 'localhost')
|
||||||
MQTT_BROKER = os.getenv('MQTT_BROKER', 'localhost') # Default to localhost if not set
|
|
||||||
mqtt_data = {}
|
mqtt_data = {}
|
||||||
|
|
||||||
|
|
||||||
@@ -38,8 +40,7 @@ def get_mqtt_devices():
|
|||||||
def build_device_map():
|
def build_device_map():
|
||||||
"""Build a mapping of device endpoints to friendly names."""
|
"""Build a mapping of device endpoints to friendly names."""
|
||||||
devices = get_mqtt_devices()
|
devices = get_mqtt_devices()
|
||||||
return {device['topic'].split('/')[0]: device['device_name'] for device in devices} # Assuming topic starts with
|
return {device['topic'].split('/')[0]: device['device_name'] for device in devices}
|
||||||
# device name
|
|
||||||
|
|
||||||
|
|
||||||
def publish_to_stream(stream_name, data):
|
def publish_to_stream(stream_name, data):
|
||||||
@@ -54,19 +55,16 @@ def publish_to_stream(stream_name, data):
|
|||||||
def on_message(client, userdata, msg):
|
def on_message(client, userdata, msg):
|
||||||
"""Handle incoming messages from MQTT broker."""
|
"""Handle incoming messages from MQTT broker."""
|
||||||
try:
|
try:
|
||||||
# Parse the incoming message topic
|
|
||||||
topic_parts = msg.topic.split('/')
|
topic_parts = msg.topic.split('/')
|
||||||
device_endpoint = topic_parts[0] # This is the actual endpoint name
|
device_endpoint = topic_parts[0]
|
||||||
sensor_type = topic_parts[2] # Assuming sensor type is in the third part
|
sensor_type = topic_parts[2]
|
||||||
|
|
||||||
sensor_value = float(msg.payload.decode())
|
sensor_value = float(msg.payload.decode())
|
||||||
print(f"Received message from {device_endpoint}, sensor {sensor_type}: {sensor_value}")
|
print(f"Received message from {device_endpoint}, sensor {sensor_type}: {sensor_value}")
|
||||||
|
|
||||||
# Build the device map to get the friendly device name
|
|
||||||
device_map = build_device_map()
|
device_map = build_device_map()
|
||||||
device_name = device_map.get(device_endpoint, device_endpoint) # Fallback to endpoint if not found
|
device_name = device_map.get(device_endpoint, device_endpoint)
|
||||||
|
|
||||||
# Initialize device data if it's the first sensor reading
|
|
||||||
if device_name not in mqtt_data:
|
if device_name not in mqtt_data:
|
||||||
mqtt_data[device_name] = {
|
mqtt_data[device_name] = {
|
||||||
"time": datetime.utcnow().isoformat(),
|
"time": datetime.utcnow().isoformat(),
|
||||||
@@ -74,11 +72,9 @@ def on_message(client, userdata, msg):
|
|||||||
"sensors": {}
|
"sensors": {}
|
||||||
}
|
}
|
||||||
|
|
||||||
# Update the sensor value in the mqtt_data dictionary
|
|
||||||
mqtt_data[device_name]["sensors"][sensor_type] = sensor_value
|
mqtt_data[device_name]["sensors"][sensor_type] = sensor_value
|
||||||
mqtt_data[device_name]["time"] = datetime.utcnow().isoformat()
|
mqtt_data[device_name]["time"] = datetime.utcnow().isoformat()
|
||||||
|
|
||||||
# Publish to Redis Stream (adjust as needed to reflect the correct stream name)
|
|
||||||
publish_to_stream(device_name, mqtt_data[device_name])
|
publish_to_stream(device_name, mqtt_data[device_name])
|
||||||
print(f"Updated data for {device_name}: {mqtt_data[device_name]}")
|
print(f"Updated data for {device_name}: {mqtt_data[device_name]}")
|
||||||
|
|
||||||
@@ -109,6 +105,7 @@ def start_mqtt_client():
|
|||||||
client.on_connect = on_connect
|
client.on_connect = on_connect
|
||||||
client.on_disconnect = on_disconnect
|
client.on_disconnect = on_disconnect
|
||||||
client.on_message = on_message
|
client.on_message = on_message
|
||||||
|
client.username_pw_set(MQTT_USER,MQTT_PASS)
|
||||||
client.connect(MQTT_BROKER)
|
client.connect(MQTT_BROKER)
|
||||||
|
|
||||||
client.loop_start()
|
client.loop_start()
|
||||||
@@ -116,7 +113,7 @@ def start_mqtt_client():
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
time.sleep(10) # Sleep to prevent high CPU usage
|
time.sleep(10)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
print("Script interrupted by user")
|
print("Script interrupted by user")
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
@@ -24,7 +24,6 @@ def publish_to_stream(stream_name, data):
|
|||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
# Example sensor data to publish
|
|
||||||
mqtt_data = {
|
mqtt_data = {
|
||||||
"time": datetime.utcnow().isoformat(),
|
"time": datetime.utcnow().isoformat(),
|
||||||
"device": "Livingroom",
|
"device": "Livingroom",
|
||||||
@@ -32,6 +31,5 @@ if __name__ == "__main__":
|
|||||||
"value": 25.6
|
"value": 25.6
|
||||||
}
|
}
|
||||||
|
|
||||||
# Publish to the stream
|
|
||||||
publish_to_stream(STREAM_NAME, mqtt_data)
|
publish_to_stream(STREAM_NAME, mqtt_data)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user