Files
iotDashboard/services/mqtt_ingestion/README.md

98 lines
2.3 KiB
Markdown

# MQTT Ingestion Service
Subscribes to MQTT topics and writes telemetry data to Redis streams for downstream processing.
## Purpose
This service acts as the bridge between MQTT devices and the data pipeline. It:
- Connects to Mosquitto MQTT broker
- Subscribes to device topics using wildcard pattern
- Parses incoming messages
- Writes structured data to Redis stream
## Architecture
```
MQTT Broker (port 8883)
|
v
+-------------------+
| mqtt_ingestion |
| - MQTT subscriber |
| - Topic parser |
| - Redis writer |
+-------------------+
|
v
Redis Stream: mqtt:ingestion
```
## Topic Format
Devices publish to: `devices/{device_id}/{metric}`
Examples:
- `devices/a1b2c3d4/temperature` - Temperature reading
- `devices/a1b2c3d4/humidity` - Humidity reading
- `devices/a1b2c3d4/heart_rate` - Health metric
The service subscribes to `devices/#` to receive all device messages.
## Redis Stream Format
Each message written to `mqtt:ingestion` contains:
| Field | Type | Description |
|-------|------|-------------|
| device_id | string | 8-character device identifier |
| metric | string | Metric name (temperature, humidity, etc.) |
| value | string | Metric value (stored as string) |
| timestamp | string | ISO 8601 timestamp |
Example:
```json
{
"device_id": "a1b2c3d4",
"metric": "temperature",
"value": "23.5",
"timestamp": "2025-01-15T10:30:00.000Z"
}
```
## Configuration
Environment variables (`.env` file):
| Variable | Description | Default |
|----------|-------------|---------|
| MQTT_HOST | MQTT broker hostname | localhost |
| MQTT_PORT | MQTT broker port | 8883 |
| REDIS_HOST | Redis hostname | localhost |
| REDIS_PORT | Redis port | 6379 |
## Running
```bash
cd services/mqtt_ingestion
uv sync
uv run main.py
```
## Key Files
| File | Purpose |
|------|---------|
| `main.py` | Entry point, service initialization |
| `src/mqtt_client.py` | MQTT connection and subscription logic |
| `src/redis_writer.py` | Redis stream writing |
## Error Handling
- Invalid topics (not matching `devices/{id}/{metric}`) are logged and dropped
- Connection failures trigger automatic reconnection
- Redis write failures are logged (messages may be lost)
## Integration Points
- **Upstream**: Mosquitto MQTT broker with mTLS
- **Downstream**: Redis stream consumed by db_write service