Update README to remove features and architecture

Removed features and architecture sections from README.
Committed by GitHub on 2025-11-10 00:23:23 +01:00
commit ed105fccd3, parent 7921049f56

A robust, production-ready service that reads sensor data from Redis streams and writes it to PostgreSQL/TimescaleDB. Part of the IoT Dashboard project.
## Features

- **Reliable consumption** from Redis streams using consumer groups
- **Batch processing** for high throughput
- **At-least-once delivery** with message acknowledgments
- **Dead letter queue** for failed messages
- **Connection pooling** for database efficiency
- **Graceful shutdown** handling
- **Flexible schema** that adapts to changes
- **Structured logging** with JSON output
- **Health checks** for monitoring
- **TimescaleDB support** for time-series optimization
## Architecture

```
Redis Streams → Consumer Group → Transform → Database → Acknowledge
                                     │
                              Failed messages
                                     ↓
                             Dead Letter Queue
```
### Components
- **`main.py`**: Service orchestration and processing loop
- **`redis_reader.py`**: Redis stream consumer with fault tolerance
- **`db_writer.py`**: Database operations with connection pooling
- **`schema.py`**: Data transformation and validation
- **`config.py`**: Configuration management
## Quick Start
### Prerequisites
- Python 3.13+
- [uv](https://github.com/astral-sh/uv) package manager
- Redis server with streams
- PostgreSQL or TimescaleDB
### Installation
1. **Navigate to the service directory**:
```bash
cd services/db_write
```
2. **Copy and configure environment variables**:
```bash
cp .env.example .env
# Edit .env with your DATABASE_URL and other settings
```
3. **Install dependencies**:
```bash
uv sync
```
4. **Setup database schema** (IMPORTANT - do this before running):
```bash
# Review the schema in models.py first
cat models.py
# Create initial migration
chmod +x migrate.sh
./migrate.sh create "initial schema"
# Review the generated migration
ls -lt alembic/versions/
# Apply migrations
./migrate.sh upgrade
```
5. **Run the service**:
```bash
uv run main.py
```
Or use the standalone script:
```bash
chmod +x run-standalone.sh
./run-standalone.sh
```
### ⚠️ Important: Schema Management
This service uses **Alembic** for database migrations. The service will NOT create tables automatically.
- Schema is defined in `models.py`
- Migrations are managed with `./migrate.sh` or `alembic` commands
- See `SCHEMA_MANAGEMENT.md` for detailed guide
## Schema Management
This service uses **SQLAlchemy** for models and **Alembic** for migrations.
### Key Files
- **`models.py`**: Define your database schema here (SQLAlchemy models)
- **`alembic/`**: Migration scripts directory
- **`migrate.sh`**: Helper script for common migration tasks
- **`SCHEMA_MANAGEMENT.md`**: Comprehensive migration guide
### Quick Migration Commands
```bash
# Create a new migration after editing models.py
./migrate.sh create "add new column"
# Apply pending migrations
./migrate.sh upgrade
# Check migration status
./migrate.sh check
# View migration history
./migrate.sh history
# Rollback last migration
./migrate.sh downgrade 1
```
**See `SCHEMA_MANAGEMENT.md` for detailed documentation.**
## Configuration
All configuration is done via environment variables. See `.env.example` for all available options.
### Required Settings
```bash
# Redis connection
REDIS_HOST=localhost
REDIS_PORT=6379
# Database connection
DATABASE_URL=postgresql://user:password@localhost:5432/iot_dashboard
```
### Optional Settings
```bash
# Consumer configuration
CONSUMER_GROUP_NAME=db_writer # Consumer group name
CONSUMER_NAME=worker-01 # Unique consumer name
BATCH_SIZE=100 # Messages per batch
BATCH_TIMEOUT_SEC=5 # Read timeout
PROCESSING_INTERVAL_SEC=1 # Delay between batches
# Stream configuration
STREAM_PATTERN=mqtt_stream:* # Stream name pattern
DEAD_LETTER_STREAM=mqtt_stream:failed
# Database
TABLE_NAME=sensor_readings # Target table name
ENABLE_TIMESCALE=false # Use TimescaleDB features
# Logging
LOG_LEVEL=INFO # DEBUG, INFO, WARNING, ERROR
LOG_FORMAT=json # json or console
```
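A minimal sketch of how `config.py` might load these variables is below; the dataclass and field names are assumptions (the service's actual settings object may differ), but the variable names and defaults mirror the tables above:

```python
# Hypothetical settings loader mirroring the environment variables above.
import os
from dataclasses import dataclass

@dataclass
class Settings:
    redis_host: str
    redis_port: int
    database_url: str
    batch_size: int
    stream_pattern: str

    @classmethod
    def from_env(cls) -> "Settings":
        return cls(
            redis_host=os.environ.get("REDIS_HOST", "localhost"),
            redis_port=int(os.environ.get("REDIS_PORT", "6379")),
            # Required: raises KeyError if DATABASE_URL is unset.
            database_url=os.environ["DATABASE_URL"],
            batch_size=int(os.environ.get("BATCH_SIZE", "100")),
            stream_pattern=os.environ.get("STREAM_PATTERN", "mqtt_stream:*"),
        )
```

Failing fast on a missing `DATABASE_URL` at startup keeps misconfiguration out of the processing loop.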
## Data Flow
### Input (Redis Streams)
The service reads from Redis streams with the format:
```
mqtt_stream:{device_id}:{sensor_type}
```
Each message contains:
```
{
  "value": "23.5",
  "timestamp": "2023-10-18T14:30:00Z",
  "metadata": "{...}"   (optional)
}
```
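The transformation step has to combine the stream key and the message fields into one database row. A sketch of what `schema.py` likely does (the function name is illustrative) under the stream-name and message formats shown above:

```python
# Assumed transform: stream key + message fields → one sensor_readings row.
import json
from datetime import datetime

def message_to_row(stream: str, fields: dict) -> dict:
    """Parse 'mqtt_stream:{device_id}:{sensor_type}' plus fields into a row."""
    _, device_id, sensor_type = stream.split(":", 2)
    return {
        # 'Z' suffix normalized for datetime.fromisoformat on older Pythons.
        "timestamp": datetime.fromisoformat(
            fields["timestamp"].replace("Z", "+00:00")
        ),
        "device_id": device_id,
        "sensor_type": sensor_type,
        "value": float(fields["value"]),  # raises ValueError on bad data → DLQ
        "metadata": json.loads(fields["metadata"]) if "metadata" in fields else None,
    }
```

Any exception raised here (bad float, malformed timestamp) marks the message as a data error, which the error-handling section below routes to the dead letter queue.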
### Output (Database)
Data is written to the `sensor_readings` table:
```sql
CREATE TABLE sensor_readings (
    id BIGSERIAL PRIMARY KEY,
    timestamp TIMESTAMPTZ NOT NULL,
    device_id VARCHAR(100) NOT NULL,
    sensor_type VARCHAR(100) NOT NULL,
    value DOUBLE PRECISION NOT NULL,
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
```
**Note**: The table is NOT created automatically; create it by applying the Alembic migrations described in the Schema Management section above.
## Running with Docker
### Build the image
```bash
docker build -t db-writer:latest .
```
### Run the container
```bash
docker run -d \
  --name db-writer \
  -e REDIS_HOST=redis \
  -e DATABASE_URL=postgresql://user:pass@postgres:5432/iot \
  db-writer:latest
```
## Consumer Groups
The service uses Redis consumer groups for reliable, distributed processing:
- **Multiple instances**: Run multiple workers for load balancing
- **Fault tolerance**: Messages are not lost if a consumer crashes
- **Acknowledgments**: Messages are only removed after successful processing
- **Pending messages**: Unacknowledged messages can be reclaimed
### Running Multiple Workers
```bash
# Terminal 1
CONSUMER_NAME=worker-01 uv run main.py
# Terminal 2
CONSUMER_NAME=worker-02 uv run main.py
```
All workers in the same consumer group will share the load.
## Error Handling
### Dead Letter Queue
Failed messages are sent to the dead letter stream (`mqtt_stream:failed`) with error information:
```
{
  "original_stream": "mqtt_stream:esp32:temperature",
  "original_id": "1634567890123-0",
  "device_id": "esp32",
  "sensor_type": "temperature",
  "value": "23.5",
  "error": "Database connection failed",
  "failed_at": "1634567890.123"
}
```
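Building that dead-letter record can be sketched as a small helper; the function name is an assumption, but the output fields match the entry format shown above:

```python
# Hypothetical DLQ record builder matching the entry format above.
import time

def dead_letter_entry(stream: str, msg_id: str, fields: dict, error: str) -> dict:
    """Flatten a failed message plus error context into a DLQ record."""
    _, device_id, sensor_type = stream.split(":", 2)
    return {
        "original_stream": stream,
        "original_id": msg_id,
        "device_id": device_id,
        "sensor_type": sensor_type,
        "value": fields.get("value", ""),
        "error": error,
        # Unix timestamp as a string, since Redis stream fields are strings.
        "failed_at": f"{time.time():.3f}",
    }
```

Keeping the original stream name and message ID in the record makes it possible to trace or replay the message later.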
### Retry Strategy
- **Transient errors**: Automatic retry with backoff
- **Data errors**: Immediate send to DLQ
- **Connection errors**: Reconnection attempts
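The "automatic retry with backoff" for transient errors can be sketched as exponential backoff with jitter; the exact parameters the service uses are not documented here, so the values below are illustrative:

```python
# Illustrative retry-with-backoff for transient errors; parameters are
# assumptions, not the service's documented values.
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def retry_with_backoff(
    op: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
) -> T:
    """Retry op() on ConnectionError with exponential backoff plus jitter."""
    for attempt in range(max_attempts):
        try:
            return op()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error (→ DLQ)
            # Double the delay each attempt; jitter avoids thundering herds.
            delay = base_delay * (2 ** attempt) * (0.5 + random.random() / 2)
            time.sleep(delay)
    raise RuntimeError("unreachable")
```

Data errors skip this path entirely and go straight to the DLQ, since retrying malformed input cannot succeed.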
## Monitoring
### Health Checks
Check service health programmatically:
```python
from main import DatabaseWriterService
service = DatabaseWriterService()
health = service.health_check()
print(health)
# {
# 'running': True,
# 'redis': True,
# 'database': True,
# 'stats': {...}
# }
```
### Logs
The service outputs structured logs:
```json
{
  "event": "Processed batch",
  "rows_written": 100,
  "messages_acknowledged": 100,
  "timestamp": "2023-10-18T14:30:00Z",
  "level": "info"
}
```
### Statistics
Runtime statistics are tracked:
- `messages_read`: Total messages consumed
- `messages_written`: Total rows inserted
- `messages_failed`: Failed messages sent to DLQ
- `batches_processed`: Number of successful batches
- `errors`: Total errors encountered
## Development
### Project Structure
```
db_write/
├── config.py # Configuration management
├── db_writer.py # Database operations
├── redis_reader.py # Redis stream consumer
├── schema.py # Data models and transformation
├── main.py # Service entry point
├── pyproject.toml # Dependencies
├── .env.example # Configuration template
└── README.md # This file
```
### Adding Dependencies
```bash
uv add package-name
```
### Running Tests
```bash
uv run pytest
```
## Troubleshooting
### Service won't start
1. **Check configuration**: Verify all required environment variables are set
2. **Test connections**: Ensure Redis and PostgreSQL are accessible
3. **Check logs**: Look for specific error messages
### No messages being processed
1. **Check streams exist**: `redis-cli KEYS "mqtt_stream:*"`
2. **Verify consumer group**: The service creates it automatically, but check Redis logs
3. **Check stream pattern**: Ensure `STREAM_PATTERN` matches your stream names
### Messages going to dead letter queue
1. **Check DLQ**: `redis-cli XRANGE mqtt_stream:failed - + COUNT 10`
2. **Review error messages**: Each DLQ entry contains the error reason
3. **Validate data format**: Ensure messages match expected schema
### High memory usage
1. **Reduce batch size**: Lower `BATCH_SIZE` in configuration
2. **Check connection pool**: May need to adjust pool size
3. **Monitor pending messages**: Use `XPENDING` to check backlog
## Performance Tuning
### Throughput Optimization
- **Increase batch size**: Process more messages per batch
- **Multiple workers**: Run multiple consumer instances
- **Connection pooling**: Adjust pool size based on load
- **Processing interval**: Reduce delay between batches
### Latency Optimization
- **Decrease batch size**: Process smaller batches more frequently
- **Reduce timeout**: Lower `BATCH_TIMEOUT_SEC`
- **Single worker**: Avoid consumer group coordination overhead
## Production Deployment
### Recommended Settings
```bash
BATCH_SIZE=500
PROCESSING_INTERVAL_SEC=0.1
LOG_LEVEL=INFO
LOG_FORMAT=json
ENABLE_TIMESCALE=true
```
### Monitoring
- Monitor consumer lag using Redis `XPENDING`
- Track database insert latency
- Alert on error rate > 5%
- Monitor DLQ depth
### Scaling
1. **Horizontal**: Add more consumer instances with unique `CONSUMER_NAME`
2. **Vertical**: Increase resources for database writes
3. **Database**: Use TimescaleDB for better time-series performance
## License
Part of the IoT Dashboard project.