Added multi-protocol support for devices, improved models and updated readme.md and instructions

This commit is contained in:
2025-11-02 14:09:29 +01:00
parent ddbc588c77
commit 96e2377073
13 changed files with 730 additions and 375 deletions

View File

@@ -1,66 +1,114 @@
The following concise instructions help AI coding agents become productive in this repository. The following concise instructions help AI coding agents become productive in this repository.
Purpose Purpose
- This repo is a small Django-based IoT dashboard that ingests sensor data via MQTT, stores metadata in Django models, temporarily queues messages in Redis (streams/hashes), and persistently stores timeseries in Postgres/Timescale via background tasks (Huey). - This repo is a microservices-based IoT platform for device management, data ingestion, and telemetry storage. The system uses MQTT with mTLS authentication, Redis streams for message queuing, and PostgreSQL/TimescaleDB for persistent storage.
Big Picture Big Picture
- Architecture: Device → MQTT (mTLS) → mqtt_ingestion → Redis → db_write → PostgreSQL/TimescaleDB
- Components: - Components:
- `iotDashboard/` — Django app (models, views, templates, tasks) - `services/device_manager/` — FastAPI service for device registration, X.509 certificate issuance, and lifecycle management
- `manage.py` — Django CLI - `services/mqtt_ingestion/` — MQTT client that subscribes to device topics and writes to single Redis stream `mqtt:ingestion`
- `mqtt_service.py` — standalone MQTT client that subscribes to device topics and writes to Redis - `services/db_write/` — Consumer service that reads from Redis streams and writes to database using consumer groups
- `tasks.py` — Huey periodic tasks that read Redis and write to Postgres - `db_migrations/` — Alembic migrations for schema management (SQLAlchemy models)
- Redis — used for device metadata (`mqtt_devices`), per-sensor streams and latest-value hashes - `infrastructure/` — Docker Compose setup (PostgreSQL, Redis, Mosquitto MQTT broker)
- Postgres/Timescale — final storage for `sensor_readings` table (raw SQL used in places) - `iotDashboard/` — Legacy Django app (being phased out)
Key Files To Read First Key Files To Read First
- `iotDashboard/settings.py` — central settings; environment variables expected: `SECRET_KEY`, `CONNECTION_STRING`, `MQTT_BROKER`, `MQTT_USER`, `MQTT_PASS`, `REDIS_HOST`. - `db_migrations/models.py` — SQLAlchemy models: `Device`, `DeviceCertificate`, `Telemetry`. Canonical schema definition.
- `iotDashboard/models.py``Device`, `Sensor`, `SensorType`; these shape how devices and sensors are represented. - `services/device_manager/app/app.py` — FastAPI endpoints for device registration, certificate management, revocation, renewal.
- `mqtt_service.py` — where MQTT messages are received and written to Redis. Important for stream naming and payload format. - `services/device_manager/app/cert_manager.py` — X.509 certificate generation, CA management, CRL generation.
- `iotDashboard/tasks.py` — Huey tasks that consume Redis and insert into the DB. Shows ingestion logic and timescale interactions. - `services/mqtt_ingestion/src/mqtt_client.py` — MQTT subscriber that parses `devices/{device_id}/{metric}` topics.
- `iotDashboard/views.py` and `templates/chart.html` — how the UI reads `mqtt_latest`/Timescale data and what format it expects. - `services/mqtt_ingestion/src/redis_writer.py` — Writes to single stream `mqtt:ingestion` with device_id, metric, value, timestamp.
- `services/db_write/src/redis_reader.py` — Consumer group reader for `mqtt:ingestion` stream.
- `services/db_write/src/db_writer.py` — Batch writes to `telemetry` table using SQLAlchemy.
- `infrastructure/compose.yml` — Docker services: PostgreSQL/TimescaleDB, Redis, Mosquitto MQTT.
- `infrastructure/mosquitto/mosquitto.conf` — MQTT broker config with mTLS on port 8883, CRL checking enabled.
Important Conventions & Patterns Important Conventions & Patterns
- Redis usage: repo stores device metadata under `mqtt_devices` (JSON), and the code uses Redis streams and hashes inconsistently. When changing stream behavior, update both `mqtt_service.py` and `tasks.py` to remain compatible. - **Single stream architecture**: All MQTT data flows through one Redis stream `mqtt:ingestion`. Each message contains `device_id`, `metric`, `value`, `timestamp`.
- Topic/Stream canonicalization: adopt a single convention: MQTT topic `devices/{device_id}/{sensor}` and Redis stream `mqtt_stream:{device_id}:{sensor}`. Latest-value hash pattern: `mqtt_latest:{device_id}`. - **MQTT topics**: Standard format `devices/{device_id}/{metric}`. Examples: `devices/abc123/temperature`, `devices/xyz789/humidity`.
- No `requirements.txt` in repo; use `python-dotenv` + `redis`, `paho-mqtt`, `huey`, `psycopg2-binary`, `requests`, `Django` (4.2) — add a `requirements.txt` before running. - **Certificate IDs**: Use certificate serial number (hex format) as primary key in `device_certificates` table. Multiple certificates per device supported.
- Avoid import-time side-effects: `tasks.py` currently opens Redis and calls `devices_to_redis()` at import time — refactor to lazy init or a management command. - **Package manager**: All services use `uv` for dependency management (`pyproject.toml` not `requirements.txt`).
- **Database migrations**: Use Alembic for schema changes. Run migrations from `db_migrations/` directory.
- **Configuration**: All services use `.env` files. Never hardcode hosts/credentials.
- **Import organization**: Services have `app/` or `src/` package structure. Import as `from app.module import ...` or `from src.module import ...`.
- **Consumer groups**: `db_write` uses Redis consumer groups for at-least-once delivery. Consumer name must be unique per instance.
Developer Workflows (commands & notes) Developer Workflows (commands & notes)
- Run Django dev server (use virtualenv and install deps): - **Start infrastructure**: `cd infrastructure && docker compose up -d` (Postgres, Redis, Mosquitto)
- `pip install -r requirements.txt` (create this file if missing) - **Run database migrations**: `cd db_migrations && uv run alembic upgrade head`
- `python manage.py migrate` - **Generate CA certificate**: `cd services/device_manager && ./generate_ca.sh` (first time only)
- `python manage.py runserver` - **Run device_manager**: `cd services/device_manager && uv run uvicorn app.app:app --reload --port 8000`
- Run MQTT service locally (requires Redis & MQTT broker): - **Run mqtt_ingestion**: `cd services/mqtt_ingestion && uv run main.py`
- `python mqtt_service.py` - **Run db_write**: `cd services/db_write && uv run main.py`
- Example publish: `mosquitto_pub -t "devices/esp32/test_temperature" -m "23.5"` - **Register device**: `curl -X POST http://localhost:8000/devices/register -H "Content-Type: application/json" -d '{"name":"test","location":"lab"}'`
- Huey tasks: - **Test MQTT with mTLS**: `mosquitto_pub --cafile ca.crt --cert device.crt --key device.key -h localhost -p 8883 -t "devices/abc123/temperature" -m "23.5"`
- The project uses `huey.contrib.djhuey`; run workers with Django settings: `python manage.py run_huey` (ensure huey is installed and configured in `HUEY` setting). - **Inspect Redis stream**: `redis-cli XLEN mqtt:ingestion` and `redis-cli XRANGE mqtt:ingestion - + COUNT 10`
- Inspect Redis during debugging: - **Check consumer group**: `redis-cli XINFO GROUPS mqtt:ingestion`
- `redis-cli KEYS "mqtt*"` - **View CRL**: `openssl crl -in infrastructure/mosquitto/certs/ca.crl -text -noout`
- `redis-cli XREVRANGE mqtt_stream:mydevice:temperature + - COUNT 10`
- `redis-cli HGETALL mqtt_latest:mydevice`
Integration Points & Gotchas Integration Points & Gotchas
- Environment variables: many hosts/credentials are taken from `.env` via `python-dotenv`. If missing, code sometimes falls back to defaults or will raise at runtime. Add `.env` or set env vars in the system. - **Environment variables**: All services load from `.env` files. No defaults - service will fail if required vars missing. Copy `.env.example` first.
- DB access: `tasks.py` sometimes uses `psycopg2.connect(settings.CONNECTION_STRING)` while views use Django connections. If you change DB config, update both patterns or consolidate to Django connections. - **Certificate paths**: `device_manager` writes CRL to `infrastructure/mosquitto/certs/ca.crl`. Mosquitto must restart after CRL updates.
- Topic parsing: `mqtt_service.py` expects at least 3 topic parts (it reads `topic_parts[2]`) — be defensive when editing. - **Database schema**: Schema changes require Alembic migration. Never modify tables manually. Use `alembic revision --autogenerate`.
- Stream payloads: `xadd` must receive simple string fields (no nested dicts). When changing stream layout, update the reader in `tasks.py` accordingly. - **MQTT topic parsing**: `mqtt_ingestion` expects exactly `devices/{device_id}/{metric}` (3 parts). Invalid topics are logged and dropped.
- Logging: repo uses `print` widely. Prefer converting prints to Python `logging` for maintainability. - **Redis stream format**: `mqtt:ingestion` messages must have `device_id`, `metric`, `value`, `timestamp` fields (all strings).
- **Consumer groups**: `db_write` creates consumer group `db_writer` automatically. Don't delete it manually.
- **Certificate serial numbers**: Used as primary key in `device_certificates.id`. Extract with `format(cert.serial_number, 'x')`.
- **TimescaleDB hypertables**: `telemetry` table is a hypertable. Don't add constraints that break time partitioning.
- **File permissions**: Mosquitto directories may be owned by UID 1883. Fix with `sudo chown -R $USER:$USER infrastructure/mosquitto/`.
What AI agents should do first What AI agents should do first
- Do not change stream/topic names unless you update both `mqtt_service.py` and `tasks.py`. - **Read architecture first**: Check `README.md` for current architecture. System is microservices-based, not Django monolith.
- Remove import-time Redis initializations and `exit()` calls; move to lazily-created client getters or management commands. - **Check database schema**: Always start with `db_migrations/models.py` to understand data model.
- Centralize config in `settings.py` and import `from django.conf import settings` in scripts instead of hardcoded IPs. - **Don't change stream names**: Single stream `mqtt:ingestion` is used by mqtt_ingestion and db_write. Changing breaks both services.
- When making API/DB changes, prefer to update `views.py` and `tasks.py` together and add short integration tests using `pytest` and a Redis test double (or local docker-compose). - **Use proper imports**: Services use package structure. Import from `app.*` or `src.*`, not relative imports.
- **Create migrations**: Schema changes require `alembic revision --autogenerate`. Never modify models without migration.
- **Test with real infrastructure**: Use `docker compose up` for integration testing. Unit tests are insufficient for this architecture.
- **Check .env files**: Each service has `.env.example`. Copy and configure before running.
Examples (copyable snippets) Examples (copyable snippets)
- XADD to create canonical stream entry: - **Write to single stream** (mqtt_ingestion):
- `redis_client.xadd(f"mqtt_stream:{device_id}:{sensor}", {"value": str(sensor_value), "time": datetime.utcnow().isoformat()})` ```python
- Create/read consumer group (ingest): redis_client.xadd("mqtt:ingestion", {
- `redis_client.xgroup_create(stream, "ingest", id="0", mkstream=True)` "device_id": device_id,
- `entries = redis_client.xreadgroup("ingest", consumer_name, {stream: ">"}, count=10, block=5000)` "metric": sensor_type,
"value": str(value),
"timestamp": datetime.utcnow().isoformat()
})
```
- **Read from stream with consumer group** (db_write):
```python
results = redis_client.xreadgroup(
groupname="db_writer",
consumername="worker-01",
streams={"mqtt:ingestion": ">"},
count=100,
block=5000
)
```
- **Extract certificate serial number**:
```python
from cryptography import x509
cert = x509.load_pem_x509_certificate(cert_pem)
cert_id = format(cert.serial_number, 'x')
```
- **Query active certificates**:
```python
device_cert = db.query(DeviceCertificate).filter(
DeviceCertificate.device_id == device_id,
DeviceCertificate.revoked_at.is_(None)
).first()
```
If you add or change docs If you add or change docs
- Update `README.md` with a simple `docker-compose.yml` recipe for Redis/Postgres/Mosquitto and document environment variables. Update `env.sample` with `REDIS_HOST`, `CONNECTION_STRING`, `MQTT_BROKER`, `MQTT_USER`, `MQTT_PASS`. - Update `README.md` for architecture changes
- Update `.github/copilot-instructions.md` for development workflow changes
- Update service-specific READMEs (`services/*/README.md`) for API or configuration changes
- Document environment variables in `.env.example` files
- Add migration notes to Alembic revision if schema change is complex
If anything in these instructions looks off or incomplete for your current refactor, tell me what you'd like to focus on and I'll iterate. If anything in these instructions looks off or incomplete for your current refactor, tell me what you'd like to focus on and I'll iterate.

View File

@@ -1,2 +1,17 @@
# iotDashboard # IoT Dashboard
iotDashboard - IOT dashboard with Django, TimescaleDB and Redis
Microservices-based IoT platform with device management, mTLS authentication, and time-series data storage.
**Architecture:** Device → MQTT (mTLS) → mqtt_ingestion → Redis → db_write → PostgreSQL/TimescaleDB
## Services
- **device_manager** - Device registration & X.509 certificates (FastAPI)
- **mqtt_ingestion** - MQTT → Redis pipeline
- **db_write** - Redis → PostgreSQL writer
- **infrastructure** - Docker Compose (PostgreSQL, Redis, Mosquitto)
## License
MIT License

View File

@@ -0,0 +1,55 @@
"""add protocol and connection_config to devices
Revision ID: 4e405f1129b1
Revises: 4f152b34e800
Create Date: 2025-11-01 19:07:22.800918+00:00
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '4e405f1129b1'
down_revision: Union[str, Sequence[str], None] = '4f152b34e800'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('device_credentials',
sa.Column('id', sa.Text(), nullable=False),
sa.Column('device_id', sa.Text(), nullable=False),
sa.Column('credential_type', sa.Text(), nullable=False),
sa.Column('credential_hash', sa.Text(), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('expires_at', sa.DateTime(timezone=True), nullable=True),
sa.Column('revoked_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['device_id'], ['devices.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_index('idx_device_credentials_active', 'device_credentials', ['device_id', 'revoked_at'], unique=False)
op.create_index('idx_device_credentials_device_id', 'device_credentials', ['device_id'], unique=False)
# Add protocol column as nullable first, set default for existing rows, then make NOT NULL
op.add_column('devices', sa.Column('protocol', sa.Text(), nullable=True))
op.execute("UPDATE devices SET protocol = 'mqtt' WHERE protocol IS NULL")
op.alter_column('devices', 'protocol', nullable=False)
op.add_column('devices', sa.Column('connection_config', sa.JSON(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('devices', 'connection_config')
op.drop_column('devices', 'protocol')
op.drop_index('idx_device_credentials_device_id', table_name='device_credentials')
op.drop_index('idx_device_credentials_active', table_name='device_credentials')
op.drop_table('device_credentials')
# ### end Alembic commands ###

View File

@@ -8,7 +8,7 @@ To modify schema:
4. Run: alembic upgrade head 4. Run: alembic upgrade head
""" """
from sqlalchemy import Boolean, Column, Float, ForeignKey, Index, Text, DateTime from sqlalchemy import Boolean, Column, Float, ForeignKey, Index, Text, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func from sqlalchemy.sql import func
@@ -23,13 +23,13 @@ class Device(Base):
id = Column(Text, primary_key=True) id = Column(Text, primary_key=True)
name = Column(Text, nullable=False) name = Column(Text, nullable=False)
location = Column(Text) location = Column(Text)
protocol = Column(Text, nullable=False, default="mqtt")
connection_config = Column(JSON)
is_active = Column(Boolean, default=True) is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now()) created_at = Column(DateTime(timezone=True), server_default=func.now())
def __repr__(self): def __repr__(self):
return f"<Device(id={self.id}, name={self.name})>" return f"<Device(id={self.id}, name={self.name}, protocol={self.protocol})>"
class DeviceCertificate(Base): class DeviceCertificate(Base):
"""X.509 certificates issued to devices for mTLS authentication.""" """X.509 certificates issued to devices for mTLS authentication."""
@@ -40,7 +40,7 @@ class DeviceCertificate(Base):
Text, ForeignKey("devices.id", ondelete="CASCADE"), nullable=False Text, ForeignKey("devices.id", ondelete="CASCADE"), nullable=False
) )
certificate_pem = Column(Text, nullable=False) certificate_pem = Column(Text, nullable=False)
private_key_pem = Column(Text) # Optional: for backup/escrow private_key_pem = Column(Text)
issued_at = Column(DateTime(timezone=True), nullable=False) issued_at = Column(DateTime(timezone=True), nullable=False)
expires_at = Column(DateTime(timezone=True), nullable=False) expires_at = Column(DateTime(timezone=True), nullable=False)
revoked_at = Column(DateTime(timezone=True)) revoked_at = Column(DateTime(timezone=True))
@@ -54,6 +54,30 @@ class DeviceCertificate(Base):
return f"<DeviceCertificate(id={self.id}, device_id={self.device_id}, expires={self.expires_at})>" return f"<DeviceCertificate(id={self.id}, device_id={self.device_id}, expires={self.expires_at})>"
class DeviceCredential(Base):
"""Authentication credentials for non-mTLS protocols (HTTP, webhook, etc)."""
__tablename__ = "device_credentials"
id = Column(Text, primary_key=True)
device_id = Column(
Text, ForeignKey("devices.id", ondelete="CASCADE"), nullable=False
)
credential_type = Column(Text, nullable=False)
credential_hash = Column(Text, nullable=False)
created_at = Column(DateTime(timezone=True), nullable=False)
expires_at = Column(DateTime(timezone=True))
revoked_at = Column(DateTime(timezone=True))
__table_args__ = (
Index("idx_device_credentials_device_id", "device_id"),
Index("idx_device_credentials_active", "device_id", "revoked_at"),
)
def __repr__(self):
return f"<DeviceCredential(id={self.id}, device_id={self.device_id}, type={self.credential_type})>"
class Telemetry(Base): class Telemetry(Base):
""" """
Time-series telemetry data from devices. Time-series telemetry data from devices.

View File

@@ -0,0 +1,157 @@
"""API client for the device_manager microservice."""
import os
import requests
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from datetime import datetime
@dataclass
class DeviceRegistrationResponse:
device_id: str
certificate_id: str
certificate: str
private_key: str
ca_certificate: str
@dataclass
class DeviceInfo:
id: str
name: str
location: Optional[str]
is_active: bool
created_at: datetime
certificates: List[Dict[str, Any]]
class DeviceManagerAPIError(Exception):
def __init__(self, status_code: int, message: str, details: Optional[Dict] = None):
self.status_code = status_code
self.message = message
self.details = details or {}
super().__init__(f"API Error {status_code}: {message}")
class DeviceManagerClient:
def __init__(self, base_url: Optional[str] = None):
self.base_url = base_url or os.getenv("DEVICE_MANAGER_URL", "http://localhost:8000")
self.session = requests.Session()
self.session.headers.update({"Content-Type": "application/json"})
def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
url = f"{self.base_url}{endpoint}"
try:
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
return response
except requests.exceptions.HTTPError as e:
try:
error_data = e.response.json()
message = error_data.get("detail", str(e))
except (ValueError, AttributeError):
message = str(e)
raise DeviceManagerAPIError(
status_code=e.response.status_code,
message=message,
details=error_data if "error_data" in locals() else {},
)
except requests.exceptions.RequestException as e:
raise DeviceManagerAPIError(
status_code=0, message=f"Connection error: {str(e)}"
)
def register_device(self, name: str, location: Optional[str] = None) -> DeviceRegistrationResponse:
payload = {"name": name}
if location:
payload["location"] = location
response = self._request("POST", "/devices/register", json=payload)
data = response.json()
return DeviceRegistrationResponse(
device_id=data["device_id"],
certificate_id=data["certificate_id"],
certificate=data["certificate"],
private_key=data["private_key"],
ca_certificate=data["ca_certificate"],
)
def get_device(self, device_id: str) -> DeviceInfo:
response = self._request("GET", f"/devices/{device_id}")
data = response.json()
return DeviceInfo(
id=data["id"],
name=data["name"],
location=data.get("location"),
is_active=data["is_active"],
created_at=datetime.fromisoformat(data["created_at"].replace("Z", "+00:00")),
certificates=data.get("certificates", []),
)
def list_devices(self) -> List[DeviceInfo]:
response = self._request("GET", "/devices/")
data = response.json()
return [
DeviceInfo(
id=device["id"],
name=device["name"],
location=device.get("location"),
is_active=device["is_active"],
created_at=datetime.fromisoformat(
device["created_at"].replace("Z", "+00:00")
),
certificates=device.get("certificates", []),
)
for device in data
]
def revoke_certificate(self, device_id: str) -> Dict[str, Any]:
response = self._request("POST", f"/devices/{device_id}/revoke")
return response.json()
def renew_certificate(self, device_id: str) -> Dict[str, Any]:
response = self._request("POST", f"/devices/{device_id}/renew")
return response.json()
def get_ca_certificate(self) -> str:
response = self._request("GET", "/ca_certificate")
return response.text
def get_crl(self) -> str:
response = self._request("GET", "/crl")
return response.text
def health_check(self) -> bool:
try:
response = self.session.get(f"{self.base_url}/docs", timeout=2)
return response.status_code == 200
except requests.exceptions.RequestException:
return False
default_client = DeviceManagerClient()
def register_device(name: str, location: Optional[str] = None) -> DeviceRegistrationResponse:
return default_client.register_device(name, location)
def get_device(device_id: str) -> DeviceInfo:
return default_client.get_device(device_id)
def list_devices() -> List[DeviceInfo]:
return default_client.list_devices()
def revoke_certificate(device_id: str) -> Dict[str, Any]:
return default_client.revoke_certificate(device_id)
def renew_certificate(device_id: str) -> Dict[str, Any]:
return default_client.renew_certificate(device_id)

View File

@@ -1,42 +1,175 @@
"""
Django models that mirror the SQLAlchemy schema from db_migrations/models.py.
These models are read-only (managed=False) and query the microservices database.
For write operations, use the device_manager API client instead.
"""
from django.db import models from django.db import models
class SensorType(models.Model): class Device(models.Model):
name = models.CharField( """IoT devices registered in the system."""
max_length=50, unique=True
) # Sensor name, e.g., "CO2", "Noise", etc. id = models.CharField(max_length=8, primary_key=True)
unit = models.CharField( name = models.CharField(max_length=255)
max_length=20 location = models.CharField(max_length=255, null=True, blank=True)
) # Unit of measurement, e.g., "ppm", "dB", "lux" protocol = models.CharField(max_length=50, default="mqtt")
protocol = models.CharField( connection_config = models.JSONField(null=True, blank=True)
max_length=20, choices=[("mqtt", "MQTT"), ("http", "HTTP")] is_active = models.BooleanField(default=True)
) # Protocol for communication created_at = models.DateTimeField(auto_now_add=True)
topic = models.CharField(
max_length=100, null=True, blank=True class Meta:
) # Topic for MQTT communication managed = False
endpoint = models.CharField( db_table = "devices"
max_length=100, null=True, blank=True
) # Endpoint for HTTP communication
def __str__(self): def __str__(self):
return f"{self.name} ({self.unit})" return f"{self.name} ({self.id}) [{self.protocol}]"
@property
def active_certificate(self):
"""Get the active (non-revoked) certificate for this device."""
return self.certificates.filter(revoked_at__isnull=True).first()
@property
def certificate_status(self):
"""Get human-readable certificate status for MQTT devices."""
if self.protocol != "mqtt":
return "N/A"
cert = self.active_certificate
if not cert:
return "No Certificate"
if cert.is_expired:
return "Expired"
if cert.is_expiring_soon:
return "Expiring Soon"
return "Valid"
class Device(models.Model): class DeviceCertificate(models.Model):
name = models.CharField(max_length=50) # Device name """X.509 certificates issued to devices for mTLS authentication."""
ip = models.CharField(max_length=20) # Device IP address
protocol = models.CharField( id = models.CharField(
max_length=20, choices=[("mqtt", "MQTT"), ("http", "HTTP")] max_length=255, primary_key=True
) # Certificate serial number (hex)
device = models.ForeignKey(
Device, on_delete=models.CASCADE, related_name="certificates", db_column="device_id"
)
certificate_pem = models.TextField()
private_key_pem = models.TextField(null=True, blank=True) # Optional backup
issued_at = models.DateTimeField()
expires_at = models.DateTimeField()
revoked_at = models.DateTimeField(null=True, blank=True)
class Meta:
managed = False # Don't create/modify this table
db_table = "device_certificates"
indexes = [
models.Index(fields=["device"]),
models.Index(fields=["device", "revoked_at"]),
]
def __str__(self):
status = "Revoked" if self.revoked_at else "Active"
return f"Certificate {self.id[:8]}... for {self.device.name} ({status})"
@property
def is_revoked(self):
"""Check if certificate is revoked."""
return self.revoked_at is not None
@property
def is_expired(self):
"""Check if certificate is expired."""
from django.utils import timezone
return timezone.now() > self.expires_at
@property
def is_expiring_soon(self):
"""Check if certificate expires within 30 days."""
from django.utils import timezone
from datetime import timedelta
return (
not self.is_expired
and self.expires_at < timezone.now() + timedelta(days=30)
) )
def __str__(self): @property
return self.name def is_valid(self):
"""Check if certificate is valid (not revoked and not expired)."""
return not self.is_revoked and not self.is_expired
@property
def days_until_expiry(self):
"""Calculate days until certificate expires."""
from django.utils import timezone
if self.is_expired:
return 0
delta = self.expires_at - timezone.now()
return delta.days
class Sensor(models.Model): class DeviceCredential(models.Model):
device = models.ForeignKey(Device, related_name="sensors", on_delete=models.CASCADE) """Authentication credentials for non-mTLS protocols (HTTP, webhook, etc)."""
type = models.ForeignKey(SensorType, on_delete=models.CASCADE)
enabled = models.BooleanField(default=True) id = models.CharField(max_length=255, primary_key=True)
device = models.ForeignKey(
Device, on_delete=models.CASCADE, related_name="credentials", db_column="device_id"
)
credential_type = models.CharField(max_length=50)
credential_hash = models.TextField()
created_at = models.DateTimeField()
expires_at = models.DateTimeField(null=True, blank=True)
revoked_at = models.DateTimeField(null=True, blank=True)
class Meta:
managed = False
db_table = "device_credentials"
indexes = [
models.Index(fields=["device"]),
models.Index(fields=["device", "revoked_at"]),
]
def __str__(self): def __str__(self):
return f"{self.type.name} Sensor on {self.device.name}" status = "Revoked" if self.revoked_at else "Active"
return f"{self.credential_type} for {self.device.name} ({status})"
@property
def is_revoked(self):
return self.revoked_at is not None
@property
def is_expired(self):
from django.utils import timezone
return self.expires_at and timezone.now() > self.expires_at
@property
def is_valid(self):
return not self.is_revoked and not self.is_expired
class Telemetry(models.Model):
"""Time-series telemetry data from devices."""
time = models.DateTimeField()
device = models.ForeignKey(
Device, on_delete=models.CASCADE, related_name="telemetry", db_column="device_id"
)
metric = models.CharField(max_length=255)
value = models.FloatField()
unit = models.CharField(max_length=50, null=True, blank=True)
class Meta:
managed = False
db_table = "telemetry"
unique_together = [["time", "device", "metric"]]
indexes = [
models.Index(fields=["device", "time"]),
]
def __str__(self):
return f"{self.device.name} - {self.metric}: {self.value} at {self.time}"

View File

@@ -13,7 +13,6 @@ https://docs.djangoproject.com/en/4.2/ref/settings/
from dotenv import load_dotenv from dotenv import load_dotenv
from pathlib import Path from pathlib import Path
import os import os
# from huey import SqliteHuey
# Build paths inside the project like this: BASE_DIR / 'subdir'. # Build paths inside the project like this: BASE_DIR / 'subdir'.
@@ -27,7 +26,12 @@ load_dotenv()
# SECURITY WARNING: keep the secret key used in production secret! # SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = os.getenv("SECRET_KEY") SECRET_KEY = os.getenv("SECRET_KEY")
CONNECTION_STRING = os.getenv("CONNECTION_STRING")
POSTGRES_HOST = os.getenv("POSTGRES_HOST")
POSTGRES_PORT = os.getenv("POSTGRES_PORT")
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_DB = os.getenv("POSTGRES_DB")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
# SECURITY WARNING: don't run with debug turned on in production! # SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True DEBUG = True
@@ -45,7 +49,6 @@ INSTALLED_APPS = [
"django.contrib.messages", "django.contrib.messages",
"django.contrib.staticfiles", "django.contrib.staticfiles",
"iotDashboard", "iotDashboard",
# 'huey.contrib.djhuey',
] ]
MIDDLEWARE = [ MIDDLEWARE = [
@@ -84,17 +87,14 @@ WSGI_APPLICATION = "iotDashboard.wsgi.application"
DATABASES = { DATABASES = {
"default": { "default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": BASE_DIR / "db.sqlite3",
},
"data": {
"ENGINE": "django.db.backends.postgresql", "ENGINE": "django.db.backends.postgresql",
"NAME": "example", "NAME": POSTGRES_DB,
"USER": "postgres", "USER": POSTGRES_USER,
"PASSWORD": os.getenv("PASSWORD"), "PASSWORD": POSTGRES_PASSWORD,
"HOST": "10.10.0.1", "HOST": POSTGRES_HOST,
"PORT": "5555", "PORT": POSTGRES_PORT,
}, },
} }
@@ -139,12 +139,3 @@ STATIC_URL = "static/"
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
# HUEY = {
# 'huey_class': 'huey.SqliteHuey', # Or 'huey.RedisHuey' for Redis
# 'filename': 'demo.db', # SQLite file for task storage
# 'results': True,
# 'store_none': False,
# 'immediate': False,
# 'utc': True,
# }

View File

@@ -1,180 +0,0 @@
import json
import datetime
import os
import requests
import psycopg2
import redis
from django.conf import settings
from huey import crontab
from huey.contrib.djhuey import periodic_task
from .models import Device
from dotenv import load_dotenv
load_dotenv()
REDIS_HOST = os.getenv("REDIS_HOST", "localhost") # Default to localhost if not set
try:
redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0)
print(redis_client)
redis_client.ping()
print("Connected!")
except Exception as ex:
print
"Error:", ex
exit("Failed to connect, terminating.")
def devices_to_redis():
"""Fetch devices and their sensors' topics from Django and store them in Redis."""
devices = Device.objects.all()
devices_list = []
for device in devices:
for sensor in device.sensors.all():
sensor_data = {
"device_name": device.name,
"sensor_name": sensor.type.name,
"topic": sensor.type.topic, # Assuming the topic is stored in SensorType
}
devices_list.append(sensor_data)
redis_client.set("mqtt_devices", json.dumps(devices_list))
print("Devices with sensors stored in Redis.")
def fetch_data_http(device, sensor):
"""Fetch data from an HTTP sensor."""
sensor_type_name = sensor.type.name.lower()
try:
response = requests.get(
f"http://{device.ip}/sensor/{sensor_type_name}", timeout=5
)
response.raise_for_status()
sensor_value = response.json().get("value")
if sensor_value is not None:
return {
"time": datetime.datetime.utcnow().isoformat(),
"device": device.name,
"sensor": sensor_type_name,
"sensor_value": sensor_value,
}
else:
print(f"No value returned from {device.name} for {sensor_type_name}")
except requests.RequestException as e:
print(f"HTTP request failed for {device.name}: {e}")
return None
def fetch_data_mqtt_stream(device, sensor):
"""Fetch data from Redis Stream for a specific MQTT device and sensor."""
sensor_name = sensor.type.name.lower()
stream_key = f"mqtt_stream:{device.name}:{sensor_name}"
try:
stream_data = redis_client.xread({stream_key: "0-0"}, block=1000, count=1)
if stream_data:
_, entries = stream_data[0]
for entry_id, entry_data in entries:
sensor_value = entry_data.get(b"value")
timestamp = entry_data.get(b"time")
if sensor_value and timestamp:
return {
"time": timestamp.decode("utf-8"),
"device": device.name,
"sensor_value": float(sensor_value.decode("utf-8")),
}
except Exception as e:
print(f"Error fetching data from stream {stream_key}: {e}")
return None
def is_recent_data(timestamp):
"""Check if data is within a 1-minute freshness window."""
data_time = datetime.datetime.fromisoformat(timestamp)
return data_time > datetime.datetime.utcnow() - datetime.timedelta(minutes=1)
def insert_data(data, sensor_type):
"""Insert parsed data into the PostgreSQL database."""
if "sensor_value" not in data:
print(f"Missing 'sensor_value' in data: {data}. Skipping insertion.")
return
insert_data_dict = {
"time": data["time"],
"device": data["device"],
"metric": sensor_type.lower(),
"value": data["sensor_value"],
}
try:
with psycopg2.connect(settings.CONNECTION_STRING) as conn:
with conn.cursor() as cursor:
insert_query = """
INSERT INTO sensor_readings (time, device_name, metric, value)
VALUES (%s, %s, %s, %s);
"""
cursor.execute(
insert_query,
(
insert_data_dict["time"],
insert_data_dict["device"],
insert_data_dict["metric"],
insert_data_dict["value"],
),
)
conn.commit()
print(
f"Data inserted successfully for {insert_data_dict['device']}: {insert_data_dict}"
)
except Exception as e:
print(f"Failed to insert data: {e}")
@periodic_task(crontab(minute="*/1"))
def fetch_data_from_all_devices():
"""Fetch and insert data for all devices based on their protocol."""
devices = Device.objects.all()
for device in devices:
for sensor in device.sensors.all():
data = None
if device.protocol == "http":
data = fetch_data_http(device, sensor)
elif device.protocol == "mqtt":
data = fetch_data_mqtt_stream(device, sensor)
if data and is_recent_data(data["time"]):
insert_data(data, sensor.type.name)
else:
print(f"No recent or valid data for {device.name}. Skipping.")
@periodic_task(crontab(minute="*/5"))
def last_5_minutes():
"""Fetch the last 5 readings from TimescaleDB and store them in Redis."""
try:
with psycopg2.connect(settings.CONNECTION_STRING) as conn:
with conn.cursor() as cursor:
cursor.execute("""
SELECT time, device_name, metric, value
FROM sensor_readings
ORDER BY time DESC
LIMIT 5;
""")
results = cursor.fetchall()
data = [
{
"time": reading[0].isoformat(),
"device": reading[1],
"metric": reading[2],
"value": reading[3],
}
for reading in results
]
redis_client.set("last5", json.dumps(data))
print("Last 5 readings:", data)
except Exception as e:
print(f"Error fetching or storing the last 5 readings: {e}")
devices_to_redis()

View File

@@ -0,0 +1,32 @@
FROM ghcr.io/astral-sh/uv:python3.13-alpine AS builder
WORKDIR /app
ENV UV_COMPILE_BYTECODE=1
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev --no-install-project
COPY ./src/ ./src/
COPY main.py ./
RUN uv sync --frozen --no-dev
FROM python:3.13-alpine
WORKDIR /app
COPY --from=builder /app/.venv /app/.venv
COPY --from=builder /app/*.py /app/
RUN adduser -D -u 1000 appuser && \
chown -R appuser:appuser /app
USER appuser
ENV PATH="/app/.venv/bin:$PATH"
CMD ["python", "main.py"]

View File

@@ -5,8 +5,12 @@ from fastapi import FastAPI, HTTPException
from app.cert_manager import CertificateManager from app.cert_manager import CertificateManager
from app.database import get_db_context from app.database import get_db_context
from app.db_models import Device, DeviceCertificate # SQLAlchemy ORM models from app.db_models import Device, DeviceCertificate
from app.models import DeviceRegistrationRequest, DeviceRegistrationResponse, DeviceResponse from app.models import (
DeviceRegistrationRequest,
DeviceRegistrationResponse,
DeviceResponse,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -25,43 +29,62 @@ async def register_device(
request: DeviceRegistrationRequest, request: DeviceRegistrationRequest,
) -> DeviceRegistrationResponse: ) -> DeviceRegistrationResponse:
""" """
Register a new device and issue an X.509 certificate. Register a new device.
- MQTT devices: issues X.509 certificate for mTLS
- HTTP/webhook devices: generates API key or HMAC secret
""" """
try: try:
response = cert_manager.register_device( if request.protocol == "mqtt":
cert_response = cert_manager.register_device(
name=request.name, name=request.name,
location=request.location, location=request.location,
) )
with get_db_context() as db: with get_db_context() as db:
device = Device( device = Device(
id=response.device_id, id=cert_response.device_id,
name=request.name, name=request.name,
location=request.location, location=request.location,
protocol=request.protocol,
connection_config=request.connection_config,
created_at=datetime.datetime.now(datetime.UTC), created_at=datetime.datetime.now(datetime.UTC),
) )
db.add(device) db.add(device)
device_cert = DeviceCertificate( device_cert = DeviceCertificate(
id =response.certificate_id, id=cert_response.certificate_id,
device_id=response.device_id, device_id=cert_response.device_id,
certificate_pem=response.certificate_pem, certificate_pem=cert_response.certificate_pem,
private_key_pem=response.private_key_pem, private_key_pem=cert_response.private_key_pem,
issued_at=datetime.datetime.now(datetime.UTC), issued_at=datetime.datetime.now(datetime.UTC),
expires_at=response.expires_at, expires_at=cert_response.expires_at,
) )
db.add(device_cert) db.add(device_cert)
except Exception as e: return DeviceRegistrationResponse(
logger.error( device_id=cert_response.device_id,
f"Failed to register device {request.name}: {str(e)}", exc_info=True protocol=request.protocol,
certificate_id=cert_response.certificate_id,
ca_certificate_pem=cert_response.ca_certificate_pem,
certificate_pem=cert_response.certificate_pem,
private_key_pem=cert_response.private_key_pem,
expires_at=cert_response.expires_at,
) )
else:
raise HTTPException(
status_code=400,
detail=f"Protocol '{request.protocol}' not yet implemented. Only 'mqtt' is supported.",
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to register device {request.name}: {str(e)}", exc_info=True)
raise HTTPException( raise HTTPException(
status_code=500, detail="Failed to register device. Please try again." status_code=500, detail="Failed to register device. Please try again."
) from e ) from e
return response
@app.get("/ca_certificate") @app.get("/ca_certificate")
async def get_ca_certificate() -> str: async def get_ca_certificate() -> str:
@@ -73,9 +96,8 @@ async def get_ca_certificate() -> str:
return ca_cert_pem return ca_cert_pem
except Exception as e: except Exception as e:
logger.error(f"Failed to retrieve CA certificate: {str(e)}", exc_info=True) logger.error(f"Failed to retrieve CA certificate: {str(e)}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=500, detail="Failed to retrieve CA certificate.") from e
status_code=500, detail="Failed to retrieve CA certificate."
) from e
@app.get("/devices/{device_id}") @app.get("/devices/{device_id}")
async def get_device(device_id: str) -> DeviceResponse: async def get_device(device_id: str) -> DeviceResponse:
@@ -88,18 +110,19 @@ async def get_device(device_id: str) -> DeviceResponse:
if not device: if not device:
raise HTTPException(status_code=404, detail="Device not found") raise HTTPException(status_code=404, detail="Device not found")
return Device( return DeviceResponse(
id=device.id, id=device.id,
name=device.name, name=device.name,
location=device.location, location=device.location,
protocol=device.protocol,
connection_config=device.connection_config,
created_at=device.created_at, created_at=device.created_at,
) )
except Exception as e: except Exception as e:
logger.error(f"Failed to retrieve device {device_id}: {str(e)}", exc_info=True) logger.error(f"Failed to retrieve device {device_id}: {str(e)}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=500, detail="Failed to retrieve device information.") from e
status_code=500, detail="Failed to retrieve device information."
) from e
@app.get("/devices/") @app.get("/devices/")
async def list_devices() -> list[DeviceResponse]: async def list_devices() -> list[DeviceResponse]:
@@ -114,6 +137,8 @@ async def list_devices() -> list[DeviceResponse]:
id=device.id, id=device.id,
name=device.name, name=device.name,
location=device.location, location=device.location,
protocol=device.protocol,
connection_config=device.connection_config,
created_at=device.created_at, created_at=device.created_at,
) )
for device in devices for device in devices
@@ -121,9 +146,8 @@ async def list_devices() -> list[DeviceResponse]:
except Exception as e: except Exception as e:
logger.error(f"Failed to list devices: {str(e)}", exc_info=True) logger.error(f"Failed to list devices: {str(e)}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=500, detail="Failed to list devices.") from e
status_code=500, detail="Failed to list devices."
) from e
@app.post("/devices/{device_id}/revoke") @app.post("/devices/{device_id}/revoke")
async def revoke_device_certificate(device_id: str): async def revoke_device_certificate(device_id: str):
@@ -135,9 +159,7 @@ async def revoke_device_certificate(device_id: str):
try: try:
with get_db_context() as db: with get_db_context() as db:
device_cert = ( device_cert = (
db.query(DeviceCertificate) db.query(DeviceCertificate).filter(DeviceCertificate.device_id == device_id).first()
.filter(DeviceCertificate.device_id == device_id)
.first()
) )
if not device_cert: if not device_cert:
raise HTTPException(status_code=404, detail="Device certificate not found") raise HTTPException(status_code=404, detail="Device certificate not found")
@@ -155,16 +177,14 @@ async def revoke_device_certificate(device_id: str):
return { return {
"device_id": device_id, "device_id": device_id,
"revoked_at": device_cert.revoked_at.isoformat(), "revoked_at": device_cert.revoked_at.isoformat(),
"message": "Certificate revoked successfully" "message": "Certificate revoked successfully",
} }
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
logger.error(f"Failed to revoke device {device_id}: {str(e)}", exc_info=True) logger.error(f"Failed to revoke device {device_id}: {str(e)}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=500, detail="Failed to revoke device certificate.") from e
status_code=500, detail="Failed to revoke device certificate."
) from e
@app.get("/crl") @app.get("/crl")
@@ -180,9 +200,8 @@ async def get_crl():
return {"crl_pem": crl_pem} return {"crl_pem": crl_pem}
except Exception as e: except Exception as e:
logger.error(f"Failed to retrieve CRL: {str(e)}", exc_info=True) logger.error(f"Failed to retrieve CRL: {str(e)}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=500, detail="Failed to retrieve CRL.") from e
status_code=500, detail="Failed to retrieve CRL."
) from e
@app.post("/devices/{device_id}/renew") @app.post("/devices/{device_id}/renew")
async def renew_certificate(device_id: str): async def renew_certificate(device_id: str):
@@ -209,8 +228,7 @@ async def renew_certificate(device_id: str):
) )
if not device_cert: if not device_cert:
raise HTTPException( raise HTTPException(
status_code=404, status_code=404, detail="No active certificate found for device"
detail="No active certificate found for device"
) )
# Check if certificate is about to expire (optional warning) # Check if certificate is about to expire (optional warning)
@@ -227,15 +245,14 @@ async def renew_certificate(device_id: str):
# Generate new certificate with new keys # Generate new certificate with new keys
new_cert_pem, new_key_pem = cert_manager.renew_certificate( new_cert_pem, new_key_pem = cert_manager.renew_certificate(
current_cert_pem=device_cert.certificate_pem, current_cert_pem=device_cert.certificate_pem, validity_days=365, key_size=4096
validity_days=365,
key_size=4096
) )
# Extract certificate ID (serial number) from the new certificate # Extract certificate ID (serial number) from the new certificate
from cryptography import x509 from cryptography import x509
new_cert = x509.load_pem_x509_certificate(new_cert_pem) new_cert = x509.load_pem_x509_certificate(new_cert_pem)
new_cert_id = format(new_cert.serial_number, 'x') new_cert_id = format(new_cert.serial_number, "x")
# Create new certificate record in DB # Create new certificate record in DB
now = datetime.datetime.now(datetime.UTC) now = datetime.datetime.now(datetime.UTC)
@@ -252,9 +269,12 @@ async def renew_certificate(device_id: str):
logger.info(f"Successfully renewed certificate for device {device_id}") logger.info(f"Successfully renewed certificate for device {device_id}")
device = db.query(Device).filter(Device.id == device_id).first()
return DeviceRegistrationResponse( return DeviceRegistrationResponse(
certificate_id=new_cert_id,
device_id=device_id, device_id=device_id,
protocol=device.protocol if device else "mqtt",
certificate_id=new_cert_id,
ca_certificate_pem=cert_manager.get_ca_certificate_pem(), ca_certificate_pem=cert_manager.get_ca_certificate_pem(),
certificate_pem=new_device_cert.certificate_pem, certificate_pem=new_device_cert.certificate_pem,
private_key_pem=new_device_cert.private_key_pem, private_key_pem=new_device_cert.private_key_pem,
@@ -265,6 +285,4 @@ async def renew_certificate(device_id: str):
raise raise
except Exception as e: except Exception as e:
logger.error(f"Failed to renew certificate for device {device_id}: {str(e)}", exc_info=True) logger.error(f"Failed to renew certificate for device {device_id}: {str(e)}", exc_info=True)
raise HTTPException( raise HTTPException(status_code=500, detail="Failed to renew device certificate.") from e
status_code=500, detail="Failed to renew device certificate."
) from e

View File

@@ -8,7 +8,7 @@ from cryptography.x509.oid import NameOID
from nanoid import generate from nanoid import generate
from app.config import config from app.config import config
from app.models import DeviceRegistrationResponse from app.models import DeviceCertificateResponse, DeviceCredentials
lowercase_numbers = "abcdefghijklmnopqrstuvwxyz0123456789" lowercase_numbers = "abcdefghijklmnopqrstuvwxyz0123456789"
@@ -21,10 +21,12 @@ class CertificateManager:
self.ca_key: rsa.RSAPrivateKey = self.load_ca_private_key(config.CA_KEY_PATH) self.ca_key: rsa.RSAPrivateKey = self.load_ca_private_key(config.CA_KEY_PATH)
self.ca_cert_pem: bytes = self.ca_cert.public_bytes(serialization.Encoding.PEM) self.ca_cert_pem: bytes = self.ca_cert.public_bytes(serialization.Encoding.PEM)
def generate_device_id(self) -> str: def generate_device_id(self) -> str:
"""Generate a unique device ID using nanoid.""" """Generate a unique device ID using nanoid."""
return generate(alphabet=lowercase_numbers, size=config.DEVICE_ID_LENGTH) return generate(alphabet=lowercase_numbers, size=config.DEVICE_ID_LENGTH)
def load_ca_certificate(self, ca_cert_path: str) -> x509.Certificate: def load_ca_certificate(self, ca_cert_path: str) -> x509.Certificate:
"""Load a CA certificate from file.""" """Load a CA certificate from file."""
with open(ca_cert_path, "rb") as f: with open(ca_cert_path, "rb") as f:
@@ -32,6 +34,7 @@ class CertificateManager:
ca_cert = x509.load_pem_x509_certificate(ca_data) ca_cert = x509.load_pem_x509_certificate(ca_data)
return ca_cert return ca_cert
def load_ca_private_key(self, ca_key_path: str, password: bytes = None) -> rsa.RSAPrivateKey: def load_ca_private_key(self, ca_key_path: str, password: bytes = None) -> rsa.RSAPrivateKey:
"""Load a CA private key from file.""" """Load a CA private key from file."""
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
@@ -41,10 +44,12 @@ class CertificateManager:
ca_key = serialization.load_pem_private_key(key_data, password=password) ca_key = serialization.load_pem_private_key(key_data, password=password)
return ca_key return ca_key
def generate_device_key(self, key_size: int = 4096) -> rsa.RSAPrivateKey: def generate_device_key(self, key_size: int = 4096) -> rsa.RSAPrivateKey:
"""Generate an RSA private key for a device.""" """Generate an RSA private key for a device."""
return rsa.generate_private_key(public_exponent=65537, key_size=key_size) return rsa.generate_private_key(public_exponent=65537, key_size=key_size)
def generate_device_certificate( def generate_device_certificate(
self, self,
device_id: str, device_id: str,
@@ -88,12 +93,13 @@ class CertificateManager:
return cert_pem, key_pem return cert_pem, key_pem
def create_device_credentials( def create_device_credentials(
self, device_id: str, validity_days: int = 365, key_size: int = 4096 self, device_id: str, validity_days: int = 365, key_size: int = 4096
) -> dict: ) -> DeviceCredentials:
"""Create device credentials: private key and signed certificate. """Create device credentials: private key and signed certificate.
Returns: Returns:
dict with certificate_id, device_id, certificate_pem, private_key_pem, ca_certificate_pem, expires_at DeviceCredentials model with certificate_id, device_id, certificate_pem, private_key_pem, expires_at
""" """
device_key = self.generate_device_key(key_size=key_size) device_key = self.generate_device_key(key_size=key_size)
@@ -108,40 +114,42 @@ class CertificateManager:
# Extract serial number from certificate to use as ID # Extract serial number from certificate to use as ID
cert = x509.load_pem_x509_certificate(cert_pem) cert = x509.load_pem_x509_certificate(cert_pem)
cert_id = format(cert.serial_number, 'x') # Hex string of serial number cert_id = format(cert.serial_number, 'x')
expires_at = datetime.datetime.now(datetime.UTC) + datetime.timedelta(days=validity_days) expires_at = datetime.datetime.now(datetime.UTC) + datetime.timedelta(days=validity_days)
return { return DeviceCredentials(
"certificate_id": cert_id, certificate_id=cert_id,
"device_id": device_id, device_id=device_id,
"certificate_pem": cert_pem, certificate_pem=cert_pem,
"private_key_pem": key_pem, private_key_pem=key_pem,
"ca_certificate_pem": self.ca_cert_pem, expires_at=expires_at,
"expires_at": expires_at, )
}
def register_device(self, name: str, location: str | None = None) -> DeviceRegistrationResponse:
def register_device(self, name: str, location: str | None = None) -> DeviceCertificateResponse:
"""Register a new device and generate its credentials. """Register a new device and generate its credentials.
Returns: Returns:
DeviceRegistrationResponse DeviceCertificateResponse
""" """
device_id = self.generate_device_id() device_id = self.generate_device_id()
credentials = self.create_device_credentials(device_id=device_id) credentials = self.create_device_credentials(device_id=device_id)
return DeviceRegistrationResponse( return DeviceCertificateResponse(
certificate_id=credentials["certificate_id"], certificate_id=credentials.certificate_id,
device_id=credentials["device_id"], device_id=credentials.device_id,
ca_certificate_pem=credentials["ca_certificate_pem"].decode("utf-8"), ca_certificate_pem=self.ca_cert_pem.decode("utf-8"),
certificate_pem=credentials["certificate_pem"].decode("utf-8"), certificate_pem=credentials.certificate_pem.decode("utf-8"),
private_key_pem=credentials["private_key_pem"].decode("utf-8"), private_key_pem=credentials.private_key_pem.decode("utf-8"),
expires_at=credentials["expires_at"], expires_at=credentials.expires_at,
) )
def get_ca_certificate_pem(self) -> str: def get_ca_certificate_pem(self) -> str:
"""Get the CA certificate in PEM format as a string.""" """Get the CA certificate in PEM format as a string."""
return self.ca_cert_pem.decode("utf-8") return self.ca_cert_pem.decode("utf-8")
def revoke_certificate( def revoke_certificate(
self, certificate_pem: str, reason: x509.ReasonFlags = x509.ReasonFlags.unspecified self, certificate_pem: str, reason: x509.ReasonFlags = x509.ReasonFlags.unspecified
) -> None: ) -> None:
@@ -197,6 +205,7 @@ class CertificateManager:
with open(crl_path, "wb") as f: with open(crl_path, "wb") as f:
f.write(crl.public_bytes(serialization.Encoding.PEM)) f.write(crl.public_bytes(serialization.Encoding.PEM))
def get_crl_pem(self) -> str | None: def get_crl_pem(self) -> str | None:
"""Get the current CRL in PEM format.""" """Get the current CRL in PEM format."""
crl_path = Path(config.CRL_PATH) crl_path = Path(config.CRL_PATH)
@@ -206,6 +215,7 @@ class CertificateManager:
with open(crl_path, "rb") as f: with open(crl_path, "rb") as f:
return f.read().decode("utf-8") return f.read().decode("utf-8")
def renew_certificate( def renew_certificate(
self, self,
current_cert_pem: str, current_cert_pem: str,

View File

@@ -4,7 +4,7 @@ SQLAlchemy ORM models for device manager service.
These models mirror the database schema defined in db_migrations. These models mirror the database schema defined in db_migrations.
Kept separate to make the service independent. Kept separate to make the service independent.
""" """
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Text from sqlalchemy import JSON, Boolean, Column, DateTime, ForeignKey, Index, Text
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func from sqlalchemy.sql import func
@@ -19,11 +19,13 @@ class Device(Base):
id = Column(Text, primary_key=True) id = Column(Text, primary_key=True)
name = Column(Text, nullable=False) name = Column(Text, nullable=False)
location = Column(Text) location = Column(Text)
protocol = Column(Text, nullable=False, default="mqtt")
connection_config = Column(JSON)
is_active = Column(Boolean, default=True) is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now()) created_at = Column(DateTime(timezone=True), server_default=func.now())
def __repr__(self): def __repr__(self):
return f"<Device(id={self.id}, name={self.name})>" return f"<Device(id={self.id}, name={self.name}, protocol={self.protocol})>"
class DeviceCertificate(Base): class DeviceCertificate(Base):
@@ -33,7 +35,7 @@ class DeviceCertificate(Base):
id = Column(Text, primary_key=True) id = Column(Text, primary_key=True)
device_id = Column( device_id = Column(
Text, ForeignKey("devices.id", ondelete="CASCADE"), primary_key=True Text, ForeignKey("devices.id", ondelete="CASCADE"), nullable=False
) )
certificate_pem = Column(Text, nullable=False) certificate_pem = Column(Text, nullable=False)
private_key_pem = Column(Text) private_key_pem = Column(Text)
@@ -41,5 +43,34 @@ class DeviceCertificate(Base):
expires_at = Column(DateTime(timezone=True), nullable=False) expires_at = Column(DateTime(timezone=True), nullable=False)
revoked_at = Column(DateTime(timezone=True)) revoked_at = Column(DateTime(timezone=True))
__table_args__ = (
Index("idx_device_certificates_device_id", "device_id"),
Index("idx_device_certificates_active", "device_id", "revoked_at"),
)
def __repr__(self): def __repr__(self):
return f"<DeviceCertificate(device_id={self.device_id}, expires={self.expires_at})>" return f"<DeviceCertificate(id={self.id}, device_id={self.device_id}, expires={self.expires_at})>"
class DeviceCredential(Base):
"""Authentication credentials for non-mTLS protocols (HTTP, webhook, etc)."""
__tablename__ = "device_credentials"
id = Column(Text, primary_key=True)
device_id = Column(
Text, ForeignKey("devices.id", ondelete="CASCADE"), nullable=False
)
credential_type = Column(Text, nullable=False)
credential_hash = Column(Text, nullable=False)
created_at = Column(DateTime(timezone=True), nullable=False)
expires_at = Column(DateTime(timezone=True))
revoked_at = Column(DateTime(timezone=True))
__table_args__ = (
Index("idx_device_credentials_device_id", "device_id"),
Index("idx_device_credentials_active", "device_id", "revoked_at"),
)
def __repr__(self):
return f"<DeviceCredential(id={self.id}, device_id={self.device_id}, type={self.credential_type})>"

View File

@@ -1,17 +1,38 @@
import datetime import datetime
from typing import Any
from pydantic import BaseModel from pydantic import BaseModel
class DeviceRegistrationRequest(BaseModel): class DeviceRegistrationRequest(BaseModel):
"""Request model for registering a new device."""
name: str name: str
location: str | None = None location: str | None = None
protocol: str = "mqtt"
connection_config: dict[str, Any] | None = None
class DeviceRegistrationResponse(BaseModel): class DeviceRegistrationResponse(BaseModel):
"""Response model after registering a new device.""" device_id: str
protocol: str
certificate_id: str | None = None
ca_certificate_pem: str | None = None
certificate_pem: str | None = None
private_key_pem: str | None = None
expires_at: datetime.datetime | None = None
credential_id: str | None = None
api_key: str | None = None
webhook_secret: str | None = None
class DeviceResponse(BaseModel):
id: str
name: str
location: str | None = None
protocol: str
connection_config: dict[str, Any] | None = None
created_at: datetime.datetime
class DeviceCertificateResponse(BaseModel):
certificate_id: str certificate_id: str
device_id: str device_id: str
ca_certificate_pem: str ca_certificate_pem: str
@@ -19,10 +40,10 @@ class DeviceRegistrationResponse(BaseModel):
private_key_pem: str private_key_pem: str
expires_at: datetime.datetime expires_at: datetime.datetime
class DeviceResponse(BaseModel):
"""Response model for device information."""
id: str class DeviceCredentials(BaseModel):
name: str certificate_id: str
location: str | None = None device_id: str
created_at: datetime.datetime certificate_pem: bytes
private_key_pem: bytes
expires_at: datetime.datetime