Files
iotDashboard/services/mqtt_ingestion

MQTT Ingestion Service

Subscribes to MQTT topics and writes telemetry data to Redis streams for downstream processing.

Purpose

This service acts as the bridge between MQTT devices and the data pipeline. It:

  • Connects to Mosquitto MQTT broker
  • Subscribes to device topics using wildcard pattern
  • Parses incoming messages
  • Writes structured data to Redis stream

Architecture

MQTT Broker (port 8883)
        |
        v
+-------------------+
| mqtt_ingestion    |
| - MQTT subscriber |
| - Topic parser    |
| - Redis writer    |
+-------------------+
        |
        v
Redis Stream: mqtt:ingestion

Topic Format

Devices publish to: devices/{device_id}/{metric}

Examples:

  • devices/a1b2c3d4/temperature - Temperature reading
  • devices/a1b2c3d4/humidity - Humidity reading
  • devices/a1b2c3d4/heart_rate - Health metric

The service subscribes to devices/# to receive all device messages.

Redis Stream Format

Each message written to mqtt:ingestion contains:

Field Type Description
device_id string 8-character device identifier
metric string Metric name (temperature, humidity, etc.)
value string Metric value (stored as string)
timestamp string ISO 8601 timestamp

Example:

{
  "device_id": "a1b2c3d4",
  "metric": "temperature",
  "value": "23.5",
  "timestamp": "2025-01-15T10:30:00.000Z"
}

Configuration

Environment variables (.env file):

Variable Description Default
MQTT_HOST MQTT broker hostname localhost
MQTT_PORT MQTT broker port 8883
REDIS_HOST Redis hostname localhost
REDIS_PORT Redis port 6379

Running

cd services/mqtt_ingestion
uv sync
uv run main.py

Key Files

File Purpose
main.py Entry point, service initialization
src/mqtt_client.py MQTT connection and subscription logic
src/redis_writer.py Redis stream writing

Error Handling

  • Invalid topics (not matching devices/{id}/{metric}) are logged and dropped
  • Connection failures trigger automatic reconnection
  • Redis write failures are logged (messages may be lost)

Integration Points

  • Upstream: Mosquitto MQTT broker with mTLS
  • Downstream: Redis stream consumed by db_write service