From 39a9f91cfcc7af2b68b125c8da6a6e9939e6fd44 Mon Sep 17 00:00:00 2001 From: ferdzo Date: Sun, 12 Oct 2025 20:21:04 +0200 Subject: [PATCH] Rewrite start --- .env.sample | 20 ++++ .github/copilot-instructions.md | 66 +++++++++++++ .github/prompts/planing.prompt.md | 19 ++++ .gitignore | 1 + infrastrcture/compose.yml | 21 ++++ infrastrcture/mosquitto/mosquitto.conf | 0 requirements.txt | 8 ++ services/mqtt_ingestion/config.py | 47 +++++++++ services/mqtt_ingestion/main.py | 119 +++++++++++++++++++++++ services/mqtt_ingestion/mqtt_client.py | 88 +++++++++++++++++ services/mqtt_ingestion/redis_writer.py | 37 +++++++ services/mqtt_ingestion/requirements.txt | 2 + 12 files changed, 428 insertions(+) create mode 100644 .env.sample create mode 100644 .github/copilot-instructions.md create mode 100644 .github/prompts/planing.prompt.md create mode 100644 infrastrcture/compose.yml create mode 100644 infrastrcture/mosquitto/mosquitto.conf create mode 100644 requirements.txt create mode 100644 services/mqtt_ingestion/config.py create mode 100644 services/mqtt_ingestion/main.py create mode 100644 services/mqtt_ingestion/mqtt_client.py create mode 100644 services/mqtt_ingestion/redis_writer.py create mode 100644 services/mqtt_ingestion/requirements.txt diff --git a/.env.sample b/.env.sample new file mode 100644 index 0000000..d023000 --- /dev/null +++ b/.env.sample @@ -0,0 +1,20 @@ +# Django Settings +SECRET_KEY=your-secret-key-here +DEBUG=True + +# Database +CONNECTION_STRING=postgresql://postgres:password@localhost:5432/iotdashboard + +# Redis Configuration +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_DB=0 + +# MQTT Configuration +MQTT_BROKER=localhost +MQTT_PORT=1883 +MQTT_USER=mqtt_user +MQTT_PASS=mqtt_password + +# GPT/OpenAI (Optional) +OPENAI_API_KEY=your-openai-key-here \ No newline at end of file diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md new file mode 100644 index 0000000..3fc0b64 --- /dev/null +++ b/.github/copilot-instructions.md @@ -0,0 +1,66 @@ +The following concise instructions help AI coding agents become productive in this repository. + +Purpose +- This repo is a small Django-based IoT dashboard that ingests sensor data via MQTT, stores metadata in Django models, temporarily queues messages in Redis (streams/hashes), and persistently stores timeseries in Postgres/Timescale via background tasks (Huey). + +Big Picture +- Components: + - `iotDashboard/` — Django app (models, views, templates, tasks) + - `manage.py` — Django CLI + - `mqtt_service.py` — standalone MQTT client that subscribes to device topics and writes to Redis + - `tasks.py` — Huey periodic tasks that read Redis and write to Postgres + - Redis — used for device metadata (`mqtt_devices`), per-sensor streams and latest-value hashes + - Postgres/Timescale — final storage for `sensor_readings` table (raw SQL used in places) + +Key Files To Read First +- `iotDashboard/settings.py` — central settings; environment variables expected: `SECRET_KEY`, `CONNECTION_STRING`, `MQTT_BROKER`, `MQTT_USER`, `MQTT_PASS`, `REDIS_HOST`. +- `iotDashboard/models.py` — `Device`, `Sensor`, `SensorType`; these shape how devices and sensors are represented. +- `mqtt_service.py` — where MQTT messages are received and written to Redis. Important for stream naming and payload format. +- `iotDashboard/tasks.py` — Huey tasks that consume Redis and insert into the DB. Shows ingestion logic and timescale interactions. +- `iotDashboard/views.py` and `templates/chart.html` — how the UI reads `mqtt_latest`/Timescale data and what format it expects. + +Important Conventions & Patterns +- Redis usage: repo stores device metadata under `mqtt_devices` (JSON), and the code uses Redis streams and hashes inconsistently. When changing stream behavior, update both `mqtt_service.py` and `tasks.py` to remain compatible. +- Topic/Stream canonicalization: adopt a single convention: MQTT topic `devices/{device_id}/{sensor}` and Redis stream `mqtt_stream:{device_id}:{sensor}`. Latest-value hash pattern: `mqtt_latest:{device_id}`. +- No `requirements.txt` in repo; use `python-dotenv` + `redis`, `paho-mqtt`, `huey`, `psycopg2-binary`, `requests`, `Django` (4.2) — add a `requirements.txt` before running. +- Avoid import-time side-effects: `tasks.py` currently opens Redis and calls `devices_to_redis()` at import time — refactor to lazy init or a management command. + +Developer Workflows (commands & notes) +- Run Django dev server (use virtualenv and install deps): + - `pip install -r requirements.txt` (create this file if missing) + - `python manage.py migrate` + - `python manage.py runserver` +- Run MQTT service locally (requires Redis & MQTT broker): + - `python mqtt_service.py` + - Example publish: `mosquitto_pub -t "devices/esp32/test_temperature" -m "23.5"` +- Huey tasks: + - The project uses `huey.contrib.djhuey`; run workers with Django settings: `python manage.py run_huey` (ensure huey is installed and configured in `HUEY` setting). +- Inspect Redis during debugging: + - `redis-cli KEYS "mqtt*"` + - `redis-cli XREVRANGE mqtt_stream:mydevice:temperature + - COUNT 10` + - `redis-cli HGETALL mqtt_latest:mydevice` + +Integration Points & Gotchas +- Environment variables: many hosts/credentials are taken from `.env` via `python-dotenv`. If missing, code sometimes falls back to defaults or will raise at runtime. Add `.env` or set env vars in the system. +- DB access: `tasks.py` sometimes uses `psycopg2.connect(settings.CONNECTION_STRING)` while views use Django connections. If you change DB config, update both patterns or consolidate to Django connections. +- Topic parsing: `mqtt_service.py` expects at least 3 topic parts (it reads `topic_parts[2]`) — be defensive when editing. +- Stream payloads: `xadd` must receive simple string fields (no nested dicts). When changing stream layout, update the reader in `tasks.py` accordingly. +- Logging: repo uses `print` widely. Prefer converting prints to Python `logging` for maintainability. + +What AI agents should do first +- Do not change stream/topic names unless you update both `mqtt_service.py` and `tasks.py`. +- Remove import-time Redis initializations and `exit()` calls; move to lazily-created client getters or management commands. +- Centralize config in `settings.py` and import `from django.conf import settings` in scripts instead of hardcoded IPs. +- When making API/DB changes, prefer to update `views.py` and `tasks.py` together and add short integration tests using `pytest` and a Redis test double (or local docker-compose). + +Examples (copyable snippets) +- XADD to create canonical stream entry: + - `redis_client.xadd(f"mqtt_stream:{device_id}:{sensor}", {"value": str(sensor_value), "time": datetime.utcnow().isoformat()})` +- Create/read consumer group (ingest): + - `redis_client.xgroup_create(stream, "ingest", id="0", mkstream=True)` + - `entries = redis_client.xreadgroup("ingest", consumer_name, {stream: ">"}, count=10, block=5000)` + +If you add or change docs +- Update `README.md` with a simple `docker-compose.yml` recipe for Redis/Postgres/Mosquitto and document environment variables. Update `env.sample` with `REDIS_HOST`, `CONNECTION_STRING`, `MQTT_BROKER`, `MQTT_USER`, `MQTT_PASS`. + +If anything in these instructions looks off or incomplete for your current refactor, tell me what you'd like to focus on and I'll iterate. diff --git a/.github/prompts/planing.prompt.md b/.github/prompts/planing.prompt.md new file mode 100644 index 0000000..3875b7d --- /dev/null +++ b/.github/prompts/planing.prompt.md @@ -0,0 +1,19 @@ +--- +mode: agent +description: 'Start planning' +tools: ['getNotebookSummary', 'readNotebookCellOutput', 'search', 'getTerminalOutput', 'terminalSelection', 'terminalLastCommand', 'usages', 'vscodeAPI', 'think', 'problems', 'changes', 'testFailure', 'fetch', 'githubRepo', 'todos', 'get_issue', 'get_issue_comments', 'get_me','haystackFiles','haystackSearch'] +--- +Your goal is to prepare a detailed plan to fix the bug or add the new feature, for this you first need to: +* Understand the context of the bug or feature by reading the issue description and comments. +* Understand the codebase by reading the relevant instruction files. +* If its a bug, then identify the root cause of the bug, and explain this to the user. + +Based on your above understanding generate a plan to fix the bug or add the new feature. +Ensure the plan consists of a Markdown document that has the following sections: + +* Overview: A brief description of the bug/feature. +* Root Cause: A detailed explanation of the root cause of the bug, including any relevant code snippets or references to the codebase. (only if it's a bug) +* Requirements: A list of requirements to resolve the bug or add the new feature. +* Implementation Steps: A detailed list of steps to implement the bug fix or new feature. + +Remember, do not make any code edits, just generate a plan. Use thinking and reasoning skills to outline the steps needed to achieve the desired outcome. \ No newline at end of file diff --git a/.gitignore b/.gitignore index fd97a0b..18d88b3 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ demo.db-wal /iotDashboard/demo1.db **/__pycache__/ iotDashboard/db_create.py +.venv/ \ No newline at end of file diff --git a/infrastrcture/compose.yml b/infrastrcture/compose.yml new file mode 100644 index 0000000..b5d6c16 --- /dev/null +++ b/infrastrcture/compose.yml @@ -0,0 +1,21 @@ +services: + + redis: + image: redis:8 + ports: + - "6379:6379" + volumes: + - redis-data:/data + restart: unless-stopped + + mqtt: + image: eclipse-mosquitto:2.0 + ports: + - "1883:1883" + - "9001:9001" + volumes: + - ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf + restart: unless-stopped + +volumes: + redis-data: \ No newline at end of file diff --git a/infrastrcture/mosquitto/mosquitto.conf b/infrastrcture/mosquitto/mosquitto.conf new file mode 100644 index 0000000..e69de29 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..8dee5c0 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +Django==4.2.5 +redis==4.5.4 +paho-mqtt==1.6.1 +psycopg2-binary==2.9.7 +huey==2.4.5 +python-dotenv==1.0.0 +requests==2.31.0 +openai==1.3.0 \ No newline at end of file diff --git a/services/mqtt_ingestion/config.py b/services/mqtt_ingestion/config.py new file mode 100644 index 0000000..b928dcc --- /dev/null +++ b/services/mqtt_ingestion/config.py @@ -0,0 +1,47 @@ +import os +from dataclasses import dataclass +import dotenv +from typing import Optional + +dotenv.load_dotenv() + +@dataclass +class RedisConfig: + host: str + port: int = 6379 + db: int = 0 + password: Optional[str] = None + +@dataclass +class MQTTConfig: + broker: str + port: int = 1883 + username: Optional[str] = None + password: Optional[str] = None + topic: str = "#" + +@dataclass +class Payload: + device_id: str + sensor_type: str + value: float + timestamp: Optional[str] = None + + +class Config: + def __init__(self): + self.redis = RedisConfig( + host=os.getenv('REDIS_HOST', 'localhost'), + port=int(os.getenv('REDIS_PORT', 6379)), + db=int(os.getenv('REDIS_DB', 0)), + password=os.getenv('REDIS_PASSWORD', None) + ) + self.mqtt = MQTTConfig( + broker=os.getenv('MQTT_BROKER', 'localhost'), + port=int(os.getenv('MQTT_PORT', 1883)), + username=os.getenv('MQTT_USERNAME', None), + password=None, + topic="#" + ) + +config = Config() \ No newline at end of file diff --git a/services/mqtt_ingestion/main.py b/services/mqtt_ingestion/main.py new file mode 100644 index 0000000..caf0fbd --- /dev/null +++ b/services/mqtt_ingestion/main.py @@ -0,0 +1,119 @@ +import logging +import signal +import sys +from config import config +from mqtt_client import MQTTClient +from redis_writer import RedisWriter + +# Setup logging +logging.basicConfig( + level=getattr(logging,'INFO'), + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +class MQTTIngestionService: + def __init__(self): + self.running = False + self.redis_writer = None + self.mqtt_client = None + + # Setup signal handlers for graceful shutdown + signal.signal(signal.SIGTERM, self._signal_handler) + signal.signal(signal.SIGINT, self._signal_handler) + + def _signal_handler(self, signum, frame): + """Handle shutdown signals""" + logger.info(f"Received signal {signum}, shutting down...") + self.stop() + + def _handle_sensor_data(self, device_id: str, sensor_type: str, value: float): + """ + This function is called by MQTT client when a message arrives. + It just passes the data to Redis writer. + """ + success = self.redis_writer.write_sensor_data(device_id, sensor_type, value) + if success: + logger.info(f"Processed {device_id}/{sensor_type}: {value}") + else: + logger.error(f"Failed to process {device_id}/{sensor_type}: {value}") + + def start(self): + """Start the service""" + logger.info("Starting MQTT Ingestion Service...") + + try: + # Initialize Redis writer + self.redis_writer = RedisWriter() + + # Initialize MQTT client with our message handler + self.mqtt_client = MQTTClient(self._handle_sensor_data) + + # Connect to MQTT + if not self.mqtt_client.connect(): + logger.error("Failed to connect to MQTT, exiting") + return False + + self.running = True + logger.info("Service started successfully") + + # Start MQTT loop (this blocks) + self.mqtt_client.start_loop() + + except Exception as e: + logger.error(f"Service startup failed: {e}") + return False + + return True + + def stop(self): + """Stop the service gracefully""" + if not self.running: + return + + logger.info("Stopping service...") + self.running = False + + # Stop MQTT client + if self.mqtt_client: + self.mqtt_client.stop() + + # Close Redis connection + if self.redis_writer: + self.redis_writer.close() + + logger.info("Service stopped") + + def health_check(self) -> bool: + """Check if service is healthy""" + if not self.running: + return False + + # Check Redis connection + if not self.redis_writer or not self.redis_writer.health_check(): + return False + + return True + +def main(): + """Entry point""" + service = MQTTIngestionService( + redis_config=config.redis, + mqtt_config=config.mqtt + ) + + try: + # Start the service (blocks until shutdown) + success = service.start() + if not success: + sys.exit(1) + except KeyboardInterrupt: + logger.info("Received keyboard interrupt") + except Exception as e: + logger.error(f"Unexpected error: {e}") + sys.exit(1) + finally: + service.stop() + +if __name__ == "__main__": + main() diff --git a/services/mqtt_ingestion/mqtt_client.py b/services/mqtt_ingestion/mqtt_client.py new file mode 100644 index 0000000..6d4c882 --- /dev/null +++ b/services/mqtt_ingestion/mqtt_client.py @@ -0,0 +1,88 @@ +import logging +import paho.mqtt.client as mqtt +from typing import Callable +from config import config + +logger = logging.getLogger(__name__) + +class MQTTClient: + def __init__(self, message_handler: Callable[[str, str, float], None]): + """ + Args: + message_handler: Function that takes (device_id, sensor_type, value) + """ + self.message_handler = message_handler + self.client = mqtt.Client() + self._setup_callbacks() + + def _setup_callbacks(self): + self.client.on_connect = self._on_connect + self.client.on_message = self._on_message + self.client.on_disconnect = self._on_disconnect + + # Set credentials if provided + if config.mqtt.username: + self.client.username_pw_set( + config.mqtt.username, + config.mqtt.password + ) + + def _on_connect(self, client, userdata, flags, rc): + if rc == 0: + logger.info(f"Connected to MQTT broker {config.mqtt.broker}") + # Subscribe to all device topics: devices/+/+ + client.subscribe(config.mqtt.topic_pattern) + logger.info(f"Subscribed to {config.mqtt.topic_pattern}") + else: + logger.error(f"Failed to connect to MQTT broker, code: {rc}") + + def _on_message(self, client, userdata, msg): + try: + # Parse topic: devices/{device_id}/{sensor_type} + topic_parts = msg.topic.split('/') + if len(topic_parts) != 3 or topic_parts[0] != 'devices': + logger.warning(f"Invalid topic format: {msg.topic}") + return + + device_id = topic_parts[1] + sensor_type = topic_parts[2] + + # Parse payload as float + try: + value = float(msg.payload.decode()) + except ValueError: + logger.error(f"Invalid payload for {msg.topic}: {msg.payload}") + return + + # Call the handler (this will be our Redis writer) + self.message_handler(device_id, sensor_type, value) + + except Exception as e: + logger.error(f"Error processing MQTT message: {e}") + + def _on_disconnect(self, client, userdata, rc): + if rc != 0: + logger.warning("Unexpected MQTT disconnection") + else: + logger.info("MQTT client disconnected") + + def connect(self): + """Connect to MQTT broker""" + try: + self.client.connect( + config.mqtt.broker, + config.mqtt.port, + config.mqtt.keepalive + ) + return True + except Exception as e: + logger.error(f"Failed to connect to MQTT: {e}") + return False + + def start_loop(self): + """Start the MQTT loop (blocking)""" + self.client.loop_forever() + + def stop(self): + """Stop the MQTT client""" + self.client.disconnect() \ No newline at end of file diff --git a/services/mqtt_ingestion/redis_writer.py b/services/mqtt_ingestion/redis_writer.py new file mode 100644 index 0000000..1a58486 --- /dev/null +++ b/services/mqtt_ingestion/redis_writer.py @@ -0,0 +1,37 @@ +import redis +import logging +from datetime import datetime +from typing import Optional +from config import Payload + +class RedisWriter: + def __init__(self, host: str, port: int, db: int, password: Optional[str] = None): + self.logger = logging.getLogger(__name__) + self.redis_client = redis.StrictRedis(host=host, port=port, db=db, password=password) + try: + self.redis_client.ping() + self.logger.info("Connected to Redis server successfully.") + except redis.ConnectionError as e: + self.logger.error(f"Failed to connect to Redis server: {e}") + raise + + def write_message(self, topic: str, payload: Payload): + """ + Write a message to a Redis stream with the topic and payload. + - Stream: mqtt_stream: {device_id}:{sensor_type} + """ + device_id = payload.device_id + sensor_type = payload.sensor_type + timestamp = datetime.utcnow().isoformat() + stream_key= f"mqtt_stream:{device_id}:{sensor_type}" + stream_data = { + "value": str(payload), + "source": "mqtt", + "timestamp": timestamp + } + try: + message_id = self.redis_client.xadd(stream_key, stream_data,maxlen=1000) + self.logger.info(f"Message written to Redis: {stream_data}") + return message_id + except redis.RedisError as e: + self.logger.error(f"Failed to write message to Redis: {e}") \ No newline at end of file diff --git a/services/mqtt_ingestion/requirements.txt b/services/mqtt_ingestion/requirements.txt new file mode 100644 index 0000000..d58eb8d --- /dev/null +++ b/services/mqtt_ingestion/requirements.txt @@ -0,0 +1,2 @@ +redis +paho-mqtt