mirror of
https://github.com/ferdzo/iotDashboard.git
synced 2026-04-05 09:06:26 +00:00
MQTT Ingestion Service
Subscribes to MQTT topics and writes telemetry data to Redis streams for downstream processing.
Purpose
This service acts as the bridge between MQTT devices and the data pipeline. It:
- Connects to Mosquitto MQTT broker
- Subscribes to device topics using wildcard pattern
- Parses incoming messages
- Writes structured data to Redis stream
Architecture
MQTT Broker (port 8883)
|
v
+-------------------+
| mqtt_ingestion |
| - MQTT subscriber |
| - Topic parser |
| - Redis writer |
+-------------------+
|
v
Redis Stream: mqtt:ingestion
Topic Format
Devices publish to: devices/{device_id}/{metric}
Examples:
devices/a1b2c3d4/temperature- Temperature readingdevices/a1b2c3d4/humidity- Humidity readingdevices/a1b2c3d4/heart_rate- Health metric
The service subscribes to devices/# to receive all device messages.
Redis Stream Format
Each message written to mqtt:ingestion contains:
| Field | Type | Description |
|---|---|---|
| device_id | string | 8-character device identifier |
| metric | string | Metric name (temperature, humidity, etc.) |
| value | string | Metric value (stored as string) |
| timestamp | string | ISO 8601 timestamp |
Example:
{
"device_id": "a1b2c3d4",
"metric": "temperature",
"value": "23.5",
"timestamp": "2025-01-15T10:30:00.000Z"
}
Configuration
Environment variables (.env file):
| Variable | Description | Default |
|---|---|---|
| MQTT_HOST | MQTT broker hostname | localhost |
| MQTT_PORT | MQTT broker port | 8883 |
| REDIS_HOST | Redis hostname | localhost |
| REDIS_PORT | Redis port | 6379 |
Running
cd services/mqtt_ingestion
uv sync
uv run main.py
Key Files
| File | Purpose |
|---|---|
main.py |
Entry point, service initialization |
src/mqtt_client.py |
MQTT connection and subscription logic |
src/redis_writer.py |
Redis stream writing |
Error Handling
- Invalid topics (not matching
devices/{id}/{metric}) are logged and dropped - Connection failures trigger automatic reconnection
- Redis write failures are logged (messages may be lost)
Integration Points
- Upstream: Mosquitto MQTT broker with mTLS
- Downstream: Redis stream consumed by db_write service