From 4df582b3308c0e814c9ef18569ee53ccf9d7424b Mon Sep 17 00:00:00 2001 From: ferdzo Date: Thu, 30 Oct 2025 23:00:57 +0100 Subject: [PATCH 1/6] Functioning device manager with renew,revoke, updated model for cert id --- ...2b34e800_add_certificate_id_and_indices.py | 60 ++++ db_migrations/models.py | 10 +- infrastructure/.gitignore | 3 + infrastructure/compose.yml | 2 +- .../mosquitto/{ => config}/mosquitto.conf | 11 +- services/device_manager/app/app.py | 270 ++++++++++++++++++ .../device_manager/{ => app}/cert_manager.py | 116 +++++++- services/device_manager/{ => app}/config.py | 1 + services/device_manager/{ => app}/database.py | 0 .../device_manager/{ => app}/db_models.py | 1 + services/device_manager/{ => app}/models.py | 9 + services/device_manager/main.py | 77 ----- services/device_manager/pyproject.toml | 2 +- 13 files changed, 468 insertions(+), 94 deletions(-) create mode 100644 db_migrations/alembic/versions/20251030_2129_4f152b34e800_add_certificate_id_and_indices.py create mode 100644 infrastructure/.gitignore rename infrastructure/mosquitto/{ => config}/mosquitto.conf (72%) create mode 100644 services/device_manager/app/app.py rename services/device_manager/{ => app}/cert_manager.py (54%) rename services/device_manager/{ => app}/config.py (93%) rename services/device_manager/{ => app}/database.py (100%) rename services/device_manager/{ => app}/db_models.py (97%) rename services/device_manager/{ => app}/models.py (68%) delete mode 100644 services/device_manager/main.py diff --git a/db_migrations/alembic/versions/20251030_2129_4f152b34e800_add_certificate_id_and_indices.py b/db_migrations/alembic/versions/20251030_2129_4f152b34e800_add_certificate_id_and_indices.py new file mode 100644 index 0000000..1024221 --- /dev/null +++ b/db_migrations/alembic/versions/20251030_2129_4f152b34e800_add_certificate_id_and_indices.py @@ -0,0 +1,60 @@ +"""add_certificate_id_and_indices + +Revision ID: 4f152b34e800 +Revises: f94393f57c35 +Create Date: 2025-10-30 21:29:43.843375+00:00 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '4f152b34e800' +down_revision: Union[str, Sequence[str], None] = 'f94393f57c35' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # Step 1: Add id column as nullable first + op.add_column('device_certificates', sa.Column('id', sa.Text(), nullable=True)) + + # Step 2: Generate IDs for existing records (use device_id as temporary ID) + op.execute(""" + UPDATE device_certificates + SET id = device_id || '-' || EXTRACT(EPOCH FROM issued_at)::text + WHERE id IS NULL + """) + + # Step 3: Drop old primary key constraint + op.drop_constraint('device_certificates_pkey', 'device_certificates', type_='primary') + + # Step 4: Make id NOT NULL now that all rows have values + op.alter_column('device_certificates', 'id', nullable=False) + + # Step 5: Create new primary key on id + op.create_primary_key('device_certificates_pkey', 'device_certificates', ['id']) + + # Step 6: Create indices + op.create_index('idx_device_certificates_active', 'device_certificates', ['device_id', 'revoked_at'], unique=False) + op.create_index('idx_device_certificates_device_id', 'device_certificates', ['device_id'], unique=False) + + +def downgrade() -> None: + """Downgrade schema.""" + # Drop indices + op.drop_index('idx_device_certificates_device_id', table_name='device_certificates') + op.drop_index('idx_device_certificates_active', table_name='device_certificates') + + # Drop new primary key + op.drop_constraint('device_certificates_pkey', 'device_certificates', type_='primary') + + # Recreate old primary key on device_id + op.create_primary_key('device_certificates_pkey', 'device_certificates', ['device_id']) + + # Drop id column + op.drop_column('device_certificates', 'id') diff --git a/db_migrations/models.py b/db_migrations/models.py index de6692a..973d85d 100644 --- a/db_migrations/models.py +++ b/db_migrations/models.py @@ -35,8 +35,9 @@ class DeviceCertificate(Base): __tablename__ = "device_certificates" + id = Column(Text, primary_key=True) device_id = Column( - Text, ForeignKey("devices.id", ondelete="CASCADE"), primary_key=True + Text, ForeignKey("devices.id", ondelete="CASCADE"), nullable=False ) certificate_pem = Column(Text, nullable=False) private_key_pem = Column(Text) # Optional: for backup/escrow @@ -44,8 +45,13 @@ class DeviceCertificate(Base): expires_at = Column(DateTime(timezone=True), nullable=False) revoked_at = Column(DateTime(timezone=True)) + __table_args__ = ( + Index("idx_device_certificates_device_id", "device_id"), + Index("idx_device_certificates_active", "device_id", "revoked_at"), + ) + def __repr__(self): - return f"" + return f"" class Telemetry(Base): diff --git a/infrastructure/.gitignore b/infrastructure/.gitignore new file mode 100644 index 0000000..fe44ad6 --- /dev/null +++ b/infrastructure/.gitignore @@ -0,0 +1,3 @@ +mosquitto/certs/ +mosquitto/data/ +mosquitto/logs/ \ No newline at end of file diff --git a/infrastructure/compose.yml b/infrastructure/compose.yml index c076c29..7b9c76a 100644 --- a/infrastructure/compose.yml +++ b/infrastructure/compose.yml @@ -15,7 +15,7 @@ services: - "9001:9001" - "8883:8883" volumes: - - ./mosquitto/:/mosquitto/config/ + - ./mosquitto/:/mosquitto/ restart: unless-stopped timescaledb: diff --git a/infrastructure/mosquitto/mosquitto.conf b/infrastructure/mosquitto/config/mosquitto.conf similarity index 72% rename from infrastructure/mosquitto/mosquitto.conf rename to infrastructure/mosquitto/config/mosquitto.conf index 7b3c566..4e8efcb 100644 --- a/infrastructure/mosquitto/mosquitto.conf +++ b/infrastructure/mosquitto/config/mosquitto.conf @@ -3,7 +3,7 @@ persistence true persistence_location /mosquitto/data/ # Logging -log_dest file /mosquitto/log/mosquitto.log +#log_dest file /mosquitto/log/mosquitto.log # Standard MQTT listener (for testing without certs) listener 1883 @@ -15,11 +15,14 @@ allow_anonymous true protocol mqtt # Server certificates (mosquitto's identity) -certfile /mosquitto/config/server.crt -keyfile /mosquitto/config/server.key +certfile /mosquitto/certs/server.crt +keyfile /mosquitto/certs/server.key # CA certificate to verify client certificates -cafile /mosquitto/config/ca.crt +cafile /mosquitto/certs/ca.crt + +# CRL file +crlfile /mosquitto/certs/ca.crl # Certificate-based authentication require_certificate true diff --git a/services/device_manager/app/app.py b/services/device_manager/app/app.py new file mode 100644 index 0000000..f38a3ae --- /dev/null +++ b/services/device_manager/app/app.py @@ -0,0 +1,270 @@ +import datetime +import logging + +from db_models import Device, DeviceCertificate # SQLAlchemy ORM models +from fastapi import FastAPI, HTTPException + +from cert_manager import CertificateManager +from database import get_db_context +from models import DeviceRegistrationRequest, DeviceRegistrationResponse, DeviceResponse + +logger = logging.getLogger(__name__) + +app = FastAPI() + +cert_manager = CertificateManager() + + +@app.get("/") +async def hello(): + return {"Hello": "World"} + + +@app.post("/devices/register") +async def register_device( + request: DeviceRegistrationRequest, +) -> DeviceRegistrationResponse: + """ + Register a new device and issue an X.509 certificate. + """ + try: + response = cert_manager.register_device( + name=request.name, + location=request.location, + ) + + with get_db_context() as db: + device = Device( + id=response.device_id, + name=request.name, + location=request.location, + created_at=datetime.datetime.now(datetime.UTC), + ) + db.add(device) + + device_cert = DeviceCertificate( + id =response.certificate_id, + device_id=response.device_id, + certificate_pem=response.certificate_pem, + private_key_pem=response.private_key_pem, + issued_at=datetime.datetime.now(datetime.UTC), + expires_at=response.expires_at, + ) + db.add(device_cert) + + except Exception as e: + logger.error( + f"Failed to register device {request.name}: {str(e)}", exc_info=True + ) + raise HTTPException( + status_code=500, detail="Failed to register device. Please try again." + ) from e + + return response + + +@app.get("/ca_certificate") +async def get_ca_certificate() -> str: + """ + Retrieve the CA certificate in PEM format. + """ + try: + ca_cert_pem = cert_manager.get_ca_certificate_pem() + return ca_cert_pem + except Exception as e: + logger.error(f"Failed to retrieve CA certificate: {str(e)}", exc_info=True) + raise HTTPException( + status_code=500, detail="Failed to retrieve CA certificate." + ) from e + +@app.get("/devices/{device_id}") +async def get_device(device_id: str) -> DeviceResponse: + """ + Retrieve device information by ID. + """ + try: + with get_db_context() as db: + device = db.query(Device).filter(Device.id == device_id).first() + if not device: + raise HTTPException(status_code=404, detail="Device not found") + + return Device( + id=device.id, + name=device.name, + location=device.location, + created_at=device.created_at, + ) + + except Exception as e: + logger.error(f"Failed to retrieve device {device_id}: {str(e)}", exc_info=True) + raise HTTPException( + status_code=500, detail="Failed to retrieve device information." + ) from e + +@app.get("/devices/") +async def list_devices() -> list[DeviceResponse]: + """ + List all registered devices. + """ + try: + with get_db_context() as db: + devices = db.query(Device).all() + return [ + DeviceResponse( + id=device.id, + name=device.name, + location=device.location, + created_at=device.created_at, + ) + for device in devices + ] + + except Exception as e: + logger.error(f"Failed to list devices: {str(e)}", exc_info=True) + raise HTTPException( + status_code=500, detail="Failed to list devices." + ) from e + +@app.post("/devices/{device_id}/revoke") +async def revoke_device_certificate(device_id: str): + """ + Revoke a device's certificate by: + 1. Marking it as revoked in the database + 2. Adding it to the Certificate Revocation List (CRL) + """ + try: + with get_db_context() as db: + device_cert = ( + db.query(DeviceCertificate) + .filter(DeviceCertificate.device_id == device_id) + .first() + ) + if not device_cert: + raise HTTPException(status_code=404, detail="Device certificate not found") + + if device_cert.revoked_at: + raise HTTPException(status_code=400, detail="Certificate already revoked") + + cert_manager.revoke_certificate(device_cert.certificate_pem) + + device_cert.revoked_at = datetime.datetime.now(datetime.UTC) + db.commit() + + logger.info(f"Successfully revoked certificate for device {device_id}") + + return { + "device_id": device_id, + "revoked_at": device_cert.revoked_at.isoformat(), + "message": "Certificate revoked successfully" + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to revoke device {device_id}: {str(e)}", exc_info=True) + raise HTTPException( + status_code=500, detail="Failed to revoke device certificate." + ) from e + + +@app.get("/crl") +async def get_crl(): + """ + Get the Certificate Revocation List (CRL) in PEM format. + Mosquitto and other MQTT clients can check this to validate certificates. + """ + try: + crl_pem = cert_manager.get_crl_pem() + if not crl_pem: + return {"message": "No certificates have been revoked yet"} + return {"crl_pem": crl_pem} + except Exception as e: + logger.error(f"Failed to retrieve CRL: {str(e)}", exc_info=True) + raise HTTPException( + status_code=500, detail="Failed to retrieve CRL." + ) from e + +@app.post("/devices/{device_id}/renew") +async def renew_certificate(device_id: str): + """ + Renew a device's certificate by issuing a new one and revoking the old one. + + This endpoint: + 1. Retrieves the current certificate from DB + 2. Generates a new certificate with new keys + 3. Revokes the old certificate (adds to CRL) + 4. Updates the database with the new certificate + 5. Returns the new credentials + """ + try: + with get_db_context() as db: + # Get current certificate + device_cert = ( + db.query(DeviceCertificate) + .filter( + DeviceCertificate.device_id == device_id, + # DeviceCertificate.revoked_at.is_(None) + ) + .first() + ) + if not device_cert: + raise HTTPException( + status_code=404, + detail="No active certificate found for device" + ) + + # Check if certificate is about to expire (optional warning) + days_until_expiry = (device_cert.expires_at - datetime.datetime.now(datetime.UTC)).days + if days_until_expiry > 30: + logger.warning( + f"Certificate for device {device_id} renewed early " + f"({days_until_expiry} days remaining)" + ) + + # Revoke old certificate and add to CRL + cert_manager.revoke_certificate(device_cert.certificate_pem) + device_cert.revoked_at = datetime.datetime.now(datetime.UTC) + + # Generate new certificate with new keys + new_cert_pem, new_key_pem = cert_manager.renew_certificate( + current_cert_pem=device_cert.certificate_pem, + validity_days=365, + key_size=4096 + ) + + # Extract certificate ID (serial number) from the new certificate + from cryptography import x509 + new_cert = x509.load_pem_x509_certificate(new_cert_pem) + new_cert_id = format(new_cert.serial_number, 'x') + + # Create new certificate record in DB + now = datetime.datetime.now(datetime.UTC) + new_device_cert = DeviceCertificate( + id=new_cert_id, + device_id=device_id, + certificate_pem=new_cert_pem.decode("utf-8"), + private_key_pem=new_key_pem.decode("utf-8"), + issued_at=now, + expires_at=now + datetime.timedelta(days=365), + ) + db.add(new_device_cert) + db.commit() + + logger.info(f"Successfully renewed certificate for device {device_id}") + + return DeviceRegistrationResponse( + certificate_id=new_cert_id, + device_id=device_id, + ca_certificate_pem=cert_manager.get_ca_certificate_pem(), + certificate_pem=new_device_cert.certificate_pem, + private_key_pem=new_device_cert.private_key_pem, + expires_at=new_device_cert.expires_at, + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to renew certificate for device {device_id}: {str(e)}", exc_info=True) + raise HTTPException( + status_code=500, detail="Failed to renew device certificate." + ) from e diff --git a/services/device_manager/cert_manager.py b/services/device_manager/app/cert_manager.py similarity index 54% rename from services/device_manager/cert_manager.py rename to services/device_manager/app/cert_manager.py index 0b624bb..0968cc9 100644 --- a/services/device_manager/cert_manager.py +++ b/services/device_manager/app/cert_manager.py @@ -1,4 +1,5 @@ import datetime +from pathlib import Path from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization @@ -55,12 +56,12 @@ class CertificateManager: ) -> tuple[bytes, bytes]: """Generate an X.509 certificate for a device signed by the CA.""" - # Build device certificate subject = x509.Name( [ x509.NameAttribute(NameOID.COMMON_NAME, device_id), ] ) + issuer = ca_cert.subject now = datetime.datetime.now(datetime.UTC) device_cert = ( @@ -78,7 +79,6 @@ class CertificateManager: .sign(private_key=ca_key, algorithm=hashes.SHA256()) ) - # Serialize certificate and key to PEM format cert_pem = device_cert.public_bytes(serialization.Encoding.PEM) key_pem = device_key.private_bytes( encoding=serialization.Encoding.PEM, @@ -93,7 +93,7 @@ class CertificateManager: ) -> dict: """Create device credentials: private key and signed certificate. Returns: - dict with device_id, certificate_pem, private_key_pem, ca_certificate_pem, expires_at + dict with certificate_id, device_id, certificate_pem, private_key_pem, ca_certificate_pem, expires_at """ device_key = self.generate_device_key(key_size=key_size) @@ -106,11 +106,14 @@ class CertificateManager: key_size=key_size, ) - expires_at = datetime.datetime.now(datetime.UTC) + datetime.timedelta( - days=validity_days - ) + # Extract serial number from certificate to use as ID + cert = x509.load_pem_x509_certificate(cert_pem) + cert_id = format(cert.serial_number, 'x') # Hex string of serial number + + expires_at = datetime.datetime.now(datetime.UTC) + datetime.timedelta(days=validity_days) return { + "certificate_id": cert_id, "device_id": device_id, "certificate_pem": cert_pem, "private_key_pem": key_pem, @@ -118,9 +121,7 @@ class CertificateManager: "expires_at": expires_at, } - def register_device( - self, name: str, location: str | None = None - ) -> DeviceRegistrationResponse: + def register_device(self, name: str, location: str | None = None) -> DeviceRegistrationResponse: """Register a new device and generate its credentials. Returns: DeviceRegistrationResponse @@ -129,6 +130,7 @@ class CertificateManager: credentials = self.create_device_credentials(device_id=device_id) return DeviceRegistrationResponse( + certificate_id=credentials["certificate_id"], device_id=credentials["device_id"], ca_certificate_pem=credentials["ca_certificate_pem"].decode("utf-8"), certificate_pem=credentials["certificate_pem"].decode("utf-8"), @@ -140,3 +142,99 @@ class CertificateManager: """Get the CA certificate in PEM format as a string.""" return self.ca_cert_pem.decode("utf-8") + def revoke_certificate( + self, certificate_pem: str, reason: x509.ReasonFlags = x509.ReasonFlags.unspecified + ) -> None: + """ + Revoke a device certificate by adding it to the CRL. + + Args: + certificate_pem: PEM-encoded certificate to revoke + reason: Revocation reason (default: unspecified) + """ + # Load the certificate to get serial number + cert = x509.load_pem_x509_certificate(certificate_pem.encode()) + + # Load existing CRL or create new one + crl_path = Path(config.CRL_PATH) + revoked_certs = [] + + if crl_path.exists(): + with open(crl_path, "rb") as f: + existing_crl = x509.load_pem_x509_crl(f.read()) + # Copy existing revoked certificates + revoked_certs = list(existing_crl) + + # Add the new revoked certificate + revoked_cert = ( + x509.RevokedCertificateBuilder() + .serial_number(cert.serial_number) + .revocation_date(datetime.datetime.now(datetime.UTC)) + .add_extension( + x509.CRLReason(reason), + critical=False, + ) + .build() + ) + revoked_certs.append(revoked_cert) + + # Build new CRL with all revoked certificates + crl_builder = x509.CertificateRevocationListBuilder() + crl_builder = crl_builder.issuer_name(self.ca_cert.subject) + crl_builder = crl_builder.last_update(datetime.datetime.now(datetime.UTC)) + crl_builder = crl_builder.next_update( + datetime.datetime.now(datetime.UTC) + datetime.timedelta(days=30) + ) + + for revoked in revoked_certs: + crl_builder = crl_builder.add_revoked_certificate(revoked) + + # Sign the CRL with CA key + crl = crl_builder.sign(private_key=self.ca_key, algorithm=hashes.SHA256()) + + # Write CRL to file + crl_path.parent.mkdir(parents=True, exist_ok=True) + with open(crl_path, "wb") as f: + f.write(crl.public_bytes(serialization.Encoding.PEM)) + + def get_crl_pem(self) -> str | None: + """Get the current CRL in PEM format.""" + crl_path = Path(config.CRL_PATH) + if not crl_path.exists(): + return None + + with open(crl_path, "rb") as f: + return f.read().decode("utf-8") + + def renew_certificate( + self, + current_cert_pem: str, + validity_days: int = 365, + key_size: int = 4096, + ) -> tuple[bytes, bytes]: + """Renew a device certificate before expiration. + Args: + current_cert_pem: PEM-encoded current certificate + validity_days: Validity period for new certificate + key_size: Key size for new device key + Returns: + tuple of (new_cert_pem, new_key_pem) + """ + # Load current certificate + current_cert = x509.load_pem_x509_certificate(current_cert_pem.encode()) + device_id = current_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + + # Generate new device key + new_device_key = self.generate_device_key(key_size=key_size) + + # Generate new device certificate + new_cert_pem, new_key_pem = self.generate_device_certificate( + device_id=device_id, + ca_cert=self.ca_cert, + ca_key=self.ca_key, + device_key=new_device_key, + validity_days=validity_days, + key_size=key_size, + ) + + return new_cert_pem, new_key_pem diff --git a/services/device_manager/config.py b/services/device_manager/app/config.py similarity index 93% rename from services/device_manager/config.py rename to services/device_manager/app/config.py index e299b5b..0223718 100644 --- a/services/device_manager/config.py +++ b/services/device_manager/app/config.py @@ -15,6 +15,7 @@ class Config: CERTS_DIR = SERVICE_DIR / "certs" CA_CERT_PATH = os.getenv("CA_CERT_PATH", str(CERTS_DIR / "ca.crt")) CA_KEY_PATH = os.getenv("CA_KEY_PATH", str(CERTS_DIR / "ca.key")) + CRL_PATH = os.getenv("CRL_PATH", str(CERTS_DIR / "ca.crl")) # Certificate settings CERT_VALIDITY_DAYS = int(os.getenv("CERT_VALIDITY_DAYS", "365")) diff --git a/services/device_manager/database.py b/services/device_manager/app/database.py similarity index 100% rename from services/device_manager/database.py rename to services/device_manager/app/database.py diff --git a/services/device_manager/db_models.py b/services/device_manager/app/db_models.py similarity index 97% rename from services/device_manager/db_models.py rename to services/device_manager/app/db_models.py index 4bec069..e20e2f4 100644 --- a/services/device_manager/db_models.py +++ b/services/device_manager/app/db_models.py @@ -31,6 +31,7 @@ class DeviceCertificate(Base): __tablename__ = "device_certificates" + id = Column(Text, primary_key=True) device_id = Column( Text, ForeignKey("devices.id", ondelete="CASCADE"), primary_key=True ) diff --git a/services/device_manager/models.py b/services/device_manager/app/models.py similarity index 68% rename from services/device_manager/models.py rename to services/device_manager/app/models.py index ea82836..ec997a5 100644 --- a/services/device_manager/models.py +++ b/services/device_manager/app/models.py @@ -11,8 +11,17 @@ class DeviceRegistrationRequest(BaseModel): class DeviceRegistrationResponse(BaseModel): """Response model after registering a new device.""" + certificate_id: str device_id: str ca_certificate_pem: str certificate_pem: str private_key_pem: str expires_at: datetime.datetime + +class DeviceResponse(BaseModel): + """Response model for device information.""" + + id: str + name: str + location: str | None = None + created_at: datetime.datetime diff --git a/services/device_manager/main.py b/services/device_manager/main.py deleted file mode 100644 index 57b591d..0000000 --- a/services/device_manager/main.py +++ /dev/null @@ -1,77 +0,0 @@ -import datetime -import logging - -from fastapi import FastAPI, HTTPException - -from cert_manager import CertificateManager -from database import get_db_context -from db_models import Device, DeviceCertificate # SQLAlchemy ORM models -from models import DeviceRegistrationRequest, DeviceRegistrationResponse # Pydantic API models - -logger = logging.getLogger(__name__) - -app = FastAPI() - -cert_manager = CertificateManager() - - -@app.get("/") -async def hello(): - return {"Hello": "World"} - - -@app.post("/devices/register") -async def register_device( - request: DeviceRegistrationRequest, -) -> DeviceRegistrationResponse: - """ - Register a new device and issue an X.509 certificate. - """ - try: - response = cert_manager.register_device( - name=request.name, - location=request.location, - ) - - with get_db_context() as db: - device = Device( - id=response.device_id, - name=request.name, - location=request.location, - created_at=datetime.datetime.now(datetime.UTC), - ) - db.add(device) - - device_cert = DeviceCertificate( - device_id=response.device_id, - certificate_pem=response.certificate_pem, - private_key_pem=response.private_key_pem, - issued_at=datetime.datetime.now(datetime.UTC), - expires_at=response.expires_at, - ) - db.add(device_cert) - - except Exception as e: - logger.error( - f"Failed to register device {request.name}: {str(e)}", exc_info=True - ) - raise HTTPException( - status_code=500, detail="Failed to register device. Please try again." - ) from e - - return response - - -@app.get("/ca_certificate") -async def get_ca_certificate() -> str: - """ - Retrieve the CA certificate in PEM format. - """ - try: - ca_cert_pem = cert_manager.get_ca_certificate_pem() - return ca_cert_pem - except Exception as e: - logger.error(f"Failed to retrieve CA certificate: {str(e)}", exc_info=True) - raise HTTPException( - status_code=500, detail="Failed to retrieve CA certificate." - ) from e \ No newline at end of file diff --git a/services/device_manager/pyproject.toml b/services/device_manager/pyproject.toml index 3ce8a5a..cc454e6 100644 --- a/services/device_manager/pyproject.toml +++ b/services/device_manager/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "device-manager" version = "0.1.0" -description = "Add your description here" +description = "Device Manager" readme = "README.md" requires-python = ">=3.13" dependencies = [ From ddbc588c77d1589440b0fd189e6c1a8cba6d8709 Mon Sep 17 00:00:00 2001 From: ferdzo Date: Thu, 30 Oct 2025 23:20:51 +0100 Subject: [PATCH 2/6] Add Dockerfile and .dockerignore; refactor import paths in app modules --- services/device_manager/.dockerignore | 9 +++++++++ services/device_manager/Dockerfile | 11 +++++++++++ services/device_manager/app/app.py | 8 ++++---- services/device_manager/app/cert_manager.py | 4 ++-- services/device_manager/app/database.py | 2 +- services/device_manager/app/models.py | 1 + 6 files changed, 28 insertions(+), 7 deletions(-) create mode 100644 services/device_manager/.dockerignore create mode 100644 services/device_manager/Dockerfile diff --git a/services/device_manager/.dockerignore b/services/device_manager/.dockerignore new file mode 100644 index 0000000..01415b9 --- /dev/null +++ b/services/device_manager/.dockerignore @@ -0,0 +1,9 @@ +.env +.venv/ +__pycache__/ +*.pyc +*.pyo +*.pyd +*.crl +*.crt +*.pem \ No newline at end of file diff --git a/services/device_manager/Dockerfile b/services/device_manager/Dockerfile new file mode 100644 index 0000000..d468070 --- /dev/null +++ b/services/device_manager/Dockerfile @@ -0,0 +1,11 @@ +FROM ghcr.io/astral-sh/uv:python3.13-alpine + +COPY ./pyproject.toml ./ + +COPY ./uv.lock ./ + +RUN uv sync + +COPY ./app/ ./app/ + +ENTRYPOINT [ "uv", "run", "uvicorn", "app.app:app", "--host", "0.0.0.0", "--port", "8000" ] \ No newline at end of file diff --git a/services/device_manager/app/app.py b/services/device_manager/app/app.py index f38a3ae..ca2782b 100644 --- a/services/device_manager/app/app.py +++ b/services/device_manager/app/app.py @@ -1,12 +1,12 @@ import datetime import logging -from db_models import Device, DeviceCertificate # SQLAlchemy ORM models from fastapi import FastAPI, HTTPException -from cert_manager import CertificateManager -from database import get_db_context -from models import DeviceRegistrationRequest, DeviceRegistrationResponse, DeviceResponse +from app.cert_manager import CertificateManager +from app.database import get_db_context +from app.db_models import Device, DeviceCertificate # SQLAlchemy ORM models +from app.models import DeviceRegistrationRequest, DeviceRegistrationResponse, DeviceResponse logger = logging.getLogger(__name__) diff --git a/services/device_manager/app/cert_manager.py b/services/device_manager/app/cert_manager.py index 0968cc9..9ef37d3 100644 --- a/services/device_manager/app/cert_manager.py +++ b/services/device_manager/app/cert_manager.py @@ -7,8 +7,8 @@ from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.x509.oid import NameOID from nanoid import generate -from config import config -from models import DeviceRegistrationResponse +from app.config import config +from app.models import DeviceRegistrationResponse lowercase_numbers = "abcdefghijklmnopqrstuvwxyz0123456789" diff --git a/services/device_manager/app/database.py b/services/device_manager/app/database.py index dbc954b..5b74523 100644 --- a/services/device_manager/app/database.py +++ b/services/device_manager/app/database.py @@ -9,7 +9,7 @@ from contextlib import contextmanager from sqlalchemy import create_engine from sqlalchemy.orm import Session, sessionmaker -from config import config +from app.config import config # Create engine with connection pooling engine = create_engine( diff --git a/services/device_manager/app/models.py b/services/device_manager/app/models.py index ec997a5..01d6f83 100644 --- a/services/device_manager/app/models.py +++ b/services/device_manager/app/models.py @@ -2,6 +2,7 @@ import datetime from pydantic import BaseModel + class DeviceRegistrationRequest(BaseModel): """Request model for registering a new device.""" From 96e23770737dbdfdfcd0d671abcaa73c5c2f143f Mon Sep 17 00:00:00 2001 From: ferdzo Date: Sun, 2 Nov 2025 14:09:29 +0100 Subject: [PATCH 3/6] Added multi-protocol support for devices, improved models and updated readme.md and instructions --- .github/copilot-instructions.md | 136 ++++++++---- README.md | 19 +- ..._add_protocol_and_connection_config_to_.py | 55 +++++ db_migrations/models.py | 34 ++- iotDashboard/device_manager_client.py | 157 ++++++++++++++ iotDashboard/models.py | 195 +++++++++++++++--- iotDashboard/settings.py | 35 ++-- iotDashboard/tasks.py | 180 ---------------- services/db_write/Dockerfile | 32 +++ services/device_manager/app/app.py | 132 +++++++----- services/device_manager/app/cert_manager.py | 52 +++-- services/device_manager/app/db_models.py | 39 +++- services/device_manager/app/models.py | 39 +++- 13 files changed, 730 insertions(+), 375 deletions(-) create mode 100644 db_migrations/alembic/versions/20251101_1907_4e405f1129b1_add_protocol_and_connection_config_to_.py create mode 100644 iotDashboard/device_manager_client.py delete mode 100644 iotDashboard/tasks.py create mode 100644 services/db_write/Dockerfile diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 3fc0b64..b692b6a 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -1,66 +1,114 @@ The following concise instructions help AI coding agents become productive in this repository. Purpose -- This repo is a small Django-based IoT dashboard that ingests sensor data via MQTT, stores metadata in Django models, temporarily queues messages in Redis (streams/hashes), and persistently stores timeseries in Postgres/Timescale via background tasks (Huey). +- This repo is a microservices-based IoT platform for device management, data ingestion, and telemetry storage. The system uses MQTT with mTLS authentication, Redis streams for message queuing, and PostgreSQL/TimescaleDB for persistent storage. Big Picture +- Architecture: Device → MQTT (mTLS) → mqtt_ingestion → Redis → db_write → PostgreSQL/TimescaleDB - Components: - - `iotDashboard/` — Django app (models, views, templates, tasks) - - `manage.py` — Django CLI - - `mqtt_service.py` — standalone MQTT client that subscribes to device topics and writes to Redis - - `tasks.py` — Huey periodic tasks that read Redis and write to Postgres - - Redis — used for device metadata (`mqtt_devices`), per-sensor streams and latest-value hashes - - Postgres/Timescale — final storage for `sensor_readings` table (raw SQL used in places) + - `services/device_manager/` — FastAPI service for device registration, X.509 certificate issuance, and lifecycle management + - `services/mqtt_ingestion/` — MQTT client that subscribes to device topics and writes to single Redis stream `mqtt:ingestion` + - `services/db_write/` — Consumer service that reads from Redis streams and writes to database using consumer groups + - `db_migrations/` — Alembic migrations for schema management (SQLAlchemy models) + - `infrastructure/` — Docker Compose setup (PostgreSQL, Redis, Mosquitto MQTT broker) + - `iotDashboard/` — Legacy Django app (being phased out) Key Files To Read First -- `iotDashboard/settings.py` — central settings; environment variables expected: `SECRET_KEY`, `CONNECTION_STRING`, `MQTT_BROKER`, `MQTT_USER`, `MQTT_PASS`, `REDIS_HOST`. -- `iotDashboard/models.py` — `Device`, `Sensor`, `SensorType`; these shape how devices and sensors are represented. -- `mqtt_service.py` — where MQTT messages are received and written to Redis. Important for stream naming and payload format. -- `iotDashboard/tasks.py` — Huey tasks that consume Redis and insert into the DB. Shows ingestion logic and timescale interactions. -- `iotDashboard/views.py` and `templates/chart.html` — how the UI reads `mqtt_latest`/Timescale data and what format it expects. +- `db_migrations/models.py` — SQLAlchemy models: `Device`, `DeviceCertificate`, `Telemetry`. Canonical schema definition. +- `services/device_manager/app/app.py` — FastAPI endpoints for device registration, certificate management, revocation, renewal. +- `services/device_manager/app/cert_manager.py` — X.509 certificate generation, CA management, CRL generation. +- `services/mqtt_ingestion/src/mqtt_client.py` — MQTT subscriber that parses `devices/{device_id}/{metric}` topics. +- `services/mqtt_ingestion/src/redis_writer.py` — Writes to single stream `mqtt:ingestion` with device_id, metric, value, timestamp. +- `services/db_write/src/redis_reader.py` — Consumer group reader for `mqtt:ingestion` stream. +- `services/db_write/src/db_writer.py` — Batch writes to `telemetry` table using SQLAlchemy. +- `infrastructure/compose.yml` — Docker services: PostgreSQL/TimescaleDB, Redis, Mosquitto MQTT. +- `infrastructure/mosquitto/mosquitto.conf` — MQTT broker config with mTLS on port 8883, CRL checking enabled. Important Conventions & Patterns -- Redis usage: repo stores device metadata under `mqtt_devices` (JSON), and the code uses Redis streams and hashes inconsistently. When changing stream behavior, update both `mqtt_service.py` and `tasks.py` to remain compatible. -- Topic/Stream canonicalization: adopt a single convention: MQTT topic `devices/{device_id}/{sensor}` and Redis stream `mqtt_stream:{device_id}:{sensor}`. Latest-value hash pattern: `mqtt_latest:{device_id}`. -- No `requirements.txt` in repo; use `python-dotenv` + `redis`, `paho-mqtt`, `huey`, `psycopg2-binary`, `requests`, `Django` (4.2) — add a `requirements.txt` before running. -- Avoid import-time side-effects: `tasks.py` currently opens Redis and calls `devices_to_redis()` at import time — refactor to lazy init or a management command. +- **Single stream architecture**: All MQTT data flows through one Redis stream `mqtt:ingestion`. Each message contains `device_id`, `metric`, `value`, `timestamp`. +- **MQTT topics**: Standard format `devices/{device_id}/{metric}`. Examples: `devices/abc123/temperature`, `devices/xyz789/humidity`. +- **Certificate IDs**: Use certificate serial number (hex format) as primary key in `device_certificates` table. Multiple certificates per device supported. +- **Package manager**: All services use `uv` for dependency management (`pyproject.toml` not `requirements.txt`). +- **Database migrations**: Use Alembic for schema changes. Run migrations from `db_migrations/` directory. +- **Configuration**: All services use `.env` files. Never hardcode hosts/credentials. +- **Import organization**: Services have `app/` or `src/` package structure. Import as `from app.module import ...` or `from src.module import ...`. +- **Consumer groups**: `db_write` uses Redis consumer groups for at-least-once delivery. Consumer name must be unique per instance. Developer Workflows (commands & notes) -- Run Django dev server (use virtualenv and install deps): - - `pip install -r requirements.txt` (create this file if missing) - - `python manage.py migrate` - - `python manage.py runserver` -- Run MQTT service locally (requires Redis & MQTT broker): - - `python mqtt_service.py` - - Example publish: `mosquitto_pub -t "devices/esp32/test_temperature" -m "23.5"` -- Huey tasks: - - The project uses `huey.contrib.djhuey`; run workers with Django settings: `python manage.py run_huey` (ensure huey is installed and configured in `HUEY` setting). -- Inspect Redis during debugging: - - `redis-cli KEYS "mqtt*"` - - `redis-cli XREVRANGE mqtt_stream:mydevice:temperature + - COUNT 10` - - `redis-cli HGETALL mqtt_latest:mydevice` +- **Start infrastructure**: `cd infrastructure && docker compose up -d` (Postgres, Redis, Mosquitto) +- **Run database migrations**: `cd db_migrations && uv run alembic upgrade head` +- **Generate CA certificate**: `cd services/device_manager && ./generate_ca.sh` (first time only) +- **Run device_manager**: `cd services/device_manager && uv run uvicorn app.app:app --reload --port 8000` +- **Run mqtt_ingestion**: `cd services/mqtt_ingestion && uv run main.py` +- **Run db_write**: `cd services/db_write && uv run main.py` +- **Register device**: `curl -X POST http://localhost:8000/devices/register -H "Content-Type: application/json" -d '{"name":"test","location":"lab"}'` +- **Test MQTT with mTLS**: `mosquitto_pub --cafile ca.crt --cert device.crt --key device.key -h localhost -p 8883 -t "devices/abc123/temperature" -m "23.5"` +- **Inspect Redis stream**: `redis-cli XLEN mqtt:ingestion` and `redis-cli XRANGE mqtt:ingestion - + COUNT 10` +- **Check consumer group**: `redis-cli XINFO GROUPS mqtt:ingestion` +- **View CRL**: `openssl crl -in infrastructure/mosquitto/certs/ca.crl -text -noout` Integration Points & Gotchas -- Environment variables: many hosts/credentials are taken from `.env` via `python-dotenv`. If missing, code sometimes falls back to defaults or will raise at runtime. Add `.env` or set env vars in the system. -- DB access: `tasks.py` sometimes uses `psycopg2.connect(settings.CONNECTION_STRING)` while views use Django connections. If you change DB config, update both patterns or consolidate to Django connections. -- Topic parsing: `mqtt_service.py` expects at least 3 topic parts (it reads `topic_parts[2]`) — be defensive when editing. -- Stream payloads: `xadd` must receive simple string fields (no nested dicts). When changing stream layout, update the reader in `tasks.py` accordingly. -- Logging: repo uses `print` widely. Prefer converting prints to Python `logging` for maintainability. +- **Environment variables**: All services load from `.env` files. No defaults - service will fail if required vars missing. Copy `.env.example` first. +- **Certificate paths**: `device_manager` writes CRL to `infrastructure/mosquitto/certs/ca.crl`. Mosquitto must restart after CRL updates. +- **Database schema**: Schema changes require Alembic migration. Never modify tables manually. Use `alembic revision --autogenerate`. +- **MQTT topic parsing**: `mqtt_ingestion` expects exactly `devices/{device_id}/{metric}` (3 parts). Invalid topics are logged and dropped. +- **Redis stream format**: `mqtt:ingestion` messages must have `device_id`, `metric`, `value`, `timestamp` fields (all strings). +- **Consumer groups**: `db_write` creates consumer group `db_writer` automatically. Don't delete it manually. +- **Certificate serial numbers**: Used as primary key in `device_certificates.id`. Extract with `format(cert.serial_number, 'x')`. +- **TimescaleDB hypertables**: `telemetry` table is a hypertable. Don't add constraints that break time partitioning. +- **File permissions**: Mosquitto directories may be owned by UID 1883. Fix with `sudo chown -R $USER:$USER infrastructure/mosquitto/`. What AI agents should do first -- Do not change stream/topic names unless you update both `mqtt_service.py` and `tasks.py`. -- Remove import-time Redis initializations and `exit()` calls; move to lazily-created client getters or management commands. -- Centralize config in `settings.py` and import `from django.conf import settings` in scripts instead of hardcoded IPs. -- When making API/DB changes, prefer to update `views.py` and `tasks.py` together and add short integration tests using `pytest` and a Redis test double (or local docker-compose). +- **Read architecture first**: Check `README.md` for current architecture. System is microservices-based, not Django monolith. +- **Check database schema**: Always start with `db_migrations/models.py` to understand data model. +- **Don't change stream names**: Single stream `mqtt:ingestion` is used by mqtt_ingestion and db_write. Changing breaks both services. +- **Use proper imports**: Services use package structure. Import from `app.*` or `src.*`, not relative imports. +- **Create migrations**: Schema changes require `alembic revision --autogenerate`. Never modify models without migration. +- **Test with real infrastructure**: Use `docker compose up` for integration testing. Unit tests are insufficient for this architecture. +- **Check .env files**: Each service has `.env.example`. Copy and configure before running. Examples (copyable snippets) -- XADD to create canonical stream entry: - - `redis_client.xadd(f"mqtt_stream:{device_id}:{sensor}", {"value": str(sensor_value), "time": datetime.utcnow().isoformat()})` -- Create/read consumer group (ingest): - - `redis_client.xgroup_create(stream, "ingest", id="0", mkstream=True)` - - `entries = redis_client.xreadgroup("ingest", consumer_name, {stream: ">"}, count=10, block=5000)` +- **Write to single stream** (mqtt_ingestion): + ```python + redis_client.xadd("mqtt:ingestion", { + "device_id": device_id, + "metric": sensor_type, + "value": str(value), + "timestamp": datetime.utcnow().isoformat() + }) + ``` + +- **Read from stream with consumer group** (db_write): + ```python + results = redis_client.xreadgroup( + groupname="db_writer", + consumername="worker-01", + streams={"mqtt:ingestion": ">"}, + count=100, + block=5000 + ) + ``` + +- **Extract certificate serial number**: + ```python + from cryptography import x509 + cert = x509.load_pem_x509_certificate(cert_pem) + cert_id = format(cert.serial_number, 'x') + ``` + +- **Query active certificates**: + ```python + device_cert = db.query(DeviceCertificate).filter( + DeviceCertificate.device_id == device_id, + DeviceCertificate.revoked_at.is_(None) + ).first() + ``` If you add or change docs -- Update `README.md` with a simple `docker-compose.yml` recipe for Redis/Postgres/Mosquitto and document environment variables. Update `env.sample` with `REDIS_HOST`, `CONNECTION_STRING`, `MQTT_BROKER`, `MQTT_USER`, `MQTT_PASS`. +- Update `README.md` for architecture changes +- Update `.github/copilot-instructions.md` for development workflow changes +- Update service-specific READMEs (`services/*/README.md`) for API or configuration changes +- Document environment variables in `.env.example` files +- Add migration notes to Alembic revision if schema change is complex If anything in these instructions looks off or incomplete for your current refactor, tell me what you'd like to focus on and I'll iterate. diff --git a/README.md b/README.md index 45f74f4..144f7f2 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,17 @@ -# iotDashboard -iotDashboard - IOT dashboard with Django, TimescaleDB and Redis +# IoT Dashboard + +Microservices-based IoT platform with device management, mTLS authentication, and time-series data storage. + +**Architecture:** Device → MQTT (mTLS) → mqtt_ingestion → Redis → db_write → PostgreSQL/TimescaleDB + +## Services + +- **device_manager** - Device registration & X.509 certificates (FastAPI) +- **mqtt_ingestion** - MQTT → Redis pipeline +- **db_write** - Redis → PostgreSQL writer +- **infrastructure** - Docker Compose (PostgreSQL, Redis, Mosquitto) + + +## License + +MIT License diff --git a/db_migrations/alembic/versions/20251101_1907_4e405f1129b1_add_protocol_and_connection_config_to_.py b/db_migrations/alembic/versions/20251101_1907_4e405f1129b1_add_protocol_and_connection_config_to_.py new file mode 100644 index 0000000..ef4c1f9 --- /dev/null +++ b/db_migrations/alembic/versions/20251101_1907_4e405f1129b1_add_protocol_and_connection_config_to_.py @@ -0,0 +1,55 @@ +"""add protocol and connection_config to devices + +Revision ID: 4e405f1129b1 +Revises: 4f152b34e800 +Create Date: 2025-11-01 19:07:22.800918+00:00 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '4e405f1129b1' +down_revision: Union[str, Sequence[str], None] = '4f152b34e800' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('device_credentials', + sa.Column('id', sa.Text(), nullable=False), + sa.Column('device_id', sa.Text(), nullable=False), + sa.Column('credential_type', sa.Text(), nullable=False), + sa.Column('credential_hash', sa.Text(), nullable=False), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('expires_at', sa.DateTime(timezone=True), nullable=True), + sa.Column('revoked_at', sa.DateTime(timezone=True), nullable=True), + sa.ForeignKeyConstraint(['device_id'], ['devices.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id') + ) + op.create_index('idx_device_credentials_active', 'device_credentials', ['device_id', 'revoked_at'], unique=False) + op.create_index('idx_device_credentials_device_id', 'device_credentials', ['device_id'], unique=False) + + # Add protocol column as nullable first, set default for existing rows, then make NOT NULL + op.add_column('devices', sa.Column('protocol', sa.Text(), nullable=True)) + op.execute("UPDATE devices SET protocol = 'mqtt' WHERE protocol IS NULL") + op.alter_column('devices', 'protocol', nullable=False) + + op.add_column('devices', sa.Column('connection_config', sa.JSON(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('devices', 'connection_config') + op.drop_column('devices', 'protocol') + op.drop_index('idx_device_credentials_device_id', table_name='device_credentials') + op.drop_index('idx_device_credentials_active', table_name='device_credentials') + op.drop_table('device_credentials') + # ### end Alembic commands ### diff --git a/db_migrations/models.py b/db_migrations/models.py index 973d85d..edf8923 100644 --- a/db_migrations/models.py +++ b/db_migrations/models.py @@ -8,7 +8,7 @@ To modify schema: 4. Run: alembic upgrade head """ -from sqlalchemy import Boolean, Column, Float, ForeignKey, Index, Text, DateTime +from sqlalchemy import Boolean, Column, Float, ForeignKey, Index, Text, DateTime, JSON from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.sql import func @@ -23,13 +23,13 @@ class Device(Base): id = Column(Text, primary_key=True) name = Column(Text, nullable=False) location = Column(Text) + protocol = Column(Text, nullable=False, default="mqtt") + connection_config = Column(JSON) is_active = Column(Boolean, default=True) created_at = Column(DateTime(timezone=True), server_default=func.now()) def __repr__(self): - return f"" - - + return f"" class DeviceCertificate(Base): """X.509 certificates issued to devices for mTLS authentication.""" @@ -40,7 +40,7 @@ class DeviceCertificate(Base): Text, ForeignKey("devices.id", ondelete="CASCADE"), nullable=False ) certificate_pem = Column(Text, nullable=False) - private_key_pem = Column(Text) # Optional: for backup/escrow + private_key_pem = Column(Text) issued_at = Column(DateTime(timezone=True), nullable=False) expires_at = Column(DateTime(timezone=True), nullable=False) revoked_at = Column(DateTime(timezone=True)) @@ -54,6 +54,30 @@ class DeviceCertificate(Base): return f"" +class DeviceCredential(Base): + """Authentication credentials for non-mTLS protocols (HTTP, webhook, etc).""" + + __tablename__ = "device_credentials" + + id = Column(Text, primary_key=True) + device_id = Column( + Text, ForeignKey("devices.id", ondelete="CASCADE"), nullable=False + ) + credential_type = Column(Text, nullable=False) + credential_hash = Column(Text, nullable=False) + created_at = Column(DateTime(timezone=True), nullable=False) + expires_at = Column(DateTime(timezone=True)) + revoked_at = Column(DateTime(timezone=True)) + + __table_args__ = ( + Index("idx_device_credentials_device_id", "device_id"), + Index("idx_device_credentials_active", "device_id", "revoked_at"), + ) + + def __repr__(self): + return f"" + + class Telemetry(Base): """ Time-series telemetry data from devices. diff --git a/iotDashboard/device_manager_client.py b/iotDashboard/device_manager_client.py new file mode 100644 index 0000000..4032af1 --- /dev/null +++ b/iotDashboard/device_manager_client.py @@ -0,0 +1,157 @@ +"""API client for the device_manager microservice.""" + +import os +import requests +from typing import Optional, Dict, Any, List +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class DeviceRegistrationResponse: + device_id: str + certificate_id: str + certificate: str + private_key: str + ca_certificate: str + + +@dataclass +class DeviceInfo: + id: str + name: str + location: Optional[str] + is_active: bool + created_at: datetime + certificates: List[Dict[str, Any]] + + +class DeviceManagerAPIError(Exception): + def __init__(self, status_code: int, message: str, details: Optional[Dict] = None): + self.status_code = status_code + self.message = message + self.details = details or {} + super().__init__(f"API Error {status_code}: {message}") + + +class DeviceManagerClient: + def __init__(self, base_url: Optional[str] = None): + self.base_url = base_url or os.getenv("DEVICE_MANAGER_URL", "http://localhost:8000") + self.session = requests.Session() + self.session.headers.update({"Content-Type": "application/json"}) + + def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response: + url = f"{self.base_url}{endpoint}" + try: + response = self.session.request(method, url, **kwargs) + response.raise_for_status() + return response + except requests.exceptions.HTTPError as e: + try: + error_data = e.response.json() + message = error_data.get("detail", str(e)) + except (ValueError, AttributeError): + message = str(e) + + raise DeviceManagerAPIError( + status_code=e.response.status_code, + message=message, + details=error_data if "error_data" in locals() else {}, + ) + except requests.exceptions.RequestException as e: + raise DeviceManagerAPIError( + status_code=0, message=f"Connection error: {str(e)}" + ) + + def register_device(self, name: str, location: Optional[str] = None) -> DeviceRegistrationResponse: + payload = {"name": name} + if location: + payload["location"] = location + + response = self._request("POST", "/devices/register", json=payload) + data = response.json() + + return DeviceRegistrationResponse( + device_id=data["device_id"], + certificate_id=data["certificate_id"], + certificate=data["certificate"], + private_key=data["private_key"], + ca_certificate=data["ca_certificate"], + ) + + def get_device(self, device_id: str) -> DeviceInfo: + response = self._request("GET", f"/devices/{device_id}") + data = response.json() + + return DeviceInfo( + id=data["id"], + name=data["name"], + location=data.get("location"), + is_active=data["is_active"], + created_at=datetime.fromisoformat(data["created_at"].replace("Z", "+00:00")), + certificates=data.get("certificates", []), + ) + + def list_devices(self) -> List[DeviceInfo]: + response = self._request("GET", "/devices/") + data = response.json() + + return [ + DeviceInfo( + id=device["id"], + name=device["name"], + location=device.get("location"), + is_active=device["is_active"], + created_at=datetime.fromisoformat( + device["created_at"].replace("Z", "+00:00") + ), + certificates=device.get("certificates", []), + ) + for device in data + ] + + def revoke_certificate(self, device_id: str) -> Dict[str, Any]: + response = self._request("POST", f"/devices/{device_id}/revoke") + return response.json() + + def renew_certificate(self, device_id: str) -> Dict[str, Any]: + response = self._request("POST", f"/devices/{device_id}/renew") + return response.json() + + def get_ca_certificate(self) -> str: + response = self._request("GET", "/ca_certificate") + return response.text + + def get_crl(self) -> str: + response = self._request("GET", "/crl") + return response.text + + def health_check(self) -> bool: + try: + response = self.session.get(f"{self.base_url}/docs", timeout=2) + return response.status_code == 200 + except requests.exceptions.RequestException: + return False + + +default_client = DeviceManagerClient() + + +def register_device(name: str, location: Optional[str] = None) -> DeviceRegistrationResponse: + return default_client.register_device(name, location) + + +def get_device(device_id: str) -> DeviceInfo: + return default_client.get_device(device_id) + + +def list_devices() -> List[DeviceInfo]: + return default_client.list_devices() + + +def revoke_certificate(device_id: str) -> Dict[str, Any]: + return default_client.revoke_certificate(device_id) + + +def renew_certificate(device_id: str) -> Dict[str, Any]: + return default_client.renew_certificate(device_id) diff --git a/iotDashboard/models.py b/iotDashboard/models.py index a6cb43d..16dd272 100644 --- a/iotDashboard/models.py +++ b/iotDashboard/models.py @@ -1,42 +1,175 @@ +""" +Django models that mirror the SQLAlchemy schema from db_migrations/models.py. + +These models are read-only (managed=False) and query the microservices database. +For write operations, use the device_manager API client instead. +""" + from django.db import models -class SensorType(models.Model): - name = models.CharField( - max_length=50, unique=True - ) # Sensor name, e.g., "CO2", "Noise", etc. - unit = models.CharField( - max_length=20 - ) # Unit of measurement, e.g., "ppm", "dB", "lux" - protocol = models.CharField( - max_length=20, choices=[("mqtt", "MQTT"), ("http", "HTTP")] - ) # Protocol for communication - topic = models.CharField( - max_length=100, null=True, blank=True - ) # Topic for MQTT communication - endpoint = models.CharField( - max_length=100, null=True, blank=True - ) # Endpoint for HTTP communication - - def __str__(self): - return f"{self.name} ({self.unit})" - - class Device(models.Model): - name = models.CharField(max_length=50) # Device name - ip = models.CharField(max_length=20) # Device IP address - protocol = models.CharField( - max_length=20, choices=[("mqtt", "MQTT"), ("http", "HTTP")] + """IoT devices registered in the system.""" + + id = models.CharField(max_length=8, primary_key=True) + name = models.CharField(max_length=255) + location = models.CharField(max_length=255, null=True, blank=True) + protocol = models.CharField(max_length=50, default="mqtt") + connection_config = models.JSONField(null=True, blank=True) + is_active = models.BooleanField(default=True) + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + managed = False + db_table = "devices" + + def __str__(self): + return f"{self.name} ({self.id}) [{self.protocol}]" + + @property + def active_certificate(self): + """Get the active (non-revoked) certificate for this device.""" + return self.certificates.filter(revoked_at__isnull=True).first() + + @property + def certificate_status(self): + """Get human-readable certificate status for MQTT devices.""" + if self.protocol != "mqtt": + return "N/A" + cert = self.active_certificate + if not cert: + return "No Certificate" + if cert.is_expired: + return "Expired" + if cert.is_expiring_soon: + return "Expiring Soon" + return "Valid" + + +class DeviceCertificate(models.Model): + """X.509 certificates issued to devices for mTLS authentication.""" + + id = models.CharField( + max_length=255, primary_key=True + ) # Certificate serial number (hex) + device = models.ForeignKey( + Device, on_delete=models.CASCADE, related_name="certificates", db_column="device_id" ) + certificate_pem = models.TextField() + private_key_pem = models.TextField(null=True, blank=True) # Optional backup + issued_at = models.DateTimeField() + expires_at = models.DateTimeField() + revoked_at = models.DateTimeField(null=True, blank=True) + + class Meta: + managed = False # Don't create/modify this table + db_table = "device_certificates" + indexes = [ + models.Index(fields=["device"]), + models.Index(fields=["device", "revoked_at"]), + ] def __str__(self): - return self.name + status = "Revoked" if self.revoked_at else "Active" + return f"Certificate {self.id[:8]}... for {self.device.name} ({status})" + + @property + def is_revoked(self): + """Check if certificate is revoked.""" + return self.revoked_at is not None + + @property + def is_expired(self): + """Check if certificate is expired.""" + from django.utils import timezone + + return timezone.now() > self.expires_at + + @property + def is_expiring_soon(self): + """Check if certificate expires within 30 days.""" + from django.utils import timezone + from datetime import timedelta + + return ( + not self.is_expired + and self.expires_at < timezone.now() + timedelta(days=30) + ) + + @property + def is_valid(self): + """Check if certificate is valid (not revoked and not expired).""" + return not self.is_revoked and not self.is_expired + + @property + def days_until_expiry(self): + """Calculate days until certificate expires.""" + from django.utils import timezone + + if self.is_expired: + return 0 + delta = self.expires_at - timezone.now() + return delta.days -class Sensor(models.Model): - device = models.ForeignKey(Device, related_name="sensors", on_delete=models.CASCADE) - type = models.ForeignKey(SensorType, on_delete=models.CASCADE) - enabled = models.BooleanField(default=True) +class DeviceCredential(models.Model): + """Authentication credentials for non-mTLS protocols (HTTP, webhook, etc).""" + + id = models.CharField(max_length=255, primary_key=True) + device = models.ForeignKey( + Device, on_delete=models.CASCADE, related_name="credentials", db_column="device_id" + ) + credential_type = models.CharField(max_length=50) + credential_hash = models.TextField() + created_at = models.DateTimeField() + expires_at = models.DateTimeField(null=True, blank=True) + revoked_at = models.DateTimeField(null=True, blank=True) + + class Meta: + managed = False + db_table = "device_credentials" + indexes = [ + models.Index(fields=["device"]), + models.Index(fields=["device", "revoked_at"]), + ] def __str__(self): - return f"{self.type.name} Sensor on {self.device.name}" + status = "Revoked" if self.revoked_at else "Active" + return f"{self.credential_type} for {self.device.name} ({status})" + + @property + def is_revoked(self): + return self.revoked_at is not None + + @property + def is_expired(self): + from django.utils import timezone + return self.expires_at and timezone.now() > self.expires_at + + @property + def is_valid(self): + return not self.is_revoked and not self.is_expired + + +class Telemetry(models.Model): + """Time-series telemetry data from devices.""" + + time = models.DateTimeField() + device = models.ForeignKey( + Device, on_delete=models.CASCADE, related_name="telemetry", db_column="device_id" + ) + metric = models.CharField(max_length=255) + value = models.FloatField() + unit = models.CharField(max_length=50, null=True, blank=True) + + class Meta: + managed = False + db_table = "telemetry" + unique_together = [["time", "device", "metric"]] + indexes = [ + models.Index(fields=["device", "time"]), + ] + + def __str__(self): + return f"{self.device.name} - {self.metric}: {self.value} at {self.time}" + diff --git a/iotDashboard/settings.py b/iotDashboard/settings.py index 06afa78..c863994 100644 --- a/iotDashboard/settings.py +++ b/iotDashboard/settings.py @@ -13,7 +13,6 @@ https://docs.djangoproject.com/en/4.2/ref/settings/ from dotenv import load_dotenv from pathlib import Path import os -# from huey import SqliteHuey # Build paths inside the project like this: BASE_DIR / 'subdir'. @@ -27,7 +26,12 @@ load_dotenv() # SECURITY WARNING: keep the secret key used in production secret! SECRET_KEY = os.getenv("SECRET_KEY") -CONNECTION_STRING = os.getenv("CONNECTION_STRING") + +POSTGRES_HOST = os.getenv("POSTGRES_HOST") +POSTGRES_PORT = os.getenv("POSTGRES_PORT") +POSTGRES_USER = os.getenv("POSTGRES_USER") +POSTGRES_DB = os.getenv("POSTGRES_DB") +POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD") # SECURITY WARNING: don't run with debug turned on in production! DEBUG = True @@ -45,7 +49,6 @@ INSTALLED_APPS = [ "django.contrib.messages", "django.contrib.staticfiles", "iotDashboard", - # 'huey.contrib.djhuey', ] MIDDLEWARE = [ @@ -84,17 +87,14 @@ WSGI_APPLICATION = "iotDashboard.wsgi.application" DATABASES = { "default": { - "ENGINE": "django.db.backends.sqlite3", - "NAME": BASE_DIR / "db.sqlite3", - }, - "data": { - "ENGINE": "django.db.backends.postgresql", - "NAME": "example", - "USER": "postgres", - "PASSWORD": os.getenv("PASSWORD"), - "HOST": "10.10.0.1", - "PORT": "5555", + "ENGINE": "django.db.backends.postgresql", + "NAME": POSTGRES_DB, + "USER": POSTGRES_USER, + "PASSWORD": POSTGRES_PASSWORD, + "HOST": POSTGRES_HOST, + "PORT": POSTGRES_PORT, }, + } @@ -139,12 +139,3 @@ STATIC_URL = "static/" DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" - -# HUEY = { -# 'huey_class': 'huey.SqliteHuey', # Or 'huey.RedisHuey' for Redis -# 'filename': 'demo.db', # SQLite file for task storage -# 'results': True, -# 'store_none': False, -# 'immediate': False, -# 'utc': True, -# } diff --git a/iotDashboard/tasks.py b/iotDashboard/tasks.py deleted file mode 100644 index a21b6ad..0000000 --- a/iotDashboard/tasks.py +++ /dev/null @@ -1,180 +0,0 @@ -import json -import datetime -import os -import requests -import psycopg2 -import redis -from django.conf import settings -from huey import crontab -from huey.contrib.djhuey import periodic_task -from .models import Device -from dotenv import load_dotenv - -load_dotenv() - -REDIS_HOST = os.getenv("REDIS_HOST", "localhost") # Default to localhost if not set -try: - redis_client = redis.StrictRedis(host=REDIS_HOST, port=6379, db=0) - print(redis_client) - redis_client.ping() - print("Connected!") -except Exception as ex: - print - "Error:", ex - exit("Failed to connect, terminating.") - - -def devices_to_redis(): - """Fetch devices and their sensors' topics from Django and store them in Redis.""" - devices = Device.objects.all() - devices_list = [] - for device in devices: - for sensor in device.sensors.all(): - sensor_data = { - "device_name": device.name, - "sensor_name": sensor.type.name, - "topic": sensor.type.topic, # Assuming the topic is stored in SensorType - } - devices_list.append(sensor_data) - redis_client.set("mqtt_devices", json.dumps(devices_list)) - print("Devices with sensors stored in Redis.") - - -def fetch_data_http(device, sensor): - """Fetch data from an HTTP sensor.""" - sensor_type_name = sensor.type.name.lower() - try: - response = requests.get( - f"http://{device.ip}/sensor/{sensor_type_name}", timeout=5 - ) - response.raise_for_status() - sensor_value = response.json().get("value") - if sensor_value is not None: - return { - "time": datetime.datetime.utcnow().isoformat(), - "device": device.name, - "sensor": sensor_type_name, - "sensor_value": sensor_value, - } - else: - print(f"No value returned from {device.name} for {sensor_type_name}") - except requests.RequestException as e: - print(f"HTTP request failed for {device.name}: {e}") - return None - - -def fetch_data_mqtt_stream(device, sensor): - """Fetch data from Redis Stream for a specific MQTT device and sensor.""" - sensor_name = sensor.type.name.lower() - stream_key = f"mqtt_stream:{device.name}:{sensor_name}" - try: - stream_data = redis_client.xread({stream_key: "0-0"}, block=1000, count=1) - if stream_data: - _, entries = stream_data[0] - for entry_id, entry_data in entries: - sensor_value = entry_data.get(b"value") - timestamp = entry_data.get(b"time") - - if sensor_value and timestamp: - return { - "time": timestamp.decode("utf-8"), - "device": device.name, - "sensor_value": float(sensor_value.decode("utf-8")), - } - except Exception as e: - print(f"Error fetching data from stream {stream_key}: {e}") - return None - - -def is_recent_data(timestamp): - """Check if data is within a 1-minute freshness window.""" - data_time = datetime.datetime.fromisoformat(timestamp) - return data_time > datetime.datetime.utcnow() - datetime.timedelta(minutes=1) - - -def insert_data(data, sensor_type): - """Insert parsed data into the PostgreSQL database.""" - if "sensor_value" not in data: - print(f"Missing 'sensor_value' in data: {data}. Skipping insertion.") - return - - insert_data_dict = { - "time": data["time"], - "device": data["device"], - "metric": sensor_type.lower(), - "value": data["sensor_value"], - } - - try: - with psycopg2.connect(settings.CONNECTION_STRING) as conn: - with conn.cursor() as cursor: - insert_query = """ - INSERT INTO sensor_readings (time, device_name, metric, value) - VALUES (%s, %s, %s, %s); - """ - cursor.execute( - insert_query, - ( - insert_data_dict["time"], - insert_data_dict["device"], - insert_data_dict["metric"], - insert_data_dict["value"], - ), - ) - conn.commit() - print( - f"Data inserted successfully for {insert_data_dict['device']}: {insert_data_dict}" - ) - except Exception as e: - print(f"Failed to insert data: {e}") - - -@periodic_task(crontab(minute="*/1")) -def fetch_data_from_all_devices(): - """Fetch and insert data for all devices based on their protocol.""" - devices = Device.objects.all() - for device in devices: - for sensor in device.sensors.all(): - data = None - - if device.protocol == "http": - data = fetch_data_http(device, sensor) - elif device.protocol == "mqtt": - data = fetch_data_mqtt_stream(device, sensor) - - if data and is_recent_data(data["time"]): - insert_data(data, sensor.type.name) - else: - print(f"No recent or valid data for {device.name}. Skipping.") - - -@periodic_task(crontab(minute="*/5")) -def last_5_minutes(): - """Fetch the last 5 readings from TimescaleDB and store them in Redis.""" - try: - with psycopg2.connect(settings.CONNECTION_STRING) as conn: - with conn.cursor() as cursor: - cursor.execute(""" - SELECT time, device_name, metric, value - FROM sensor_readings - ORDER BY time DESC - LIMIT 5; - """) - results = cursor.fetchall() - - data = [ - { - "time": reading[0].isoformat(), - "device": reading[1], - "metric": reading[2], - "value": reading[3], - } - for reading in results - ] - redis_client.set("last5", json.dumps(data)) - print("Last 5 readings:", data) - except Exception as e: - print(f"Error fetching or storing the last 5 readings: {e}") - - -devices_to_redis() diff --git a/services/db_write/Dockerfile b/services/db_write/Dockerfile new file mode 100644 index 0000000..9942af5 --- /dev/null +++ b/services/db_write/Dockerfile @@ -0,0 +1,32 @@ +FROM ghcr.io/astral-sh/uv:python3.13-alpine AS builder + +WORKDIR /app + +ENV UV_COMPILE_BYTECODE=1 + +COPY pyproject.toml uv.lock ./ + +RUN uv sync --frozen --no-dev --no-install-project + +COPY ./src/ ./src/ +COPY main.py ./ + +RUN uv sync --frozen --no-dev + + +FROM python:3.13-alpine + +WORKDIR /app + +COPY --from=builder /app/.venv /app/.venv + +COPY --from=builder /app/*.py /app/ + +RUN adduser -D -u 1000 appuser && \ + chown -R appuser:appuser /app + +USER appuser + +ENV PATH="/app/.venv/bin:$PATH" + +CMD ["python", "main.py"] \ No newline at end of file diff --git a/services/device_manager/app/app.py b/services/device_manager/app/app.py index ca2782b..978684e 100644 --- a/services/device_manager/app/app.py +++ b/services/device_manager/app/app.py @@ -5,8 +5,12 @@ from fastapi import FastAPI, HTTPException from app.cert_manager import CertificateManager from app.database import get_db_context -from app.db_models import Device, DeviceCertificate # SQLAlchemy ORM models -from app.models import DeviceRegistrationRequest, DeviceRegistrationResponse, DeviceResponse +from app.db_models import Device, DeviceCertificate +from app.models import ( + DeviceRegistrationRequest, + DeviceRegistrationResponse, + DeviceResponse, +) logger = logging.getLogger(__name__) @@ -25,43 +29,62 @@ async def register_device( request: DeviceRegistrationRequest, ) -> DeviceRegistrationResponse: """ - Register a new device and issue an X.509 certificate. + Register a new device. + - MQTT devices: issues X.509 certificate for mTLS + - HTTP/webhook devices: generates API key or HMAC secret """ try: - response = cert_manager.register_device( - name=request.name, - location=request.location, - ) - - with get_db_context() as db: - device = Device( - id=response.device_id, + if request.protocol == "mqtt": + cert_response = cert_manager.register_device( name=request.name, location=request.location, - created_at=datetime.datetime.now(datetime.UTC), ) - db.add(device) - device_cert = DeviceCertificate( - id =response.certificate_id, - device_id=response.device_id, - certificate_pem=response.certificate_pem, - private_key_pem=response.private_key_pem, - issued_at=datetime.datetime.now(datetime.UTC), - expires_at=response.expires_at, + with get_db_context() as db: + device = Device( + id=cert_response.device_id, + name=request.name, + location=request.location, + protocol=request.protocol, + connection_config=request.connection_config, + created_at=datetime.datetime.now(datetime.UTC), + ) + db.add(device) + + device_cert = DeviceCertificate( + id=cert_response.certificate_id, + device_id=cert_response.device_id, + certificate_pem=cert_response.certificate_pem, + private_key_pem=cert_response.private_key_pem, + issued_at=datetime.datetime.now(datetime.UTC), + expires_at=cert_response.expires_at, + ) + db.add(device_cert) + + return DeviceRegistrationResponse( + device_id=cert_response.device_id, + protocol=request.protocol, + certificate_id=cert_response.certificate_id, + ca_certificate_pem=cert_response.ca_certificate_pem, + certificate_pem=cert_response.certificate_pem, + private_key_pem=cert_response.private_key_pem, + expires_at=cert_response.expires_at, ) - db.add(device_cert) + else: + raise HTTPException( + status_code=400, + detail=f"Protocol '{request.protocol}' not yet implemented. Only 'mqtt' is supported.", + ) + + except HTTPException: + raise except Exception as e: - logger.error( - f"Failed to register device {request.name}: {str(e)}", exc_info=True - ) + logger.error(f"Failed to register device {request.name}: {str(e)}", exc_info=True) raise HTTPException( status_code=500, detail="Failed to register device. Please try again." ) from e - return response - @app.get("/ca_certificate") async def get_ca_certificate() -> str: @@ -73,9 +96,8 @@ async def get_ca_certificate() -> str: return ca_cert_pem except Exception as e: logger.error(f"Failed to retrieve CA certificate: {str(e)}", exc_info=True) - raise HTTPException( - status_code=500, detail="Failed to retrieve CA certificate." - ) from e + raise HTTPException(status_code=500, detail="Failed to retrieve CA certificate.") from e + @app.get("/devices/{device_id}") async def get_device(device_id: str) -> DeviceResponse: @@ -88,18 +110,19 @@ async def get_device(device_id: str) -> DeviceResponse: if not device: raise HTTPException(status_code=404, detail="Device not found") - return Device( + return DeviceResponse( id=device.id, name=device.name, location=device.location, + protocol=device.protocol, + connection_config=device.connection_config, created_at=device.created_at, ) except Exception as e: logger.error(f"Failed to retrieve device {device_id}: {str(e)}", exc_info=True) - raise HTTPException( - status_code=500, detail="Failed to retrieve device information." - ) from e + raise HTTPException(status_code=500, detail="Failed to retrieve device information.") from e + @app.get("/devices/") async def list_devices() -> list[DeviceResponse]: @@ -114,6 +137,8 @@ async def list_devices() -> list[DeviceResponse]: id=device.id, name=device.name, location=device.location, + protocol=device.protocol, + connection_config=device.connection_config, created_at=device.created_at, ) for device in devices @@ -121,9 +146,8 @@ async def list_devices() -> list[DeviceResponse]: except Exception as e: logger.error(f"Failed to list devices: {str(e)}", exc_info=True) - raise HTTPException( - status_code=500, detail="Failed to list devices." - ) from e + raise HTTPException(status_code=500, detail="Failed to list devices.") from e + @app.post("/devices/{device_id}/revoke") async def revoke_device_certificate(device_id: str): @@ -135,9 +159,7 @@ async def revoke_device_certificate(device_id: str): try: with get_db_context() as db: device_cert = ( - db.query(DeviceCertificate) - .filter(DeviceCertificate.device_id == device_id) - .first() + db.query(DeviceCertificate).filter(DeviceCertificate.device_id == device_id).first() ) if not device_cert: raise HTTPException(status_code=404, detail="Device certificate not found") @@ -155,16 +177,14 @@ async def revoke_device_certificate(device_id: str): return { "device_id": device_id, "revoked_at": device_cert.revoked_at.isoformat(), - "message": "Certificate revoked successfully" + "message": "Certificate revoked successfully", } except HTTPException: raise except Exception as e: logger.error(f"Failed to revoke device {device_id}: {str(e)}", exc_info=True) - raise HTTPException( - status_code=500, detail="Failed to revoke device certificate." - ) from e + raise HTTPException(status_code=500, detail="Failed to revoke device certificate.") from e @app.get("/crl") @@ -180,15 +200,14 @@ async def get_crl(): return {"crl_pem": crl_pem} except Exception as e: logger.error(f"Failed to retrieve CRL: {str(e)}", exc_info=True) - raise HTTPException( - status_code=500, detail="Failed to retrieve CRL." - ) from e + raise HTTPException(status_code=500, detail="Failed to retrieve CRL.") from e + @app.post("/devices/{device_id}/renew") async def renew_certificate(device_id: str): """ Renew a device's certificate by issuing a new one and revoking the old one. - + This endpoint: 1. Retrieves the current certificate from DB 2. Generates a new certificate with new keys @@ -209,8 +228,7 @@ async def renew_certificate(device_id: str): ) if not device_cert: raise HTTPException( - status_code=404, - detail="No active certificate found for device" + status_code=404, detail="No active certificate found for device" ) # Check if certificate is about to expire (optional warning) @@ -227,15 +245,14 @@ async def renew_certificate(device_id: str): # Generate new certificate with new keys new_cert_pem, new_key_pem = cert_manager.renew_certificate( - current_cert_pem=device_cert.certificate_pem, - validity_days=365, - key_size=4096 + current_cert_pem=device_cert.certificate_pem, validity_days=365, key_size=4096 ) # Extract certificate ID (serial number) from the new certificate from cryptography import x509 + new_cert = x509.load_pem_x509_certificate(new_cert_pem) - new_cert_id = format(new_cert.serial_number, 'x') + new_cert_id = format(new_cert.serial_number, "x") # Create new certificate record in DB now = datetime.datetime.now(datetime.UTC) @@ -252,9 +269,12 @@ async def renew_certificate(device_id: str): logger.info(f"Successfully renewed certificate for device {device_id}") + device = db.query(Device).filter(Device.id == device_id).first() + return DeviceRegistrationResponse( - certificate_id=new_cert_id, device_id=device_id, + protocol=device.protocol if device else "mqtt", + certificate_id=new_cert_id, ca_certificate_pem=cert_manager.get_ca_certificate_pem(), certificate_pem=new_device_cert.certificate_pem, private_key_pem=new_device_cert.private_key_pem, @@ -265,6 +285,4 @@ async def renew_certificate(device_id: str): raise except Exception as e: logger.error(f"Failed to renew certificate for device {device_id}: {str(e)}", exc_info=True) - raise HTTPException( - status_code=500, detail="Failed to renew device certificate." - ) from e + raise HTTPException(status_code=500, detail="Failed to renew device certificate.") from e diff --git a/services/device_manager/app/cert_manager.py b/services/device_manager/app/cert_manager.py index 9ef37d3..fc6d4bd 100644 --- a/services/device_manager/app/cert_manager.py +++ b/services/device_manager/app/cert_manager.py @@ -8,7 +8,7 @@ from cryptography.x509.oid import NameOID from nanoid import generate from app.config import config -from app.models import DeviceRegistrationResponse +from app.models import DeviceCertificateResponse, DeviceCredentials lowercase_numbers = "abcdefghijklmnopqrstuvwxyz0123456789" @@ -21,10 +21,12 @@ class CertificateManager: self.ca_key: rsa.RSAPrivateKey = self.load_ca_private_key(config.CA_KEY_PATH) self.ca_cert_pem: bytes = self.ca_cert.public_bytes(serialization.Encoding.PEM) + def generate_device_id(self) -> str: """Generate a unique device ID using nanoid.""" return generate(alphabet=lowercase_numbers, size=config.DEVICE_ID_LENGTH) + def load_ca_certificate(self, ca_cert_path: str) -> x509.Certificate: """Load a CA certificate from file.""" with open(ca_cert_path, "rb") as f: @@ -32,6 +34,7 @@ class CertificateManager: ca_cert = x509.load_pem_x509_certificate(ca_data) return ca_cert + def load_ca_private_key(self, ca_key_path: str, password: bytes = None) -> rsa.RSAPrivateKey: """Load a CA private key from file.""" from cryptography.hazmat.primitives import serialization @@ -41,10 +44,12 @@ class CertificateManager: ca_key = serialization.load_pem_private_key(key_data, password=password) return ca_key + def generate_device_key(self, key_size: int = 4096) -> rsa.RSAPrivateKey: """Generate an RSA private key for a device.""" return rsa.generate_private_key(public_exponent=65537, key_size=key_size) + def generate_device_certificate( self, device_id: str, @@ -88,12 +93,13 @@ class CertificateManager: return cert_pem, key_pem + def create_device_credentials( self, device_id: str, validity_days: int = 365, key_size: int = 4096 - ) -> dict: + ) -> DeviceCredentials: """Create device credentials: private key and signed certificate. Returns: - dict with certificate_id, device_id, certificate_pem, private_key_pem, ca_certificate_pem, expires_at + DeviceCredentials model with certificate_id, device_id, certificate_pem, private_key_pem, expires_at """ device_key = self.generate_device_key(key_size=key_size) @@ -108,40 +114,42 @@ class CertificateManager: # Extract serial number from certificate to use as ID cert = x509.load_pem_x509_certificate(cert_pem) - cert_id = format(cert.serial_number, 'x') # Hex string of serial number + cert_id = format(cert.serial_number, 'x') expires_at = datetime.datetime.now(datetime.UTC) + datetime.timedelta(days=validity_days) - return { - "certificate_id": cert_id, - "device_id": device_id, - "certificate_pem": cert_pem, - "private_key_pem": key_pem, - "ca_certificate_pem": self.ca_cert_pem, - "expires_at": expires_at, - } + return DeviceCredentials( + certificate_id=cert_id, + device_id=device_id, + certificate_pem=cert_pem, + private_key_pem=key_pem, + expires_at=expires_at, + ) - def register_device(self, name: str, location: str | None = None) -> DeviceRegistrationResponse: + + def register_device(self, name: str, location: str | None = None) -> DeviceCertificateResponse: """Register a new device and generate its credentials. Returns: - DeviceRegistrationResponse + DeviceCertificateResponse """ device_id = self.generate_device_id() credentials = self.create_device_credentials(device_id=device_id) - return DeviceRegistrationResponse( - certificate_id=credentials["certificate_id"], - device_id=credentials["device_id"], - ca_certificate_pem=credentials["ca_certificate_pem"].decode("utf-8"), - certificate_pem=credentials["certificate_pem"].decode("utf-8"), - private_key_pem=credentials["private_key_pem"].decode("utf-8"), - expires_at=credentials["expires_at"], + return DeviceCertificateResponse( + certificate_id=credentials.certificate_id, + device_id=credentials.device_id, + ca_certificate_pem=self.ca_cert_pem.decode("utf-8"), + certificate_pem=credentials.certificate_pem.decode("utf-8"), + private_key_pem=credentials.private_key_pem.decode("utf-8"), + expires_at=credentials.expires_at, ) + def get_ca_certificate_pem(self) -> str: """Get the CA certificate in PEM format as a string.""" return self.ca_cert_pem.decode("utf-8") + def revoke_certificate( self, certificate_pem: str, reason: x509.ReasonFlags = x509.ReasonFlags.unspecified ) -> None: @@ -197,6 +205,7 @@ class CertificateManager: with open(crl_path, "wb") as f: f.write(crl.public_bytes(serialization.Encoding.PEM)) + def get_crl_pem(self) -> str | None: """Get the current CRL in PEM format.""" crl_path = Path(config.CRL_PATH) @@ -206,6 +215,7 @@ class CertificateManager: with open(crl_path, "rb") as f: return f.read().decode("utf-8") + def renew_certificate( self, current_cert_pem: str, diff --git a/services/device_manager/app/db_models.py b/services/device_manager/app/db_models.py index e20e2f4..12024cb 100644 --- a/services/device_manager/app/db_models.py +++ b/services/device_manager/app/db_models.py @@ -4,7 +4,7 @@ SQLAlchemy ORM models for device manager service. These models mirror the database schema defined in db_migrations. Kept separate to make the service independent. """ -from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Text +from sqlalchemy import JSON, Boolean, Column, DateTime, ForeignKey, Index, Text from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.sql import func @@ -19,11 +19,13 @@ class Device(Base): id = Column(Text, primary_key=True) name = Column(Text, nullable=False) location = Column(Text) + protocol = Column(Text, nullable=False, default="mqtt") + connection_config = Column(JSON) is_active = Column(Boolean, default=True) created_at = Column(DateTime(timezone=True), server_default=func.now()) def __repr__(self): - return f"" + return f"" class DeviceCertificate(Base): @@ -33,7 +35,7 @@ class DeviceCertificate(Base): id = Column(Text, primary_key=True) device_id = Column( - Text, ForeignKey("devices.id", ondelete="CASCADE"), primary_key=True + Text, ForeignKey("devices.id", ondelete="CASCADE"), nullable=False ) certificate_pem = Column(Text, nullable=False) private_key_pem = Column(Text) @@ -41,5 +43,34 @@ class DeviceCertificate(Base): expires_at = Column(DateTime(timezone=True), nullable=False) revoked_at = Column(DateTime(timezone=True)) + __table_args__ = ( + Index("idx_device_certificates_device_id", "device_id"), + Index("idx_device_certificates_active", "device_id", "revoked_at"), + ) + def __repr__(self): - return f"" + return f"" + + +class DeviceCredential(Base): + """Authentication credentials for non-mTLS protocols (HTTP, webhook, etc).""" + + __tablename__ = "device_credentials" + + id = Column(Text, primary_key=True) + device_id = Column( + Text, ForeignKey("devices.id", ondelete="CASCADE"), nullable=False + ) + credential_type = Column(Text, nullable=False) + credential_hash = Column(Text, nullable=False) + created_at = Column(DateTime(timezone=True), nullable=False) + expires_at = Column(DateTime(timezone=True)) + revoked_at = Column(DateTime(timezone=True)) + + __table_args__ = ( + Index("idx_device_credentials_device_id", "device_id"), + Index("idx_device_credentials_active", "device_id", "revoked_at"), + ) + + def __repr__(self): + return f"" diff --git a/services/device_manager/app/models.py b/services/device_manager/app/models.py index 01d6f83..63b5678 100644 --- a/services/device_manager/app/models.py +++ b/services/device_manager/app/models.py @@ -1,17 +1,38 @@ import datetime +from typing import Any from pydantic import BaseModel class DeviceRegistrationRequest(BaseModel): - """Request model for registering a new device.""" - name: str location: str | None = None + protocol: str = "mqtt" + connection_config: dict[str, Any] | None = None + class DeviceRegistrationResponse(BaseModel): - """Response model after registering a new device.""" + device_id: str + protocol: str + certificate_id: str | None = None + ca_certificate_pem: str | None = None + certificate_pem: str | None = None + private_key_pem: str | None = None + expires_at: datetime.datetime | None = None + credential_id: str | None = None + api_key: str | None = None + webhook_secret: str | None = None + +class DeviceResponse(BaseModel): + id: str + name: str + location: str | None = None + protocol: str + connection_config: dict[str, Any] | None = None + created_at: datetime.datetime + +class DeviceCertificateResponse(BaseModel): certificate_id: str device_id: str ca_certificate_pem: str @@ -19,10 +40,10 @@ class DeviceRegistrationResponse(BaseModel): private_key_pem: str expires_at: datetime.datetime -class DeviceResponse(BaseModel): - """Response model for device information.""" - id: str - name: str - location: str | None = None - created_at: datetime.datetime +class DeviceCredentials(BaseModel): + certificate_id: str + device_id: str + certificate_pem: bytes + private_key_pem: bytes + expires_at: datetime.datetime From 212b8d39a9ef6d5eb723a61d8b645f622372a930 Mon Sep 17 00:00:00 2001 From: ferdzo Date: Mon, 3 Nov 2025 13:02:15 +0100 Subject: [PATCH 4/6] Removed unused files, updated templates, added gpt service and small fixes on Django --- iotDashboard/device_manager_client.py | 76 +++- iotDashboard/forms.py | 93 ++-- iotDashboard/main.py | 6 + iotDashboard/pyproject.toml | 5 + ...rm.html => certificate_renew_confirm.html} | 35 +- .../templates/certificate_revoke_confirm.html | 42 ++ iotDashboard/templates/chart.html | 52 +-- .../templates/device_confirm_delete.html | 2 +- .../templates/device_credentials.html | 172 +++++++ iotDashboard/templates/device_detail.html | 101 +++++ iotDashboard/templates/device_form.html | 54 +-- iotDashboard/templates/device_list.html | 54 ++- .../templates/sensor_confirm_delete.html | 48 -- iotDashboard/templates/sensor_list.html | 64 --- iotDashboard/urls.py | 27 +- iotDashboard/views.py | 424 ++++++++++-------- pyproject.toml | 2 + services/device_manager/app/app.py | 16 +- services/gpt_service/.env.sample | 6 + services/gpt_service/README.md | 0 services/gpt_service/config.py | 13 + services/gpt_service/gpt_service.py | 42 ++ services/gpt_service/main.py | 6 + services/gpt_service/pyproject.toml | 7 + uv.lock | 6 + 25 files changed, 861 insertions(+), 492 deletions(-) create mode 100644 iotDashboard/main.py create mode 100644 iotDashboard/pyproject.toml rename iotDashboard/templates/{sensor_form.html => certificate_renew_confirm.html} (51%) create mode 100644 iotDashboard/templates/certificate_revoke_confirm.html create mode 100644 iotDashboard/templates/device_credentials.html create mode 100644 iotDashboard/templates/device_detail.html delete mode 100644 iotDashboard/templates/sensor_confirm_delete.html delete mode 100644 iotDashboard/templates/sensor_list.html create mode 100644 services/gpt_service/.env.sample create mode 100644 services/gpt_service/README.md create mode 100644 services/gpt_service/config.py create mode 100644 services/gpt_service/gpt_service.py create mode 100644 services/gpt_service/main.py create mode 100644 services/gpt_service/pyproject.toml diff --git a/iotDashboard/device_manager_client.py b/iotDashboard/device_manager_client.py index 4032af1..7d19f62 100644 --- a/iotDashboard/device_manager_client.py +++ b/iotDashboard/device_manager_client.py @@ -10,10 +10,15 @@ from datetime import datetime @dataclass class DeviceRegistrationResponse: device_id: str - certificate_id: str - certificate: str - private_key: str - ca_certificate: str + protocol: str + certificate_id: Optional[str] = None + ca_certificate_pem: Optional[str] = None + certificate_pem: Optional[str] = None + private_key_pem: Optional[str] = None + expires_at: Optional[datetime] = None + credential_id: Optional[str] = None + api_key: Optional[str] = None + webhook_secret: Optional[str] = None @dataclass @@ -21,9 +26,9 @@ class DeviceInfo: id: str name: str location: Optional[str] - is_active: bool + protocol: str + connection_config: Optional[Dict[str, Any]] created_at: datetime - certificates: List[Dict[str, Any]] class DeviceManagerAPIError(Exception): @@ -63,20 +68,33 @@ class DeviceManagerClient: status_code=0, message=f"Connection error: {str(e)}" ) - def register_device(self, name: str, location: Optional[str] = None) -> DeviceRegistrationResponse: - payload = {"name": name} + def register_device( + self, + name: str, + location: Optional[str] = None, + protocol: str = "mqtt", + connection_config: Optional[Dict[str, Any]] = None + ) -> DeviceRegistrationResponse: + payload = {"name": name, "protocol": protocol} if location: payload["location"] = location + if connection_config: + payload["connection_config"] = connection_config response = self._request("POST", "/devices/register", json=payload) data = response.json() return DeviceRegistrationResponse( device_id=data["device_id"], - certificate_id=data["certificate_id"], - certificate=data["certificate"], - private_key=data["private_key"], - ca_certificate=data["ca_certificate"], + protocol=data["protocol"], + certificate_id=data.get("certificate_id"), + ca_certificate_pem=data.get("ca_certificate_pem"), + certificate_pem=data.get("certificate_pem"), + private_key_pem=data.get("private_key_pem"), + expires_at=datetime.fromisoformat(data["expires_at"].replace("Z", "+00:00")) if data.get("expires_at") else None, + credential_id=data.get("credential_id"), + api_key=data.get("api_key"), + webhook_secret=data.get("webhook_secret"), ) def get_device(self, device_id: str) -> DeviceInfo: @@ -87,9 +105,9 @@ class DeviceManagerClient: id=data["id"], name=data["name"], location=data.get("location"), - is_active=data["is_active"], + protocol=data["protocol"], + connection_config=data.get("connection_config"), created_at=datetime.fromisoformat(data["created_at"].replace("Z", "+00:00")), - certificates=data.get("certificates", []), ) def list_devices(self) -> List[DeviceInfo]: @@ -101,11 +119,11 @@ class DeviceManagerClient: id=device["id"], name=device["name"], location=device.get("location"), - is_active=device["is_active"], + protocol=device["protocol"], + connection_config=device.get("connection_config"), created_at=datetime.fromisoformat( device["created_at"].replace("Z", "+00:00") ), - certificates=device.get("certificates", []), ) for device in data ] @@ -114,9 +132,22 @@ class DeviceManagerClient: response = self._request("POST", f"/devices/{device_id}/revoke") return response.json() - def renew_certificate(self, device_id: str) -> Dict[str, Any]: + def renew_certificate(self, device_id: str) -> DeviceRegistrationResponse: response = self._request("POST", f"/devices/{device_id}/renew") - return response.json() + data = response.json() + + return DeviceRegistrationResponse( + device_id=data["device_id"], + protocol=data["protocol"], + certificate_id=data.get("certificate_id"), + ca_certificate_pem=data.get("ca_certificate_pem"), + certificate_pem=data.get("certificate_pem"), + private_key_pem=data.get("private_key_pem"), + expires_at=datetime.fromisoformat(data["expires_at"].replace("Z", "+00:00")) if data.get("expires_at") else None, + credential_id=data.get("credential_id"), + api_key=data.get("api_key"), + webhook_secret=data.get("webhook_secret"), + ) def get_ca_certificate(self) -> str: response = self._request("GET", "/ca_certificate") @@ -137,8 +168,13 @@ class DeviceManagerClient: default_client = DeviceManagerClient() -def register_device(name: str, location: Optional[str] = None) -> DeviceRegistrationResponse: - return default_client.register_device(name, location) +def register_device( + name: str, + location: Optional[str] = None, + protocol: str = "mqtt", + connection_config: Optional[Dict[str, Any]] = None +) -> DeviceRegistrationResponse: + return default_client.register_device(name, location, protocol, connection_config) def get_device(device_id: str) -> DeviceInfo: diff --git a/iotDashboard/forms.py b/iotDashboard/forms.py index a8dc4f6..0a9e1ca 100644 --- a/iotDashboard/forms.py +++ b/iotDashboard/forms.py @@ -1,65 +1,40 @@ +""" +Django forms for the IoT Dashboard. + +Note: Device registration is handled through the device_manager API. +These forms are used for the legacy Django UI only. +""" + from django import forms -from iotDashboard.models import Device, Sensor, SensorType +from iotDashboard.models import Device class DeviceForm(forms.ModelForm): + """ + Form for creating/editing devices. + + Note: This is for the Django UI only. Actual device registration + happens through the device_manager microservice API. + """ + + protocol = forms.ChoiceField( + choices=[ + ("mqtt", "MQTT"), + ("http", "HTTP"), + ("webhook", "Webhook"), + ], + initial="mqtt", + help_text="Communication protocol for this device", + ) + class Meta: model = Device - fields = ["name", "ip", "protocol"] # Exclude sensors from the fields - - def __init__(self, *args, **kwargs): - # No need to handle sensors in the form - super(DeviceForm, self).__init__(*args, **kwargs) - - def save(self, commit=True): - # Save the device instance - device = super(DeviceForm, self).save(commit=False) - - if commit: - device.save() - - return device - - -class SensorWithTypeForm(forms.ModelForm): - # Add fields for SensorType directly in the form - type_name = forms.CharField(max_length=50, label="Sensor Type Name") - unit = forms.CharField(max_length=20, label="Unit", required=False) - protocol = forms.ChoiceField( - choices=[("mqtt", "MQTT"), ("http", "HTTP")], label="Protocol" - ) - topic = forms.CharField(max_length=100, label="Topic", required=False) - endpoint = forms.CharField(max_length=100, label="Endpoint", required=False) - - class Meta: - model = Sensor - fields = ["enabled"] # Exclude 'device' from the form fields - - def __init__(self, *args, **kwargs): - self.device = kwargs.pop("device", None) # Get the device from kwargs - super(SensorWithTypeForm, self).__init__(*args, **kwargs) - - def save(self, commit=True): - # Create or get the SensorType - try: - sensor_type = SensorType.objects.get(name=self.cleaned_data["type_name"]) - except SensorType.DoesNotExist: - sensor_type = SensorType( - name=self.cleaned_data["type_name"], - unit=self.cleaned_data["unit"], - protocol=self.cleaned_data["protocol"], - topic=self.cleaned_data["topic"], - endpoint=self.cleaned_data["endpoint"], - ) - if commit: - sensor_type.save() - - # Create Sensor with the SensorType found or created - sensor = super(SensorWithTypeForm, self).save(commit=False) - sensor.type = sensor_type - sensor.device = self.device # Associate the sensor with the device - - if commit: - sensor.save() - - return sensor + fields = ["name", "location", "protocol"] + widgets = { + "name": forms.TextInput(attrs={"class": "form-control", "placeholder": "Device name"}), + "location": forms.TextInput(attrs={"class": "form-control", "placeholder": "Device location (optional)"}), + } + help_texts = { + "name": "Unique identifier for this device", + "location": "Physical location or description", + } diff --git a/iotDashboard/main.py b/iotDashboard/main.py new file mode 100644 index 0000000..979affc --- /dev/null +++ b/iotDashboard/main.py @@ -0,0 +1,6 @@ +def main(): + print("Hello from iotdashboard!") + + +if __name__ == "__main__": + main() diff --git a/iotDashboard/pyproject.toml b/iotDashboard/pyproject.toml new file mode 100644 index 0000000..7ff8b86 --- /dev/null +++ b/iotDashboard/pyproject.toml @@ -0,0 +1,5 @@ +[project] +name = "dashboard" +version = "0.1.0" +requires-python = ">=3.13" +dependencies = [] diff --git a/iotDashboard/templates/sensor_form.html b/iotDashboard/templates/certificate_renew_confirm.html similarity index 51% rename from iotDashboard/templates/sensor_form.html rename to iotDashboard/templates/certificate_renew_confirm.html index ff1c4db..55d7732 100644 --- a/iotDashboard/templates/sensor_form.html +++ b/iotDashboard/templates/certificate_renew_confirm.html @@ -3,40 +3,39 @@ - Add Sensor and Type + Renew Certificate -
-

{% if form.instance.pk %}Edit{% else %}Add{% endif %} Device

+

Add New Device

+ + {% if messages %} + {% for message in messages %} + + {% endfor %} + {% endif %}
{% csrf_token %}
- - {{ form.name.label_tag }} {{ form.name }}
- {{ form.ip.label_tag }} {{ form.ip }}
- {{ form.protocol.label_tag }} {{ form.protocol }}
- - - {% if form.errors %} -
-
    - {% for field, errors in form.errors.items %} -
  • {{ field }}: {{ errors|join:", " }}
  • - {% endfor %} -
-
- {% endif %} + + +
+ +
+ + +
+ +
+ + +
MQTT devices will receive X.509 certificates for secure communication.
- - + Cancel
- - -
- {% if form.instance.pk %} - Add Sensor - Edit Sensors - {% endif %} -
diff --git a/iotDashboard/templates/device_list.html b/iotDashboard/templates/device_list.html index bfb8c57..9ec9d45 100644 --- a/iotDashboard/templates/device_list.html +++ b/iotDashboard/templates/device_list.html @@ -23,7 +23,7 @@ {% if user.is_authenticated %} {% else %} - + {% endif %} @@ -33,41 +33,61 @@

Manage Devices

+ + {% if messages %} + {% for message in messages %} + + {% endfor %} + {% endif %} + Add Device - + - + + - {% for device in devices %} + {% for device_data in devices %} - - - - + + + - + {% empty %} - + {% endfor %} diff --git a/iotDashboard/templates/sensor_confirm_delete.html b/iotDashboard/templates/sensor_confirm_delete.html deleted file mode 100644 index 1867eb6..0000000 --- a/iotDashboard/templates/sensor_confirm_delete.html +++ /dev/null @@ -1,48 +0,0 @@ - - - - - - Confirm Delete Sensor - - - - - - - -
-

Confirm Delete Sensor

-
- Warning! Are you sure you want to delete the sensor "{{ sensor.type.name }}"? This action cannot be undone. -
-
- {% csrf_token %} -
- - Cancel -
- -
- - diff --git a/iotDashboard/templates/sensor_list.html b/iotDashboard/templates/sensor_list.html deleted file mode 100644 index dcf9182..0000000 --- a/iotDashboard/templates/sensor_list.html +++ /dev/null @@ -1,64 +0,0 @@ - - - - - - Manage Sensors for {{ device.name }} - - - - - - - -
-

Sensors for {{ device.name }}

-
NameIP AddressLocation ProtocolSensor Types Certificate StatusCertificate Expiry Actions
{{ device.name }}{{ device.ip }}{{ device.protocol }}{{ device_data.device.name }}{{ device_data.device.location|default:"—" }}{{ device_data.device.protocol|upper }} - {% for sensor in device.sensors.all %} - {{ sensor.type.name }}{% if not forloop.last %}, {% endif %} - {% empty %} - No sensors - {% endfor %} + {% if device_data.device.protocol == 'mqtt' %} + {{ device_data.certificate_status }} + {% else %} + N/A + {% endif %} - Edit - Delete + {% if device_data.active_certificate %} + {{ device_data.active_certificate.expires_at|date:"Y-m-d H:i" }} + {% else %} + — + {% endif %} + + View + Delete + {% if device_data.device.protocol == 'mqtt' %} + Renew Cert + Revoke + {% endif %}
No devices found.No devices found.
- - - - - - - - - {% for sensor in sensors %} - - - - - - {% empty %} - - - - {% endfor %} - -
Sensor TypeEnabledActions
{{ sensor.type.name }}{{ sensor.enabled }} - Edit - Delete -
No sensors found for this device.
- -
- - diff --git a/iotDashboard/urls.py b/iotDashboard/urls.py index c94bee4..98feb5e 100644 --- a/iotDashboard/urls.py +++ b/iotDashboard/urls.py @@ -21,18 +21,25 @@ from iotDashboard import views urlpatterns = [ path("admin/", admin.site.urls), - path("devices_api/", views.devices_api), + + # Main dashboard path("", views.chart, name="index"), - path("fetch_device_data/", views.fetch_device_data, name="fetch_device_data"), + path("chart/", views.chart, name="chart"), + + # Device management path("devices/", views.device_list, name="device_list"), path("devices/add/", views.add_device, name="add_device"), - path("devices/edit//", views.edit_device, name="edit_device"), - path("devices/delete//", views.delete_device, name="delete_device"), + path("devices//", views.view_device, name="view_device"), + path("devices//delete/", views.delete_device, name="delete_device"), + + # Certificate management (MQTT devices only) + path("devices//certificate/revoke/", views.revoke_certificate, name="revoke_certificate"), + path("devices//certificate/renew/", views.renew_certificate, name="renew_certificate"), + + # Telemetry data API + path("fetch_device_data/", views.fetch_device_data, name="fetch_device_data"), + + # Legacy/utility endpoints + path("devices_api/", views.devices_api, name="devices_api"), path("logout/", views.logout_view, name="logout"), - path("sensors//", views.sensor_list, name="sensor_list"), - path("sensor/add/", views.add_sensor_with_type, name="add_sensor_with_type"), - # path('devices//sensors/', views.sensor_list, name='sensor_list'), - path("device//add_sensor/", views.add_sensor, name="add_sensor"), - path("sensor/edit//", views.edit_sensor, name="edit_sensor"), - path("sensor/delete//", views.delete_sensor, name="delete_sensor"), ] diff --git a/iotDashboard/views.py b/iotDashboard/views.py index edb538c..6c38d7c 100644 --- a/iotDashboard/views.py +++ b/iotDashboard/views.py @@ -1,235 +1,263 @@ -import redis import json -from django.db import connections -from django.http import JsonResponse, HttpResponse -from django.shortcuts import render, redirect, get_object_or_404 +from django.http import JsonResponse +from django.shortcuts import render, redirect +from django.contrib import messages -from .forms import DeviceForm, SensorWithTypeForm -from iotDashboard.models import Device, Sensor +from iotDashboard.models import Device, Telemetry +from iotDashboard.device_manager_client import DeviceManagerClient, DeviceManagerAPIError -redis_client = redis.StrictRedis(host="10.10.0.1", port=6379, db=0) +device_manager = DeviceManagerClient() -def fetch_gpt_data(): - return ( - redis_client.get("gpt") - .decode("utf-8") - .strip('b"') - .replace('\\"', '"') - .replace("\\n", "") - .replace("\\", "") - .replace("\\u00b0", "°") - ) +# def index(request): +# """Redirect to chart page.""" +# if request.user.is_authenticated: +# return redirect("/chart/") +# return HttpResponse("NOT AUTHENTICATED!!!") def chart(request): - # Fetch devices and their related sensors - devices = Device.objects.prefetch_related( - "sensors__type" - ).all() # Prefetch related sensors and their types - - # Create a list of devices and associated sensors - devices_json = [ - { - "name": device.name, - "sensors": [ - {"id": sensor.id, "type": sensor.type.name} - for sensor in device.sensors.all() - ], - } - for device in devices - ] - + """Main dashboard showing telemetry charts.""" try: - gpt_data = fetch_gpt_data() - gpt = json.loads(gpt_data) - except (redis.RedisError, json.JSONDecodeError) as e: - gpt = {"summary": "Error fetching data", "recommendations": {}} - print(f"Error fetching or parsing GPT data: {e}") - - context = { - "devices_json": json.dumps(devices_json), # Convert to a JSON string - "gpt": gpt, - } - - return render(request, "chart.html", context) + devices = Device.objects.all() + + devices_data = [] + for device in devices: + # Get unique metrics for this device from telemetry + metrics = ( + Telemetry.objects + .filter(device_id=device.id) + .values_list('metric', flat=True) + .distinct() + ) + + devices_data.append({ + "id": device.id, + "name": device.name, + "protocol": device.protocol, + "metrics": list(metrics), + }) + + context = { + "devices_json": json.dumps(devices_data), + } + + return render(request, "chart.html", context) + + except Exception as e: + messages.error(request, f"Error loading dashboard: {str(e)}") + return render(request, "chart.html", {"devices_json": "[]"}) def fetch_device_data(request): - device_name = request.GET.get("device", "Livingroom") - sensor_name = request.GET.get("sensor") # This will be the actual sensor name + """Fetch telemetry data for chart visualization.""" + from datetime import datetime, timedelta + from django.utils import timezone + + device_id = request.GET.get("device_id") + metric = request.GET.get("metric") start_date = request.GET.get("start_date") end_date = request.GET.get("end_date") - # Log the parameters to ensure they are correct - sensor_name = Sensor.objects.get(id=sensor_name).type.name + if not device_id: + return JsonResponse({"error": "device_id is required"}, status=400) - print("Device Name:", device_name) - print("Sensor Name:", sensor_name) # Log sensor name - print("Start Date:", start_date) - print("End Date:", end_date) - - # Get the specific device by name - device = get_object_or_404(Device, name=device_name) - - # Initialize lists to store times and values - times = [] - values = [] - - # Prepare SQL query and parameters for the device - query = """ - SELECT time, metric, value - FROM sensor_readings - WHERE device_name = %s - """ - params = [device.name] - - # If a specific sensor is specified, filter by that sensor name (converted to lowercase) - if sensor_name: - query += " AND metric = LOWER(%s)" # Convert to lowercase for comparison - params.append(sensor_name.lower()) # Convert sensor name to lowercase - - # Add time filtering to the query - if start_date: - query += " AND time >= %s::timestamptz" - params.append(start_date) - - if end_date: - query += " AND time <= %s::timestamptz" - params.append(end_date) - - # Log the final query and params - print("Final Query:", query) - print("Params Before Execution:", params) - - # Fetch data from the database - with connections["data"].cursor() as cursor: - cursor.execute(query, params) - rows = cursor.fetchall() - - # Log the number of rows returned - print("Number of Rows Returned:", len(rows)) - - # Process the results and extract times and values - for row in rows: - time, metric, value = row - formatted_time = time.strftime("%Y-%m-%d %H:%M:%S") - - times.append(formatted_time) - values.append(value) - - # If no data is found, return empty arrays - if not times and not values: - print("No data found for the specified device and sensor.") - return JsonResponse({"times": [], "values": []}) - - # Return the response in the expected format - return JsonResponse({"times": times, "values": values}) - - -def index(request): - if request.user.is_authenticated: - return redirect("/chart/") - return HttpResponse("NOT AUTHENTICATED!!!") + try: + # Build query using Django ORM + queryset = Telemetry.objects.filter(device_id=device_id) + + # Filter by metric if provided + if metric: + queryset = queryset.filter(metric=metric) + + # Parse and filter by date range (default to last 24 hours) + if start_date: + start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00')) + queryset = queryset.filter(time__gte=start_dt) + else: + # Default: last 24 hours + queryset = queryset.filter(time__gte=timezone.now() - timedelta(hours=24)) + + if end_date: + end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00')) + queryset = queryset.filter(time__lte=end_dt) + + # Order by time and get values + results = queryset.order_by('time').values_list('time', 'value') + + times = [] + values = [] + for time, value in results: + times.append(time.strftime("%Y-%m-%d %H:%M:%S")) + values.append(float(value)) + + return JsonResponse({"times": times, "values": values}) + + except Exception as e: + return JsonResponse({"error": str(e)}, status=500) def device_list(request): - devices = Device.objects.all() - return render(request, "device_list.html", {"devices": devices}) + """List all devices with their certificate status.""" + try: + devices = Device.objects.all() + + # Enrich devices with certificate information + devices_with_certs = [] + for device in devices: + device_data = { + "device": device, + "certificate_status": device.certificate_status if device.protocol == "mqtt" else "N/A", + "active_certificate": device.active_certificate if device.protocol == "mqtt" else None, + } + devices_with_certs.append(device_data) + + return render(request, "device_list.html", {"devices": devices_with_certs}) + + except Exception as e: + messages.error(request, f"Error loading devices: {str(e)}") + return render(request, "device_list.html", {"devices": []}) def add_device(request): + """Register a new device via device_manager API.""" if request.method == "POST": - form = DeviceForm(request.POST) - if form.is_valid(): - form.save() - return redirect("device_list") - else: - form = DeviceForm() - return render(request, "device_form.html", {"form": form}) + name = request.POST.get("name") + location = request.POST.get("location") + protocol = request.POST.get("protocol", "mqtt") + + if not name: + messages.error(request, "Device name is required") + return render(request, "device_form.html") + + try: + response = device_manager.register_device( + name=name, + location=location, + protocol=protocol + ) + + # Show credentials page (one-time view) + return render(request, "device_credentials.html", { + "device_name": name, + "response": response, + }) + + except DeviceManagerAPIError as e: + messages.error(request, f"Failed to register device: {e.message}") + return render(request, "device_form.html", { + "name": name, + "location": location, + "protocol": protocol, + }) + + return render(request, "device_form.html") -def edit_device(request, pk): - device = get_object_or_404(Device, pk=pk) - if request.method == "POST": - form = DeviceForm(request.POST, instance=device) - if form.is_valid(): - form.save() - return redirect("device_list") - else: - form = DeviceForm(instance=device) - return render(request, "device_form.html", {"form": form}) - - -def delete_device(request, pk): - device = get_object_or_404(Device, pk=pk) - if request.method == "POST": - device.delete() +def view_device(request, device_id): + """View device details and certificate information.""" + try: + device = Device.objects.get(id=device_id) + + # Get certificate if MQTT device + certificate = None + if device.protocol == "mqtt": + certificate = device.active_certificate + + context = { + "device": device, + "certificate": certificate, + } + + return render(request, "device_detail.html", context) + + except Device.DoesNotExist: + messages.error(request, f"Device {device_id} not found") + return redirect("device_list") + except Exception as e: + messages.error(request, f"Error loading device: {str(e)}") return redirect("device_list") - return render(request, "device_confirm_delete.html", {"device": device}) -def add_sensor_with_type(request): - if request.method == "POST": - form = SensorWithTypeForm(request.POST) - if form.is_valid(): - form.save() # This will save both Sensor and SensorType as needed - return redirect("device_list") # Adjust this to your specific URL name - else: - form = SensorWithTypeForm() +def delete_device(request, device_id): + """Delete a device.""" + try: + device = Device.objects.get(id=device_id) + + if request.method == "POST": + device_name = device.name + device.delete() + messages.success(request, f"Device '{device_name}' deleted successfully") + return redirect("device_list") + + return render(request, "device_confirm_delete.html", {"device": device}) + + except Device.DoesNotExist: + messages.error(request, f"Device {device_id} not found") + return redirect("device_list") - context = {"form": form} - return render(request, "sensor_form.html", context) + +def revoke_certificate(request, device_id): + """Revoke a device's certificate via device_manager API.""" + try: + device = Device.objects.get(id=device_id) + + if device.protocol != "mqtt": + messages.error(request, "Only MQTT devices have certificates to revoke") + return redirect("device_list") + + if request.method == "POST": + try: + device_manager.revoke_certificate(device_id) + messages.success(request, f"Certificate for device '{device.name}' revoked successfully") + except DeviceManagerAPIError as e: + messages.error(request, f"Failed to revoke certificate: {e.message}") + + return redirect("device_list") + + return render(request, "certificate_revoke_confirm.html", {"device": device}) + + except Device.DoesNotExist: + messages.error(request, f"Device {device_id} not found") + return redirect("device_list") + + +def renew_certificate(request, device_id): + """Renew a device's certificate via device_manager API.""" + try: + device = Device.objects.get(id=device_id) + + if device.protocol != "mqtt": + messages.error(request, "Only MQTT devices have certificates to renew") + return redirect("device_list") + + if request.method == "POST": + try: + response = device_manager.renew_certificate(device_id) + + # Show the new credentials (one-time view) + return render(request, "device_credentials.html", { + "device_name": device.name, + "response": response, + "is_renewal": True, + }) + except DeviceManagerAPIError as e: + messages.error(request, f"Failed to renew certificate: {e.message}") + return redirect("device_list") + + return render(request, "certificate_renew_confirm.html", {"device": device}) + + except Device.DoesNotExist: + messages.error(request, f"Device {device_id} not found") + return redirect("device_list") def logout_view(request): + """Redirect to admin logout.""" return redirect("/admin") def devices_api(request): - devices = list(Device.objects.all().values("name", "sensors__type__name")) + """JSON API endpoint for devices.""" + devices = list(Device.objects.all().values("id", "name", "protocol", "location")) return JsonResponse(devices, safe=False) - - -def sensor_list(request, device_id): - device = get_object_or_404(Device, id=device_id) - sensors = device.sensors.all() # Get sensors for this specific device - return render(request, "sensor_list.html", {"device": device, "sensors": sensors}) - - -def edit_sensor(request, pk): - sensor = get_object_or_404(Sensor, pk=pk) - if request.method == "POST": - form = SensorWithTypeForm(request.POST, instance=sensor) - if form.is_valid(): - form.save() - return redirect("sensor_list", device_id=sensor.device.pk) - else: - form = SensorWithTypeForm(instance=sensor) - return render(request, "sensor_form.html", {"form": form}) - - -def delete_sensor(request, pk): - sensor = get_object_or_404(Sensor, pk=pk) - if request.method == "POST": - device_id = sensor.device.pk - sensor.delete() - return redirect("sensor_list", device_id=device_id) - return render(request, "sensor_confirm_delete.html", {"sensor": sensor}) - - -def add_sensor(request, device_id): - device = get_object_or_404(Device, pk=device_id) - if request.method == "POST": - form = SensorWithTypeForm(request.POST) - if form.is_valid(): - sensor = form.save(commit=False) - sensor.device = device # Associate the sensor with the device - sensor.save() - return redirect( - "device_list" - ) # Redirect to device list or appropriate view - else: - form = SensorWithTypeForm() - - return render(request, "sensor_form.html", {"form": form, "device": device}) diff --git a/pyproject.toml b/pyproject.toml index 07d63af..af295a6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,4 +23,6 @@ dev = [ [tool.uv.workspace] members = [ "db_migrations", + "iotDashboard", + "services/gpt_service", ] diff --git a/services/device_manager/app/app.py b/services/device_manager/app/app.py index 978684e..154a173 100644 --- a/services/device_manager/app/app.py +++ b/services/device_manager/app/app.py @@ -158,14 +158,20 @@ async def revoke_device_certificate(device_id: str): """ try: with get_db_context() as db: + # Get the active (non-revoked) certificate for the device device_cert = ( - db.query(DeviceCertificate).filter(DeviceCertificate.device_id == device_id).first() + db.query(DeviceCertificate) + .filter( + DeviceCertificate.device_id == device_id, + DeviceCertificate.revoked_at.is_(None) + ) + .first() ) if not device_cert: - raise HTTPException(status_code=404, detail="Device certificate not found") - - if device_cert.revoked_at: - raise HTTPException(status_code=400, detail="Certificate already revoked") + raise HTTPException( + status_code=404, + detail="No active certificate found for this device" + ) cert_manager.revoke_certificate(device_cert.certificate_pem) diff --git a/services/gpt_service/.env.sample b/services/gpt_service/.env.sample new file mode 100644 index 0000000..2e1a3db --- /dev/null +++ b/services/gpt_service/.env.sample @@ -0,0 +1,6 @@ +API_KEY = +PROVIDER_NAME = os.getenv("PROVIDER_NAME", "openai") +MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4.1") +# HOST_URL = os.getenv("MODEL_URL") +LOG_LEVEL=INFO + diff --git a/services/gpt_service/README.md b/services/gpt_service/README.md new file mode 100644 index 0000000..e69de29 diff --git a/services/gpt_service/config.py b/services/gpt_service/config.py new file mode 100644 index 0000000..261312c --- /dev/null +++ b/services/gpt_service/config.py @@ -0,0 +1,13 @@ +import os +from dotenv import load_dotenv + +load_dotenv() + +class Config: + """Configuration settings for the GPT Service.""" + + API_KEY = os.getenv("API_KEY") + PROVIDER_NAME = os.getenv("PROVIDER_NAME", "openai") + MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4") + HOST_URL = os.getenv("HOST_URL") + LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") diff --git a/services/gpt_service/gpt_service.py b/services/gpt_service/gpt_service.py new file mode 100644 index 0000000..96a2019 --- /dev/null +++ b/services/gpt_service/gpt_service.py @@ -0,0 +1,42 @@ +import openai + +from config import API_KEY, MODEL_NAME, PROVIDER_NAME, HOST_URL, LOG_LEVEL +import logging + +class GPTService: + def __init__(self): + self.api_key = API_KEY + self.model_name = MODEL_NAME + self.provider_name = PROVIDER_NAME + self.host_url = HOST_URL + + logging.basicConfig(level=getattr(logging, LOG_LEVEL.upper(), logging.INFO)) + self.logger = logging.getLogger(__name__) + + if self.provider_name == "openai": + openai.api_key = self.api_key + if self.host_url: + openai.api_base = self.host_url + self.logger.info(f"Initialized OpenAI GPTService with model {self.model_name}") + else: + self.logger.error(f"Unsupported provider: {self.provider_name}") + raise ValueError(f"Unsupported provider: {self.provider_name}") + + def analyze_metrics(self, metrics: dict) -> str: + """Analyze given metrics using GPT model and return insights.""" + prompt = f"Analyze the following metrics and provide insights:\n{metrics}" + try: + response = openai.Completion.create( + engine=self.model_name, + prompt=prompt, + max_tokens=150, + n=1, + stop=None, + temperature=0.7, + ) + insights = response.choices[0].text.strip() + self.logger.info("Successfully obtained insights from GPT model") + return insights + except Exception as e: + self.logger.error(f"Error during GPT analysis: {e}") + raise \ No newline at end of file diff --git a/services/gpt_service/main.py b/services/gpt_service/main.py new file mode 100644 index 0000000..99a4c28 --- /dev/null +++ b/services/gpt_service/main.py @@ -0,0 +1,6 @@ +def main(): + print("Hello from gpt-service!") + + +if __name__ == "__main__": + main() diff --git a/services/gpt_service/pyproject.toml b/services/gpt_service/pyproject.toml new file mode 100644 index 0000000..5bb52f8 --- /dev/null +++ b/services/gpt_service/pyproject.toml @@ -0,0 +1,7 @@ +[project] +name = "gpt-service" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.13" +dependencies = [] diff --git a/uv.lock b/uv.lock index 769283f..40655b8 100644 --- a/uv.lock +++ b/uv.lock @@ -4,6 +4,7 @@ requires-python = ">=3.13" [manifest] members = [ + "dashboard", "db-migrations", "iotdashboard", ] @@ -112,6 +113,11 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, ] +[[package]] +name = "dashboard" +version = "0.1.0" +source = { virtual = "iotDashboard" } + [[package]] name = "db-migrations" version = "0.1.0" From d2b707ea5e7f20221f95cb363f1bbcf5cc9ffc25 Mon Sep 17 00:00:00 2001 From: ferdzo Date: Mon, 3 Nov 2025 15:57:23 +0100 Subject: [PATCH 5/6] DB Writer fixes and compose.yml update --- infrastructure/compose.yml | 2 +- services/db_write/src/db_writer.py | 7 ++++--- services/db_write/src/models.py | 30 +++++++++++++++++++++++++++++ services/gpt_service/config.py | 13 +++++-------- services/gpt_service/gpt_service.py | 19 +----------------- services/gpt_service/main.py | 6 +++++- 6 files changed, 46 insertions(+), 31 deletions(-) create mode 100644 services/db_write/src/models.py diff --git a/infrastructure/compose.yml b/infrastructure/compose.yml index 7b9c76a..a93f4be 100644 --- a/infrastructure/compose.yml +++ b/infrastructure/compose.yml @@ -15,7 +15,7 @@ services: - "9001:9001" - "8883:8883" volumes: - - ./mosquitto/:/mosquitto/ + - ./mosquitto/:/mosquitto/:Z restart: unless-stopped timescaledb: diff --git a/services/db_write/src/db_writer.py b/services/db_write/src/db_writer.py index 4eddbee..fada906 100644 --- a/services/db_write/src/db_writer.py +++ b/services/db_write/src/db_writer.py @@ -6,6 +6,7 @@ from sqlalchemy.pool import QueuePool from src.config import config from src.schema import TelemetryReading +from src.models import Telemetry class DatabaseWriter: @@ -37,9 +38,9 @@ class DatabaseWriter: session = self.SessionLocal() try: - # Convert to database objects using the correct field mapping + # Convert dataclass readings to SQLAlchemy Telemetry objects db_objects = [ - TelemetryReading( + Telemetry( time=reading.time, device_id=reading.device_id, metric=reading.metric, @@ -57,7 +58,7 @@ class DatabaseWriter: return True except Exception as e: - self.logger.error(f"Failed to write batch: {e}") + self.logger.error(f"Failed to write batch: {e}", exc_info=True) session.rollback() return False finally: diff --git a/services/db_write/src/models.py b/services/db_write/src/models.py new file mode 100644 index 0000000..508d7f7 --- /dev/null +++ b/services/db_write/src/models.py @@ -0,0 +1,30 @@ +""" +SQLAlchemy models for db_write service. + +These models mirror the schema in db_migrations/models.py. +Keep them in sync when schema changes occur. +""" + +from sqlalchemy import Column, Float, Text, DateTime +from sqlalchemy.ext.declarative import declarative_base + +Base = declarative_base() + + +class Telemetry(Base): + """ + Time-series telemetry data from devices. + + This model is used by the db_write service to insert data. + """ + + __tablename__ = "telemetry" + + time = Column(DateTime(timezone=True), primary_key=True, nullable=False) + device_id = Column(Text, primary_key=True, nullable=False) + metric = Column(Text, primary_key=True, nullable=False) + value = Column(Float, nullable=False) + unit = Column(Text) + + def __repr__(self): + return f"" diff --git a/services/gpt_service/config.py b/services/gpt_service/config.py index 261312c..938f066 100644 --- a/services/gpt_service/config.py +++ b/services/gpt_service/config.py @@ -3,11 +3,8 @@ from dotenv import load_dotenv load_dotenv() -class Config: - """Configuration settings for the GPT Service.""" - - API_KEY = os.getenv("API_KEY") - PROVIDER_NAME = os.getenv("PROVIDER_NAME", "openai") - MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4") - HOST_URL = os.getenv("HOST_URL") - LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") +API_KEY = os.getenv("API_KEY") +PROVIDER_NAME = os.getenv("PROVIDER_NAME", "openai") +MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4") +HOST_URL = os.getenv("HOST_URL") +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") \ No newline at end of file diff --git a/services/gpt_service/gpt_service.py b/services/gpt_service/gpt_service.py index 96a2019..ce891c4 100644 --- a/services/gpt_service/gpt_service.py +++ b/services/gpt_service/gpt_service.py @@ -22,21 +22,4 @@ class GPTService: self.logger.error(f"Unsupported provider: {self.provider_name}") raise ValueError(f"Unsupported provider: {self.provider_name}") - def analyze_metrics(self, metrics: dict) -> str: - """Analyze given metrics using GPT model and return insights.""" - prompt = f"Analyze the following metrics and provide insights:\n{metrics}" - try: - response = openai.Completion.create( - engine=self.model_name, - prompt=prompt, - max_tokens=150, - n=1, - stop=None, - temperature=0.7, - ) - insights = response.choices[0].text.strip() - self.logger.info("Successfully obtained insights from GPT model") - return insights - except Exception as e: - self.logger.error(f"Error during GPT analysis: {e}") - raise \ No newline at end of file + \ No newline at end of file diff --git a/services/gpt_service/main.py b/services/gpt_service/main.py index 99a4c28..fe65d76 100644 --- a/services/gpt_service/main.py +++ b/services/gpt_service/main.py @@ -1,6 +1,10 @@ +from gpt_service import GPTService + + def main(): + GPTService() print("Hello from gpt-service!") if __name__ == "__main__": - main() + main() \ No newline at end of file From 153dca9d8f17f6af40cef623e968454b2500590b Mon Sep 17 00:00:00 2001 From: ferdzo Date: Mon, 3 Nov 2025 20:51:46 +0100 Subject: [PATCH 6/6] Small changes --- services/db_write/src/db_writer.py | 2 -- services/device_manager/app/app.py | 1 - services/gpt_service/.env.sample | 8 ++++---- services/gpt_service/gpt_service.py | 6 ++---- services/gpt_service/pyproject.toml | 5 ++++- uv.lock | 16 ++++++++++++++++ 6 files changed, 26 insertions(+), 12 deletions(-) diff --git a/services/db_write/src/db_writer.py b/services/db_write/src/db_writer.py index fada906..5e5852f 100644 --- a/services/db_write/src/db_writer.py +++ b/services/db_write/src/db_writer.py @@ -38,7 +38,6 @@ class DatabaseWriter: session = self.SessionLocal() try: - # Convert dataclass readings to SQLAlchemy Telemetry objects db_objects = [ Telemetry( time=reading.time, @@ -50,7 +49,6 @@ class DatabaseWriter: for reading in readings ] - # Bulk insert session.bulk_save_objects(db_objects) session.commit() diff --git a/services/device_manager/app/app.py b/services/device_manager/app/app.py index 154a173..bfe98f8 100644 --- a/services/device_manager/app/app.py +++ b/services/device_manager/app/app.py @@ -228,7 +228,6 @@ async def renew_certificate(device_id: str): db.query(DeviceCertificate) .filter( DeviceCertificate.device_id == device_id, - # DeviceCertificate.revoked_at.is_(None) ) .first() ) diff --git a/services/gpt_service/.env.sample b/services/gpt_service/.env.sample index 2e1a3db..a6ef261 100644 --- a/services/gpt_service/.env.sample +++ b/services/gpt_service/.env.sample @@ -1,6 +1,6 @@ -API_KEY = -PROVIDER_NAME = os.getenv("PROVIDER_NAME", "openai") -MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4.1") -# HOST_URL = os.getenv("MODEL_URL") +API_KEY=your_api_key_here +PROVIDER_NAME=openai +MODEL_NAME=gpt-4.1 +HOST_URL= http://localhost:8000 LOG_LEVEL=INFO diff --git a/services/gpt_service/gpt_service.py b/services/gpt_service/gpt_service.py index ce891c4..a776402 100644 --- a/services/gpt_service/gpt_service.py +++ b/services/gpt_service/gpt_service.py @@ -1,4 +1,4 @@ -import openai +from openai import OpenAI from config import API_KEY, MODEL_NAME, PROVIDER_NAME, HOST_URL, LOG_LEVEL import logging @@ -14,9 +14,7 @@ class GPTService: self.logger = logging.getLogger(__name__) if self.provider_name == "openai": - openai.api_key = self.api_key - if self.host_url: - openai.api_base = self.host_url + self.client = OpenAI(api_key=self.api_key) self.logger.info(f"Initialized OpenAI GPTService with model {self.model_name}") else: self.logger.error(f"Unsupported provider: {self.provider_name}") diff --git a/services/gpt_service/pyproject.toml b/services/gpt_service/pyproject.toml index 5bb52f8..465b17e 100644 --- a/services/gpt_service/pyproject.toml +++ b/services/gpt_service/pyproject.toml @@ -4,4 +4,7 @@ version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.13" -dependencies = [] +dependencies = [ + "openai>=2.6.1", + "python-dotenv>=1.2.1", +] diff --git a/uv.lock b/uv.lock index 40655b8..70c979c 100644 --- a/uv.lock +++ b/uv.lock @@ -6,6 +6,7 @@ requires-python = ">=3.13" members = [ "dashboard", "db-migrations", + "gpt-service", "iotdashboard", ] @@ -156,6 +157,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8f/ef/81f3372b5dd35d8d354321155d1a38894b2b766f576d0abffac4d8ae78d9/django-5.2.7-py3-none-any.whl", hash = "sha256:59a13a6515f787dec9d97a0438cd2efac78c8aca1c80025244b0fe507fe0754b", size = 8307145, upload-time = "2025-10-01T14:22:49.476Z" }, ] +[[package]] +name = "gpt-service" +version = "0.1.0" +source = { virtual = "services/gpt_service" } +dependencies = [ + { name = "openai" }, + { name = "python-dotenv" }, +] + +[package.metadata] +requires-dist = [ + { name = "openai", specifier = ">=2.6.1" }, + { name = "python-dotenv", specifier = ">=1.2.1" }, +] + [[package]] name = "greenlet" version = "3.2.4"