diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 3fc0b64..b692b6a 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -1,66 +1,114 @@ The following concise instructions help AI coding agents become productive in this repository. Purpose -- This repo is a small Django-based IoT dashboard that ingests sensor data via MQTT, stores metadata in Django models, temporarily queues messages in Redis (streams/hashes), and persistently stores timeseries in Postgres/Timescale via background tasks (Huey). +- This repo is a microservices-based IoT platform for device management, data ingestion, and telemetry storage. The system uses MQTT with mTLS authentication, Redis streams for message queuing, and PostgreSQL/TimescaleDB for persistent storage. Big Picture +- Architecture: Device → MQTT (mTLS) → mqtt_ingestion → Redis → db_write → PostgreSQL/TimescaleDB - Components: - - `iotDashboard/` — Django app (models, views, templates, tasks) - - `manage.py` — Django CLI - - `mqtt_service.py` — standalone MQTT client that subscribes to device topics and writes to Redis - - `tasks.py` — Huey periodic tasks that read Redis and write to Postgres - - Redis — used for device metadata (`mqtt_devices`), per-sensor streams and latest-value hashes - - Postgres/Timescale — final storage for `sensor_readings` table (raw SQL used in places) + - `services/device_manager/` — FastAPI service for device registration, X.509 certificate issuance, and lifecycle management + - `services/mqtt_ingestion/` — MQTT client that subscribes to device topics and writes to single Redis stream `mqtt:ingestion` + - `services/db_write/` — Consumer service that reads from Redis streams and writes to database using consumer groups + - `db_migrations/` — Alembic migrations for schema management (SQLAlchemy models) + - `infrastructure/` — Docker Compose setup (PostgreSQL, Redis, Mosquitto MQTT broker) + - `iotDashboard/` — Legacy Django app (being phased out) Key Files To Read First -- `iotDashboard/settings.py` — central settings; environment variables expected: `SECRET_KEY`, `CONNECTION_STRING`, `MQTT_BROKER`, `MQTT_USER`, `MQTT_PASS`, `REDIS_HOST`. -- `iotDashboard/models.py` — `Device`, `Sensor`, `SensorType`; these shape how devices and sensors are represented. -- `mqtt_service.py` — where MQTT messages are received and written to Redis. Important for stream naming and payload format. -- `iotDashboard/tasks.py` — Huey tasks that consume Redis and insert into the DB. Shows ingestion logic and timescale interactions. -- `iotDashboard/views.py` and `templates/chart.html` — how the UI reads `mqtt_latest`/Timescale data and what format it expects. +- `db_migrations/models.py` — SQLAlchemy models: `Device`, `DeviceCertificate`, `Telemetry`. Canonical schema definition. +- `services/device_manager/app/app.py` — FastAPI endpoints for device registration, certificate management, revocation, renewal. +- `services/device_manager/app/cert_manager.py` — X.509 certificate generation, CA management, CRL generation. +- `services/mqtt_ingestion/src/mqtt_client.py` — MQTT subscriber that parses `devices/{device_id}/{metric}` topics. +- `services/mqtt_ingestion/src/redis_writer.py` — Writes to single stream `mqtt:ingestion` with device_id, metric, value, timestamp. +- `services/db_write/src/redis_reader.py` — Consumer group reader for `mqtt:ingestion` stream. +- `services/db_write/src/db_writer.py` — Batch writes to `telemetry` table using SQLAlchemy. +- `infrastructure/compose.yml` — Docker services: PostgreSQL/TimescaleDB, Redis, Mosquitto MQTT. +- `infrastructure/mosquitto/mosquitto.conf` — MQTT broker config with mTLS on port 8883, CRL checking enabled. Important Conventions & Patterns -- Redis usage: repo stores device metadata under `mqtt_devices` (JSON), and the code uses Redis streams and hashes inconsistently. When changing stream behavior, update both `mqtt_service.py` and `tasks.py` to remain compatible. -- Topic/Stream canonicalization: adopt a single convention: MQTT topic `devices/{device_id}/{sensor}` and Redis stream `mqtt_stream:{device_id}:{sensor}`. Latest-value hash pattern: `mqtt_latest:{device_id}`. -- No `requirements.txt` in repo; use `python-dotenv` + `redis`, `paho-mqtt`, `huey`, `psycopg2-binary`, `requests`, `Django` (4.2) — add a `requirements.txt` before running. -- Avoid import-time side-effects: `tasks.py` currently opens Redis and calls `devices_to_redis()` at import time — refactor to lazy init or a management command. +- **Single stream architecture**: All MQTT data flows through one Redis stream `mqtt:ingestion`. Each message contains `device_id`, `metric`, `value`, `timestamp`. +- **MQTT topics**: Standard format `devices/{device_id}/{metric}`. Examples: `devices/abc123/temperature`, `devices/xyz789/humidity`. +- **Certificate IDs**: Use certificate serial number (hex format) as primary key in `device_certificates` table. Multiple certificates per device supported. +- **Package manager**: All services use `uv` for dependency management (`pyproject.toml` not `requirements.txt`). +- **Database migrations**: Use Alembic for schema changes. Run migrations from `db_migrations/` directory. +- **Configuration**: All services use `.env` files. Never hardcode hosts/credentials. +- **Import organization**: Services have `app/` or `src/` package structure. Import as `from app.module import ...` or `from src.module import ...`. +- **Consumer groups**: `db_write` uses Redis consumer groups for at-least-once delivery. Consumer name must be unique per instance. Developer Workflows (commands & notes) -- Run Django dev server (use virtualenv and install deps): - - `pip install -r requirements.txt` (create this file if missing) - - `python manage.py migrate` - - `python manage.py runserver` -- Run MQTT service locally (requires Redis & MQTT broker): - - `python mqtt_service.py` - - Example publish: `mosquitto_pub -t "devices/esp32/test_temperature" -m "23.5"` -- Huey tasks: - - The project uses `huey.contrib.djhuey`; run workers with Django settings: `python manage.py run_huey` (ensure huey is installed and configured in `HUEY` setting). -- Inspect Redis during debugging: - - `redis-cli KEYS "mqtt*"` - - `redis-cli XREVRANGE mqtt_stream:mydevice:temperature + - COUNT 10` - - `redis-cli HGETALL mqtt_latest:mydevice` +- **Start infrastructure**: `cd infrastructure && docker compose up -d` (Postgres, Redis, Mosquitto) +- **Run database migrations**: `cd db_migrations && uv run alembic upgrade head` +- **Generate CA certificate**: `cd services/device_manager && ./generate_ca.sh` (first time only) +- **Run device_manager**: `cd services/device_manager && uv run uvicorn app.app:app --reload --port 8000` +- **Run mqtt_ingestion**: `cd services/mqtt_ingestion && uv run main.py` +- **Run db_write**: `cd services/db_write && uv run main.py` +- **Register device**: `curl -X POST http://localhost:8000/devices/register -H "Content-Type: application/json" -d '{"name":"test","location":"lab"}'` +- **Test MQTT with mTLS**: `mosquitto_pub --cafile ca.crt --cert device.crt --key device.key -h localhost -p 8883 -t "devices/abc123/temperature" -m "23.5"` +- **Inspect Redis stream**: `redis-cli XLEN mqtt:ingestion` and `redis-cli XRANGE mqtt:ingestion - + COUNT 10` +- **Check consumer group**: `redis-cli XINFO GROUPS mqtt:ingestion` +- **View CRL**: `openssl crl -in infrastructure/mosquitto/certs/ca.crl -text -noout` Integration Points & Gotchas -- Environment variables: many hosts/credentials are taken from `.env` via `python-dotenv`. If missing, code sometimes falls back to defaults or will raise at runtime. Add `.env` or set env vars in the system. -- DB access: `tasks.py` sometimes uses `psycopg2.connect(settings.CONNECTION_STRING)` while views use Django connections. If you change DB config, update both patterns or consolidate to Django connections. -- Topic parsing: `mqtt_service.py` expects at least 3 topic parts (it reads `topic_parts[2]`) — be defensive when editing. -- Stream payloads: `xadd` must receive simple string fields (no nested dicts). When changing stream layout, update the reader in `tasks.py` accordingly. -- Logging: repo uses `print` widely. Prefer converting prints to Python `logging` for maintainability. +- **Environment variables**: All services load from `.env` files. No defaults - service will fail if required vars missing. Copy `.env.example` first. +- **Certificate paths**: `device_manager` writes CRL to `infrastructure/mosquitto/certs/ca.crl`. Mosquitto must restart after CRL updates. +- **Database schema**: Schema changes require Alembic migration. Never modify tables manually. Use `alembic revision --autogenerate`. +- **MQTT topic parsing**: `mqtt_ingestion` expects exactly `devices/{device_id}/{metric}` (3 parts). Invalid topics are logged and dropped. +- **Redis stream format**: `mqtt:ingestion` messages must have `device_id`, `metric`, `value`, `timestamp` fields (all strings). +- **Consumer groups**: `db_write` creates consumer group `db_writer` automatically. Don't delete it manually. +- **Certificate serial numbers**: Used as primary key in `device_certificates.id`. Extract with `format(cert.serial_number, 'x')`. +- **TimescaleDB hypertables**: `telemetry` table is a hypertable. Don't add constraints that break time partitioning. +- **File permissions**: Mosquitto directories may be owned by UID 1883. Fix with `sudo chown -R $USER:$USER infrastructure/mosquitto/`. What AI agents should do first -- Do not change stream/topic names unless you update both `mqtt_service.py` and `tasks.py`. -- Remove import-time Redis initializations and `exit()` calls; move to lazily-created client getters or management commands. -- Centralize config in `settings.py` and import `from django.conf import settings` in scripts instead of hardcoded IPs. -- When making API/DB changes, prefer to update `views.py` and `tasks.py` together and add short integration tests using `pytest` and a Redis test double (or local docker-compose). +- **Read architecture first**: Check `README.md` for current architecture. System is microservices-based, not Django monolith. +- **Check database schema**: Always start with `db_migrations/models.py` to understand data model. +- **Don't change stream names**: Single stream `mqtt:ingestion` is used by mqtt_ingestion and db_write. Changing breaks both services. +- **Use proper imports**: Services use package structure. Import from `app.*` or `src.*`, not relative imports. +- **Create migrations**: Schema changes require `alembic revision --autogenerate`. Never modify models without migration. +- **Test with real infrastructure**: Use `docker compose up` for integration testing. Unit tests are insufficient for this architecture. +- **Check .env files**: Each service has `.env.example`. Copy and configure before running. Examples (copyable snippets) -- XADD to create canonical stream entry: - - `redis_client.xadd(f"mqtt_stream:{device_id}:{sensor}", {"value": str(sensor_value), "time": datetime.utcnow().isoformat()})` -- Create/read consumer group (ingest): - - `redis_client.xgroup_create(stream, "ingest", id="0", mkstream=True)` - - `entries = redis_client.xreadgroup("ingest", consumer_name, {stream: ">"}, count=10, block=5000)` +- **Write to single stream** (mqtt_ingestion): + ```python + redis_client.xadd("mqtt:ingestion", { + "device_id": device_id, + "metric": sensor_type, + "value": str(value), + "timestamp": datetime.utcnow().isoformat() + }) + ``` + +- **Read from stream with consumer group** (db_write): + ```python + results = redis_client.xreadgroup( + groupname="db_writer", + consumername="worker-01", + streams={"mqtt:ingestion": ">"}, + count=100, + block=5000 + ) + ``` + +- **Extract certificate serial number**: + ```python + from cryptography import x509 + cert = x509.load_pem_x509_certificate(cert_pem) + cert_id = format(cert.serial_number, 'x') + ``` + +- **Query active certificates**: + ```python + device_cert = db.query(DeviceCertificate).filter( + DeviceCertificate.device_id == device_id, + DeviceCertificate.revoked_at.is_(None) + ).first() + ``` If you add or change docs -- Update `README.md` with a simple `docker-compose.yml` recipe for Redis/Postgres/Mosquitto and document environment variables. Update `env.sample` with `REDIS_HOST`, `CONNECTION_STRING`, `MQTT_BROKER`, `MQTT_USER`, `MQTT_PASS`. +- Update `README.md` for architecture changes +- Update `.github/copilot-instructions.md` for development workflow changes +- Update service-specific READMEs (`services/*/README.md`) for API or configuration changes +- Document environment variables in `.env.example` files +- Add migration notes to Alembic revision if schema change is complex If anything in these instructions looks off or incomplete for your current refactor, tell me what you'd like to focus on and I'll iterate. diff --git a/README.md b/README.md index 45f74f4..144f7f2 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,17 @@ -# iotDashboard -iotDashboard - IOT dashboard with Django, TimescaleDB and Redis +# IoT Dashboard + +Microservices-based IoT platform with device management, mTLS authentication, and time-series data storage. + +**Architecture:** Device → MQTT (mTLS) → mqtt_ingestion → Redis → db_write → PostgreSQL/TimescaleDB + +## Services + +- **device_manager** - Device registration & X.509 certificates (FastAPI) +- **mqtt_ingestion** - MQTT → Redis pipeline +- **db_write** - Redis → PostgreSQL writer +- **infrastructure** - Docker Compose (PostgreSQL, Redis, Mosquitto) + + +## License + +MIT License diff --git a/db_migrations/alembic/versions/20251101_1907_4e405f1129b1_add_protocol_and_connection_config_to_.py b/db_migrations/alembic/versions/20251101_1907_4e405f1129b1_add_protocol_and_connection_config_to_.py new file mode 100644 index 0000000..ef4c1f9 --- /dev/null +++ b/db_migrations/alembic/versions/20251101_1907_4e405f1129b1_add_protocol_and_connection_config_to_.py @@ -0,0 +1,55 @@ +"""add protocol and connection_config to devices + +Revision ID: 4e405f1129b1 +Revises: 4f152b34e800 +Create Date: 2025-11-01 19:07:22.800918+00:00 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '4e405f1129b1' +down_revision: Union[str, Sequence[str], None] = '4f152b34e800' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('device_credentials', + sa.Column('id', sa.Text(), nullable=False), + sa.Column('device_id', sa.Text(), nullable=False), + sa.Column('credential_type', sa.Text(), nullable=False), + sa.Column('credential_hash', sa.Text(), nullable=False), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('expires_at', sa.DateTime(timezone=True), nullable=True), + sa.Column('revoked_at', sa.DateTime(timezone=True), nullable=True), + sa.ForeignKeyConstraint(['device_id'], ['devices.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id') + ) + op.create_index('idx_device_credentials_active', 'device_credentials', ['device_id', 'revoked_at'], unique=False) + op.create_index('idx_device_credentials_device_id', 'device_credentials', ['device_id'], unique=False) + + # Add protocol column as nullable first, set default for existing rows, then make NOT NULL + op.add_column('devices', sa.Column('protocol', sa.Text(), nullable=True)) + op.execute("UPDATE devices SET protocol = 'mqtt' WHERE protocol IS NULL") + op.alter_column('devices', 'protocol', nullable=False) + + op.add_column('devices', sa.Column('connection_config', sa.JSON(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('devices', 'connection_config') + op.drop_column('devices', 'protocol') + op.drop_index('idx_device_credentials_device_id', table_name='device_credentials') + op.drop_index('idx_device_credentials_active', table_name='device_credentials') + op.drop_table('device_credentials') + # ### end Alembic commands ### diff --git a/db_migrations/models.py b/db_migrations/models.py index 973d85d..edf8923 100644 --- a/db_migrations/models.py +++ b/db_migrations/models.py @@ -8,7 +8,7 @@ To modify schema: 4. Run: alembic upgrade head """ -from sqlalchemy import Boolean, Column, Float, ForeignKey, Index, Text, DateTime +from sqlalchemy import Boolean, Column, Float, ForeignKey, Index, Text, DateTime, JSON from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.sql import func @@ -23,13 +23,13 @@ class Device(Base): id = Column(Text, primary_key=True) name = Column(Text, nullable=False) location = Column(Text) + protocol = Column(Text, nullable=False, default="mqtt") + connection_config = Column(JSON) is_active = Column(Boolean, default=True) created_at = Column(DateTime(timezone=True), server_default=func.now()) def __repr__(self): - return f"" - - + return f"" class DeviceCertificate(Base): """X.509 certificates issued to devices for mTLS authentication.""" @@ -40,7 +40,7 @@ class DeviceCertificate(Base): Text, ForeignKey("devices.id", ondelete="CASCADE"), nullable=False ) certificate_pem = Column(Text, nullable=False) - private_key_pem = Column(Text) # Optional: for backup/escrow + private_key_pem = Column(Text) issued_at = Column(DateTime(timezone=True), nullable=False) expires_at = Column(DateTime(timezone=True), nullable=False) revoked_at = Column(DateTime(timezone=True)) @@ -54,6 +54,30 @@ class DeviceCertificate(Base): return f"" +class DeviceCredential(Base): + """Authentication credentials for non-mTLS protocols (HTTP, webhook, etc).""" + + __tablename__ = "device_credentials" + + id = Column(Text, primary_key=True) + device_id = Column( + Text, ForeignKey("devices.id", ondelete="CASCADE"), nullable=False + ) + credential_type = Column(Text, nullable=False) + credential_hash = Column(Text, nullable=False) + created_at = Column(DateTime(timezone=True), nullable=False) + expires_at = Column(DateTime(timezone=True)) + revoked_at = Column(DateTime(timezone=True)) + + __table_args__ = ( + Index("idx_device_credentials_device_id", "device_id"), + Index("idx_device_credentials_active", "device_id", "revoked_at"), + ) + + def __repr__(self): + return f"" + + class Telemetry(Base): """ Time-series telemetry data from devices. diff --git a/iotDashboard/device_manager_client.py b/iotDashboard/device_manager_client.py new file mode 100644 index 0000000..4032af1 --- /dev/null +++ b/iotDashboard/device_manager_client.py @@ -0,0 +1,157 @@ +"""API client for the device_manager microservice.""" + +import os +import requests +from typing import Optional, Dict, Any, List +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class DeviceRegistrationResponse: + device_id: str + certificate_id: str + certificate: str + private_key: str + ca_certificate: str + + +@dataclass +class DeviceInfo: + id: str + name: str + location: Optional[str] + is_active: bool + created_at: datetime + certificates: List[Dict[str, Any]] + + +class DeviceManagerAPIError(Exception): + def __init__(self, status_code: int, message: str, details: Optional[Dict] = None): + self.status_code = status_code + self.message = message + self.details = details or {} + super().__init__(f"API Error {status_code}: {message}") + + +class DeviceManagerClient: + def __init__(self, base_url: Optional[str] = None): + self.base_url = base_url or os.getenv("DEVICE_MANAGER_URL", "http://localhost:8000") + self.session = requests.Session() + self.session.headers.update({"Content-Type": "application/json"}) + + def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response: + url = f"{self.base_url}{endpoint}" + try: + response = self.session.request(method, url, **kwargs) + response.raise_for_status() + return response + except requests.exceptions.HTTPError as e: + try: + error_data = e.response.json() + message = error_data.get("detail", str(e)) + except (ValueError, AttributeError): + message = str(e) + + raise DeviceManagerAPIError( + status_code=e.response.status_code, + message=message, + details=error_data if "error_data" in locals() else {}, + ) + except requests.exceptions.RequestException as e: + raise DeviceManagerAPIError( + status_code=0, message=f"Connection error: {str(e)}" + ) + + def register_device(self, name: str, location: Optional[str] = None) -> DeviceRegistrationResponse: + payload = {"name": name} + if location: + payload["location"] = location + + response = self._request("POST", "/devices/register", json=payload) + data = response.json() + + return DeviceRegistrationResponse( + device_id=data["device_id"], + certificate_id=data["certificate_id"], + certificate=data["certificate"], + private_key=data["private_key"], + ca_certificate=data["ca_certificate"], + ) + + def get_device(self, device_id: str) -> DeviceInfo: + response = self._request("GET", f"/devices/{device_id}") + data = response.json() + + return DeviceInfo( + id=data["id"], + name=data["name"], + location=data.get("location"), + is_active=data["is_active"], + created_at=datetime.fromisoformat(data["created_at"].replace("Z", "+00:00")), + certificates=data.get("certificates", []), + ) + + def list_devices(self) -> List[DeviceInfo]: + response = self._request("GET", "/devices/") + data = response.json() + + return [ + DeviceInfo( + id=device["id"], + name=device["name"], + location=device.get("location"), + is_active=device["is_active"], + created_at=datetime.fromisoformat( + device["created_at"].replace("Z", "+00:00") + ), + certificates=device.get("certificates", []), + ) + for device in data + ] + + def revoke_certificate(self, device_id: str) -> Dict[str, Any]: + response = self._request("POST", f"/devices/{device_id}/revoke") + return response.json() + + def renew_certificate(self, device_id: str) -> Dict[str, Any]: + response = self._request("POST", f"/devices/{device_id}/renew") + return response.json() + + def get_ca_certificate(self) -> str: + response = self._request("GET", "/ca_certificate") + return response.text + + def get_crl(self) -> str: + response = self._request("GET", "/crl") + return response.text + + def health_check(self) -> bool: + try: + response = self.session.get(f"{self.base_url}/docs", timeout=2) + return response.status_code == 200 + except requests.exceptions.RequestException: + return False + + +default_client = DeviceManagerClient() + + +def register_device(name: str, location: Optional[str] = None) -> DeviceRegistrationResponse: + return default_client.register_device(name, location) + + +def get_device(device_id: str) -> DeviceInfo: + return default_client.get_device(device_id) + + +def list_devices() -> List[DeviceInfo]: + return default_client.list_devices() + + +def revoke_certificate(device_id: str) -> Dict[str, Any]: + return default_client.revoke_certificate(device_id) + + +def renew_certificate(device_id: str) -> Dict[str, Any]: + return default_client.renew_certificate(device_id) diff --git a/iotDashboard/models.py b/iotDashboard/models.py index a6cb43d..16dd272 100644 --- a/iotDashboard/models.py +++ b/iotDashboard/models.py @@ -1,42 +1,175 @@ +""" +Django models that mirror the SQLAlchemy schema from db_migrations/models.py. + +These models are read-only (managed=False) and query the microservices database. +For write operations, use the device_manager API client instead. +""" + from django.db import models -class SensorType(models.Model): - name = models.CharField( - max_length=50, unique=True - ) # Sensor name, e.g., "CO2", "Noise", etc. - unit = models.CharField( - max_length=20 - ) # Unit of measurement, e.g., "ppm", "dB", "lux" - protocol = models.CharField( - max_length=20, choices=[("mqtt", "MQTT"), ("http", "HTTP")] - ) # Protocol for communication - topic = models.CharField( - max_length=100, null=True, blank=True - ) # Topic for MQTT communication - endpoint = models.CharField( - max_length=100, null=True, blank=True - ) # Endpoint for HTTP communication - - def __str__(self): - return f"{self.name} ({self.unit})" - - class Device(models.Model): - name = models.CharField(max_length=50) # Device name - ip = models.CharField(max_length=20) # Device IP address - protocol = models.CharField( - max_length=20, choices=[("mqtt", "MQTT"), ("http", "HTTP")] + """IoT devices registered in the system.""" + + id = models.CharField(max_length=8, primary_key=True) + name = models.CharField(max_length=255) + location = models.CharField(max_length=255, null=True, blank=True) + protocol = models.CharField(max_length=50, default="mqtt") + connection_config = models.JSONField(null=True, blank=True) + is_active = models.BooleanField(default=True) + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + managed = False + db_table = "devices" + + def __str__(self): + return f"{self.name} ({self.id}) [{self.protocol}]" + + @property + def active_certificate(self): + """Get the active (non-revoked) certificate for this device.""" + return self.certificates.filter(revoked_at__isnull=True).first() + + @property + def certificate_status(self): + """Get human-readable certificate status for MQTT devices.""" + if self.protocol != "mqtt": + return "N/A" + cert = self.active_certificate + if not cert: + return "No Certificate" + if cert.is_expired: + return "Expired" + if cert.is_expiring_soon: + return "Expiring Soon" + return "Valid" + + +class DeviceCertificate(models.Model): + """X.509 certificates issued to devices for mTLS authentication.""" + + id = models.CharField( + max_length=255, primary_key=True + ) # Certificate serial number (hex) + device = models.ForeignKey( + Device, on_delete=models.CASCADE, related_name="certificates", db_column="device_id" ) + certificate_pem = models.TextField() + private_key_pem = models.TextField(null=True, blank=True) # Optional backup + issued_at = models.DateTimeField() + expires_at = models.DateTimeField() + revoked_at = models.DateTimeField(null=True, blank=True) + + class Meta: + managed = False # Don't create/modify this table + db_table = "device_certificates" + indexes = [ + models.Index(fields=["device"]), + models.Index(fields=["device", "revoked_at"]), + ] def __str__(self): - return self.name + status = "Revoked" if self.revoked_at else "Active" + return f"Certificate {self.id[:8]}... for {self.device.name} ({status})" + + @property + def is_revoked(self): + """Check if certificate is revoked.""" + return self.revoked_at is not None + + @property + def is_expired(self): + """Check if certificate is expired.""" + from django.utils import timezone + + return timezone.now() > self.expires_at + + @property + def is_expiring_soon(self): + """Check if certificate expires within 30 days.""" + from django.utils import timezone + from datetime import timedelta + + return ( + not self.is_expired + and self.expires_at < timezone.now() + timedelta(days=30) + ) + + @property + def is_valid(self): + """Check if certificate is valid (not revoked and not expired).""" + return not self.is_revoked and not self.is_expired + + @property + def days_until_expiry(self): + """Calculate days until certificate expires.""" + from django.utils import timezone + + if self.is_expired: + return 0 + delta = self.expires_at - timezone.now() + return delta.days -class Sensor(models.Model): - device = models.ForeignKey(Device, related_name="sensors", on_delete=models.CASCADE) - type = models.ForeignKey(SensorType, on_delete=models.CASCADE) - enabled = models.BooleanField(default=True) +class DeviceCredential(models.Model): + """Authentication credentials for non-mTLS protocols (HTTP, webhook, etc).""" + + id = models.CharField(max_length=255, primary_key=True) + device = models.ForeignKey( + Device, on_delete=models.CASCADE, related_name="credentials", db_column="device_id" + ) + credential_type = models.CharField(max_length=50) + credential_hash = models.TextField() + created_at = models.DateTimeField() + expires_at = models.DateTimeField(null=True, blank=True) + revoked_at = models.DateTimeField(null=True, blank=True) + + class Meta: + managed = False + db_table = "device_credentials" + indexes = [ + models.Index(fields=["device"]), + models.Index(fields=["device", "revoked_at"]), + ] def __str__(self): - return f"{self.type.name} Sensor on {self.device.name}" + status = "Revoked" if self.revoked_at else "Active" + return f"{self.credential_type} for {self.device.name} ({status})" + + @property + def is_revoked(self): + return self.revoked_at is not None + + @property + def is_expired(self): + from django.utils import timezone + return self.expires_at and timezone.now() > self.expires_at + + @property + def is_valid(self): + return not self.is_revoked and not self.is_expired + + +class Telemetry(models.Model): + """Time-series telemetry data from devices.""" + + time = models.DateTimeField() + device = models.ForeignKey( + Device, on_delete=models.CASCADE, related_name="telemetry", db_column="device_id" + ) + metric = models.CharField(max_length=255) + value = models.FloatField() + unit = models.CharField(max_length=50, null=True, blank=True) + + class Meta: + managed = False + db_table = "telemetry" + unique_together = [["time", "device", "metric"]] + indexes = [ + models.Index(fields=["device", "time"]), + ] + + def __str__(self): + return f"{self.device.name} - {self.metric}: {self.value} at {self.time}" + diff --git a/iotDashboard/settings.py b/iotDashboard/settings.py index 06afa78..c863994 100644 --- a/iotDashboard/settings.py +++ b/iotDashboard/settings.py @@ -13,7 +13,6 @@ https://docs.djangoproject.com/en/4.2/ref/settings/ from dotenv import load_dotenv from pathlib import Path import os -# from huey import SqliteHuey # Build paths inside the project like this: BASE_DIR / 'subdir'. @@ -27,7 +26,12 @@ load_dotenv() # SECURITY WARNING: keep the secret key used in production secret! SECRET_KEY = os.getenv("SECRET_KEY") -CONNECTION_STRING = os.getenv("CONNECTION_STRING") + +POSTGRES_HOST = os.getenv("POSTGRES_HOST") +POSTGRES_PORT = os.getenv("POSTGRES_PORT") +POSTGRES_USER = os.getenv("POSTGRES_USER") +POSTGRES_DB = os.getenv("POSTGRES_DB") +POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD") # SECURITY WARNING: don't run with debug turned on in production! DEBUG = True @@ -45,7 +49,6 @@ INSTALLED_APPS = [ "django.contrib.messages", "django.contrib.staticfiles", "iotDashboard", - # 'huey.contrib.djhuey', ] MIDDLEWARE = [ @@ -84,17 +87,14 @@ WSGI_APPLICATION = "iotDashboard.wsgi.application" DATABASES = { "default": { - "ENGINE": "django.db.backends.sqlite3", - "NAME": BASE_DIR / "db.sqlite3", - }, - "data": { - "ENGINE": "django.db.backends.postgresql", - "NAME": "example", - "USER": "postgres", - "PASSWORD": os.getenv("PASSWORD"), - "HOST": "10.10.0.1", - "PORT": "5555", + "ENGINE": "django.db.backends.postgresql", + "NAME": POSTGRES_DB, + "USER": POSTGRES_USER, + "PASSWORD": POSTGRES_PASSWORD, + "HOST": POSTGRES_HOST, + "PORT": POSTGRES_PORT, }, + } @@ -139,12 +139,3 @@ STATIC_URL = "static/" DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" - -# HUEY = { -# 'huey_class': 'huey.SqliteHuey', # Or 'huey.RedisHuey' for Redis -# 'filename': 'demo.db', # SQLite file for task storage -# 'results': True, -# 'store_none': False, -# 'immediate': False, -# 'utc': True, -# } diff --git a/iotDashboard/tasks.py b/iotDashboard/tasks.py deleted file mode 100644 index a21b6ad..0000000 --- a/iotDashboard/tasks.py +++ /dev/null @@ -1,180 +0,0 @@ -import json -import datetime -import os -import requests -import psycopg2 -import redis -from django.conf import settings -from huey import crontab -from huey.contrib.djhuey import periodic_task -from .models import Device -from dotenv import load_dotenv - -load_dotenv() - -REDIS_HOST = os.getenv("REDIS_HOST", "localhost") # Default to localhost if not set -try: - redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0) - print(redis_client) - redis_client.ping() - print("Connected!") -except Exception as ex: - print - "Error:", ex - exit("Failed to connect, terminating.") - - -def devices_to_redis(): - """Fetch devices and their sensors' topics from Django and store them in Redis.""" - devices = Device.objects.all() - devices_list = [] - for device in devices: - for sensor in device.sensors.all(): - sensor_data = { - "device_name": device.name, - "sensor_name": sensor.type.name, - "topic": sensor.type.topic, # Assuming the topic is stored in SensorType - } - devices_list.append(sensor_data) - redis_client.set("mqtt_devices", json.dumps(devices_list)) - print("Devices with sensors stored in Redis.") - - -def fetch_data_http(device, sensor): - """Fetch data from an HTTP sensor.""" - sensor_type_name = sensor.type.name.lower() - try: - response = requests.get( - f"http://{device.ip}/sensor/{sensor_type_name}", timeout=5 - ) - response.raise_for_status() - sensor_value = response.json().get("value") - if sensor_value is not None: - return { - "time": datetime.datetime.utcnow().isoformat(), - "device": device.name, - "sensor": sensor_type_name, - "sensor_value": sensor_value, - } - else: - print(f"No value returned from {device.name} for {sensor_type_name}") - except requests.RequestException as e: - print(f"HTTP request failed for {device.name}: {e}") - return None - - -def fetch_data_mqtt_stream(device, sensor): - """Fetch data from Redis Stream for a specific MQTT device and sensor.""" - sensor_name = sensor.type.name.lower() - stream_key = f"mqtt_stream:{device.name}:{sensor_name}" - try: - stream_data = redis_client.xread({stream_key: "0-0"}, block=1000, count=1) - if stream_data: - _, entries = stream_data[0] - for entry_id, entry_data in entries: - sensor_value = entry_data.get(b"value") - timestamp = entry_data.get(b"time") - - if sensor_value and timestamp: - return { - "time": timestamp.decode("utf-8"), - "device": device.name, - "sensor_value": float(sensor_value.decode("utf-8")), - } - except Exception as e: - print(f"Error fetching data from stream {stream_key}: {e}") - return None - - -def is_recent_data(timestamp): - """Check if data is within a 1-minute freshness window.""" - data_time = datetime.datetime.fromisoformat(timestamp) - return data_time > datetime.datetime.utcnow() - datetime.timedelta(minutes=1) - - -def insert_data(data, sensor_type): - """Insert parsed data into the PostgreSQL database.""" - if "sensor_value" not in data: - print(f"Missing 'sensor_value' in data: {data}. Skipping insertion.") - return - - insert_data_dict = { - "time": data["time"], - "device": data["device"], - "metric": sensor_type.lower(), - "value": data["sensor_value"], - } - - try: - with psycopg2.connect(settings.CONNECTION_STRING) as conn: - with conn.cursor() as cursor: - insert_query = """ - INSERT INTO sensor_readings (time, device_name, metric, value) - VALUES (%s, %s, %s, %s); - """ - cursor.execute( - insert_query, - ( - insert_data_dict["time"], - insert_data_dict["device"], - insert_data_dict["metric"], - insert_data_dict["value"], - ), - ) - conn.commit() - print( - f"Data inserted successfully for {insert_data_dict['device']}: {insert_data_dict}" - ) - except Exception as e: - print(f"Failed to insert data: {e}") - - -@periodic_task(crontab(minute="*/1")) -def fetch_data_from_all_devices(): - """Fetch and insert data for all devices based on their protocol.""" - devices = Device.objects.all() - for device in devices: - for sensor in device.sensors.all(): - data = None - - if device.protocol == "http": - data = fetch_data_http(device, sensor) - elif device.protocol == "mqtt": - data = fetch_data_mqtt_stream(device, sensor) - - if data and is_recent_data(data["time"]): - insert_data(data, sensor.type.name) - else: - print(f"No recent or valid data for {device.name}. Skipping.") - - -@periodic_task(crontab(minute="*/5")) -def last_5_minutes(): - """Fetch the last 5 readings from TimescaleDB and store them in Redis.""" - try: - with psycopg2.connect(settings.CONNECTION_STRING) as conn: - with conn.cursor() as cursor: - cursor.execute(""" - SELECT time, device_name, metric, value - FROM sensor_readings - ORDER BY time DESC - LIMIT 5; - """) - results = cursor.fetchall() - - data = [ - { - "time": reading[0].isoformat(), - "device": reading[1], - "metric": reading[2], - "value": reading[3], - } - for reading in results - ] - redis_client.set("last5", json.dumps(data)) - print("Last 5 readings:", data) - except Exception as e: - print(f"Error fetching or storing the last 5 readings: {e}") - - -devices_to_redis() diff --git a/services/db_write/Dockerfile b/services/db_write/Dockerfile new file mode 100644 index 0000000..9942af5 --- /dev/null +++ b/services/db_write/Dockerfile @@ -0,0 +1,32 @@ +FROM ghcr.io/astral-sh/uv:python3.13-alpine AS builder + +WORKDIR /app + +ENV UV_COMPILE_BYTECODE=1 + +COPY pyproject.toml uv.lock ./ + +RUN uv sync --frozen --no-dev --no-install-project + +COPY ./src/ ./src/ +COPY main.py ./ + +RUN uv sync --frozen --no-dev + + +FROM python:3.13-alpine + +WORKDIR /app + +COPY --from=builder /app/.venv /app/.venv + +COPY --from=builder /app/*.py /app/ + +RUN adduser -D -u 1000 appuser && \ + chown -R appuser:appuser /app + +USER appuser + +ENV PATH="/app/.venv/bin:$PATH" + +CMD ["python", "main.py"] \ No newline at end of file diff --git a/services/device_manager/app/app.py b/services/device_manager/app/app.py index ca2782b..978684e 100644 --- a/services/device_manager/app/app.py +++ b/services/device_manager/app/app.py @@ -5,8 +5,12 @@ from fastapi import FastAPI, HTTPException from app.cert_manager import CertificateManager from app.database import get_db_context -from app.db_models import Device, DeviceCertificate # SQLAlchemy ORM models -from app.models import DeviceRegistrationRequest, DeviceRegistrationResponse, DeviceResponse +from app.db_models import Device, DeviceCertificate +from app.models import ( + DeviceRegistrationRequest, + DeviceRegistrationResponse, + DeviceResponse, +) logger = logging.getLogger(__name__) @@ -25,43 +29,62 @@ async def register_device( request: DeviceRegistrationRequest, ) -> DeviceRegistrationResponse: """ - Register a new device and issue an X.509 certificate. + Register a new device. + - MQTT devices: issues X.509 certificate for mTLS + - HTTP/webhook devices: generates API key or HMAC secret """ try: - response = cert_manager.register_device( - name=request.name, - location=request.location, - ) - - with get_db_context() as db: - device = Device( - id=response.device_id, + if request.protocol == "mqtt": + cert_response = cert_manager.register_device( name=request.name, location=request.location, - created_at=datetime.datetime.now(datetime.UTC), ) - db.add(device) - device_cert = DeviceCertificate( - id =response.certificate_id, - device_id=response.device_id, - certificate_pem=response.certificate_pem, - private_key_pem=response.private_key_pem, - issued_at=datetime.datetime.now(datetime.UTC), - expires_at=response.expires_at, + with get_db_context() as db: + device = Device( + id=cert_response.device_id, + name=request.name, + location=request.location, + protocol=request.protocol, + connection_config=request.connection_config, + created_at=datetime.datetime.now(datetime.UTC), + ) + db.add(device) + + device_cert = DeviceCertificate( + id=cert_response.certificate_id, + device_id=cert_response.device_id, + certificate_pem=cert_response.certificate_pem, + private_key_pem=cert_response.private_key_pem, + issued_at=datetime.datetime.now(datetime.UTC), + expires_at=cert_response.expires_at, + ) + db.add(device_cert) + + return DeviceRegistrationResponse( + device_id=cert_response.device_id, + protocol=request.protocol, + certificate_id=cert_response.certificate_id, + ca_certificate_pem=cert_response.ca_certificate_pem, + certificate_pem=cert_response.certificate_pem, + private_key_pem=cert_response.private_key_pem, + expires_at=cert_response.expires_at, ) - db.add(device_cert) + else: + raise HTTPException( + status_code=400, + detail=f"Protocol '{request.protocol}' not yet implemented. Only 'mqtt' is supported.", + ) + + except HTTPException: + raise except Exception as e: - logger.error( - f"Failed to register device {request.name}: {str(e)}", exc_info=True - ) + logger.error(f"Failed to register device {request.name}: {str(e)}", exc_info=True) raise HTTPException( status_code=500, detail="Failed to register device. Please try again." ) from e - return response - @app.get("/ca_certificate") async def get_ca_certificate() -> str: @@ -73,9 +96,8 @@ async def get_ca_certificate() -> str: return ca_cert_pem except Exception as e: logger.error(f"Failed to retrieve CA certificate: {str(e)}", exc_info=True) - raise HTTPException( - status_code=500, detail="Failed to retrieve CA certificate." - ) from e + raise HTTPException(status_code=500, detail="Failed to retrieve CA certificate.") from e + @app.get("/devices/{device_id}") async def get_device(device_id: str) -> DeviceResponse: @@ -88,18 +110,19 @@ async def get_device(device_id: str) -> DeviceResponse: if not device: raise HTTPException(status_code=404, detail="Device not found") - return Device( + return DeviceResponse( id=device.id, name=device.name, location=device.location, + protocol=device.protocol, + connection_config=device.connection_config, created_at=device.created_at, ) except Exception as e: logger.error(f"Failed to retrieve device {device_id}: {str(e)}", exc_info=True) - raise HTTPException( - status_code=500, detail="Failed to retrieve device information." - ) from e + raise HTTPException(status_code=500, detail="Failed to retrieve device information.") from e + @app.get("/devices/") async def list_devices() -> list[DeviceResponse]: @@ -114,6 +137,8 @@ async def list_devices() -> list[DeviceResponse]: id=device.id, name=device.name, location=device.location, + protocol=device.protocol, + connection_config=device.connection_config, created_at=device.created_at, ) for device in devices @@ -121,9 +146,8 @@ async def list_devices() -> list[DeviceResponse]: except Exception as e: logger.error(f"Failed to list devices: {str(e)}", exc_info=True) - raise HTTPException( - status_code=500, detail="Failed to list devices." - ) from e + raise HTTPException(status_code=500, detail="Failed to list devices.") from e + @app.post("/devices/{device_id}/revoke") async def revoke_device_certificate(device_id: str): @@ -135,9 +159,7 @@ async def revoke_device_certificate(device_id: str): try: with get_db_context() as db: device_cert = ( - db.query(DeviceCertificate) - .filter(DeviceCertificate.device_id == device_id) - .first() + db.query(DeviceCertificate).filter(DeviceCertificate.device_id == device_id).first() ) if not device_cert: raise HTTPException(status_code=404, detail="Device certificate not found") @@ -155,16 +177,14 @@ async def revoke_device_certificate(device_id: str): return { "device_id": device_id, "revoked_at": device_cert.revoked_at.isoformat(), - "message": "Certificate revoked successfully" + "message": "Certificate revoked successfully", } except HTTPException: raise except Exception as e: logger.error(f"Failed to revoke device {device_id}: {str(e)}", exc_info=True) - raise HTTPException( - status_code=500, detail="Failed to revoke device certificate." - ) from e + raise HTTPException(status_code=500, detail="Failed to revoke device certificate.") from e @app.get("/crl") @@ -180,15 +200,14 @@ async def get_crl(): return {"crl_pem": crl_pem} except Exception as e: logger.error(f"Failed to retrieve CRL: {str(e)}", exc_info=True) - raise HTTPException( - status_code=500, detail="Failed to retrieve CRL." - ) from e + raise HTTPException(status_code=500, detail="Failed to retrieve CRL.") from e + @app.post("/devices/{device_id}/renew") async def renew_certificate(device_id: str): """ Renew a device's certificate by issuing a new one and revoking the old one. - + This endpoint: 1. Retrieves the current certificate from DB 2. Generates a new certificate with new keys @@ -209,8 +228,7 @@ async def renew_certificate(device_id: str): ) if not device_cert: raise HTTPException( - status_code=404, - detail="No active certificate found for device" + status_code=404, detail="No active certificate found for device" ) # Check if certificate is about to expire (optional warning) @@ -227,15 +245,14 @@ async def renew_certificate(device_id: str): # Generate new certificate with new keys new_cert_pem, new_key_pem = cert_manager.renew_certificate( - current_cert_pem=device_cert.certificate_pem, - validity_days=365, - key_size=4096 + current_cert_pem=device_cert.certificate_pem, validity_days=365, key_size=4096 ) # Extract certificate ID (serial number) from the new certificate from cryptography import x509 + new_cert = x509.load_pem_x509_certificate(new_cert_pem) - new_cert_id = format(new_cert.serial_number, 'x') + new_cert_id = format(new_cert.serial_number, "x") # Create new certificate record in DB now = datetime.datetime.now(datetime.UTC) @@ -252,9 +269,12 @@ async def renew_certificate(device_id: str): logger.info(f"Successfully renewed certificate for device {device_id}") + device = db.query(Device).filter(Device.id == device_id).first() + return DeviceRegistrationResponse( - certificate_id=new_cert_id, device_id=device_id, + protocol=device.protocol if device else "mqtt", + certificate_id=new_cert_id, ca_certificate_pem=cert_manager.get_ca_certificate_pem(), certificate_pem=new_device_cert.certificate_pem, private_key_pem=new_device_cert.private_key_pem, @@ -265,6 +285,4 @@ async def renew_certificate(device_id: str): raise except Exception as e: logger.error(f"Failed to renew certificate for device {device_id}: {str(e)}", exc_info=True) - raise HTTPException( - status_code=500, detail="Failed to renew device certificate." - ) from e + raise HTTPException(status_code=500, detail="Failed to renew device certificate.") from e diff --git a/services/device_manager/app/cert_manager.py b/services/device_manager/app/cert_manager.py index 9ef37d3..fc6d4bd 100644 --- a/services/device_manager/app/cert_manager.py +++ b/services/device_manager/app/cert_manager.py @@ -8,7 +8,7 @@ from cryptography.x509.oid import NameOID from nanoid import generate from app.config import config -from app.models import DeviceRegistrationResponse +from app.models import DeviceCertificateResponse, DeviceCredentials lowercase_numbers = "abcdefghijklmnopqrstuvwxyz0123456789" @@ -21,10 +21,12 @@ class CertificateManager: self.ca_key: rsa.RSAPrivateKey = self.load_ca_private_key(config.CA_KEY_PATH) self.ca_cert_pem: bytes = self.ca_cert.public_bytes(serialization.Encoding.PEM) + def generate_device_id(self) -> str: """Generate a unique device ID using nanoid.""" return generate(alphabet=lowercase_numbers, size=config.DEVICE_ID_LENGTH) + def load_ca_certificate(self, ca_cert_path: str) -> x509.Certificate: """Load a CA certificate from file.""" with open(ca_cert_path, "rb") as f: @@ -32,6 +34,7 @@ class CertificateManager: ca_cert = x509.load_pem_x509_certificate(ca_data) return ca_cert + def load_ca_private_key(self, ca_key_path: str, password: bytes = None) -> rsa.RSAPrivateKey: """Load a CA private key from file.""" from cryptography.hazmat.primitives import serialization @@ -41,10 +44,12 @@ class CertificateManager: ca_key = serialization.load_pem_private_key(key_data, password=password) return ca_key + def generate_device_key(self, key_size: int = 4096) -> rsa.RSAPrivateKey: """Generate an RSA private key for a device.""" return rsa.generate_private_key(public_exponent=65537, key_size=key_size) + def generate_device_certificate( self, device_id: str, @@ -88,12 +93,13 @@ class CertificateManager: return cert_pem, key_pem + def create_device_credentials( self, device_id: str, validity_days: int = 365, key_size: int = 4096 - ) -> dict: + ) -> DeviceCredentials: """Create device credentials: private key and signed certificate. Returns: - dict with certificate_id, device_id, certificate_pem, private_key_pem, ca_certificate_pem, expires_at + DeviceCredentials model with certificate_id, device_id, certificate_pem, private_key_pem, expires_at """ device_key = self.generate_device_key(key_size=key_size) @@ -108,40 +114,42 @@ class CertificateManager: # Extract serial number from certificate to use as ID cert = x509.load_pem_x509_certificate(cert_pem) - cert_id = format(cert.serial_number, 'x') # Hex string of serial number + cert_id = format(cert.serial_number, 'x') expires_at = datetime.datetime.now(datetime.UTC) + datetime.timedelta(days=validity_days) - return { - "certificate_id": cert_id, - "device_id": device_id, - "certificate_pem": cert_pem, - "private_key_pem": key_pem, - "ca_certificate_pem": self.ca_cert_pem, - "expires_at": expires_at, - } + return DeviceCredentials( + certificate_id=cert_id, + device_id=device_id, + certificate_pem=cert_pem, + private_key_pem=key_pem, + expires_at=expires_at, + ) - def register_device(self, name: str, location: str | None = None) -> DeviceRegistrationResponse: + + def register_device(self, name: str, location: str | None = None) -> DeviceCertificateResponse: """Register a new device and generate its credentials. Returns: - DeviceRegistrationResponse + DeviceCertificateResponse """ device_id = self.generate_device_id() credentials = self.create_device_credentials(device_id=device_id) - return DeviceRegistrationResponse( - certificate_id=credentials["certificate_id"], - device_id=credentials["device_id"], - ca_certificate_pem=credentials["ca_certificate_pem"].decode("utf-8"), - certificate_pem=credentials["certificate_pem"].decode("utf-8"), - private_key_pem=credentials["private_key_pem"].decode("utf-8"), - expires_at=credentials["expires_at"], + return DeviceCertificateResponse( + certificate_id=credentials.certificate_id, + device_id=credentials.device_id, + ca_certificate_pem=self.ca_cert_pem.decode("utf-8"), + certificate_pem=credentials.certificate_pem.decode("utf-8"), + private_key_pem=credentials.private_key_pem.decode("utf-8"), + expires_at=credentials.expires_at, ) + def get_ca_certificate_pem(self) -> str: """Get the CA certificate in PEM format as a string.""" return self.ca_cert_pem.decode("utf-8") + def revoke_certificate( self, certificate_pem: str, reason: x509.ReasonFlags = x509.ReasonFlags.unspecified ) -> None: @@ -197,6 +205,7 @@ class CertificateManager: with open(crl_path, "wb") as f: f.write(crl.public_bytes(serialization.Encoding.PEM)) + def get_crl_pem(self) -> str | None: """Get the current CRL in PEM format.""" crl_path = Path(config.CRL_PATH) @@ -206,6 +215,7 @@ class CertificateManager: with open(crl_path, "rb") as f: return f.read().decode("utf-8") + def renew_certificate( self, current_cert_pem: str, diff --git a/services/device_manager/app/db_models.py b/services/device_manager/app/db_models.py index e20e2f4..12024cb 100644 --- a/services/device_manager/app/db_models.py +++ b/services/device_manager/app/db_models.py @@ -4,7 +4,7 @@ SQLAlchemy ORM models for device manager service. These models mirror the database schema defined in db_migrations. Kept separate to make the service independent. """ -from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Text +from sqlalchemy import JSON, Boolean, Column, DateTime, ForeignKey, Index, Text from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.sql import func @@ -19,11 +19,13 @@ class Device(Base): id = Column(Text, primary_key=True) name = Column(Text, nullable=False) location = Column(Text) + protocol = Column(Text, nullable=False, default="mqtt") + connection_config = Column(JSON) is_active = Column(Boolean, default=True) created_at = Column(DateTime(timezone=True), server_default=func.now()) def __repr__(self): - return f"" + return f"" class DeviceCertificate(Base): @@ -33,7 +35,7 @@ class DeviceCertificate(Base): id = Column(Text, primary_key=True) device_id = Column( - Text, ForeignKey("devices.id", ondelete="CASCADE"), primary_key=True + Text, ForeignKey("devices.id", ondelete="CASCADE"), nullable=False ) certificate_pem = Column(Text, nullable=False) private_key_pem = Column(Text) @@ -41,5 +43,34 @@ class DeviceCertificate(Base): expires_at = Column(DateTime(timezone=True), nullable=False) revoked_at = Column(DateTime(timezone=True)) + __table_args__ = ( + Index("idx_device_certificates_device_id", "device_id"), + Index("idx_device_certificates_active", "device_id", "revoked_at"), + ) + def __repr__(self): - return f"" + return f"" + + +class DeviceCredential(Base): + """Authentication credentials for non-mTLS protocols (HTTP, webhook, etc).""" + + __tablename__ = "device_credentials" + + id = Column(Text, primary_key=True) + device_id = Column( + Text, ForeignKey("devices.id", ondelete="CASCADE"), nullable=False + ) + credential_type = Column(Text, nullable=False) + credential_hash = Column(Text, nullable=False) + created_at = Column(DateTime(timezone=True), nullable=False) + expires_at = Column(DateTime(timezone=True)) + revoked_at = Column(DateTime(timezone=True)) + + __table_args__ = ( + Index("idx_device_credentials_device_id", "device_id"), + Index("idx_device_credentials_active", "device_id", "revoked_at"), + ) + + def __repr__(self): + return f"" diff --git a/services/device_manager/app/models.py b/services/device_manager/app/models.py index 01d6f83..63b5678 100644 --- a/services/device_manager/app/models.py +++ b/services/device_manager/app/models.py @@ -1,17 +1,38 @@ import datetime +from typing import Any from pydantic import BaseModel class DeviceRegistrationRequest(BaseModel): - """Request model for registering a new device.""" - name: str location: str | None = None + protocol: str = "mqtt" + connection_config: dict[str, Any] | None = None + class DeviceRegistrationResponse(BaseModel): - """Response model after registering a new device.""" + device_id: str + protocol: str + certificate_id: str | None = None + ca_certificate_pem: str | None = None + certificate_pem: str | None = None + private_key_pem: str | None = None + expires_at: datetime.datetime | None = None + credential_id: str | None = None + api_key: str | None = None + webhook_secret: str | None = None + +class DeviceResponse(BaseModel): + id: str + name: str + location: str | None = None + protocol: str + connection_config: dict[str, Any] | None = None + created_at: datetime.datetime + +class DeviceCertificateResponse(BaseModel): certificate_id: str device_id: str ca_certificate_pem: str @@ -19,10 +40,10 @@ class DeviceRegistrationResponse(BaseModel): private_key_pem: str expires_at: datetime.datetime -class DeviceResponse(BaseModel): - """Response model for device information.""" - id: str - name: str - location: str | None = None - created_at: datetime.datetime +class DeviceCredentials(BaseModel): + certificate_id: str + device_id: str + certificate_pem: bytes + private_key_pem: bytes + expires_at: datetime.datetime