Mirror of https://github.com/ferdzo/iotDashboard.git (synced 2026-04-05 17:16:26 +00:00)
Functioning MQTT ingestion and DB write, formatting changes, device manager initiated
@@ -2,6 +2,7 @@
Configuration management for the database writer service.
Loads settings from environment variables with sensible defaults.
"""

import os
from dataclasses import dataclass
from typing import Optional
@@ -13,6 +14,7 @@ dotenv.load_dotenv()
@dataclass
class RedisConfig:
    """Redis connection configuration"""

    host: str
    port: int = 6379
    db: int = 0
@@ -22,6 +24,7 @@ class RedisConfig:
@dataclass
class DatabaseConfig:
    """Database connection configuration"""

    url: Optional[str] = None
    host: Optional[str] = None
    port: int = 5432
@@ -30,21 +33,22 @@ class DatabaseConfig:
    password: Optional[str] = None
    table_name: str = "sensor_readings"
    enable_timescale: bool = False

    def get_connection_string(self) -> str:
        """Build connection string from components or return URL"""
        if self.url:
            return self.url

        if not all([self.host, self.name, self.user, self.password]):
            raise ValueError("Either DATABASE_URL or all DB_* variables must be set")

        return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.name}"


@dataclass
class ConsumerConfig:
    """Redis consumer group configuration"""

    group_name: str = "db_writer"
    consumer_name: str = "worker-01"
    batch_size: int = 100
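As an aside, the two paths through get_connection_string() behave as follows. A sketch: the name and user fields sit in context elided from the hunk above, and all values here are made up.

from config import DatabaseConfig

# An explicit URL short-circuits everything else
cfg = DatabaseConfig(url="postgresql://app:secret@db.local:5432/iot")
print(cfg.get_connection_string())  # the URL, verbatim

# Otherwise the string is assembled from the individual parts
cfg = DatabaseConfig(host="db.local", name="iot", user="app", password="secret")
print(cfg.get_connection_string())  # postgresql://app:secret@db.local:5432/iot

# With neither a URL nor a complete set of parts, it raises
try:
    DatabaseConfig(host="db.local").get_connection_string()
except ValueError as e:
    print(e)  # Either DATABASE_URL or all DB_* variables must be set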
@@ -56,6 +60,7 @@ class ConsumerConfig:
@dataclass
class StreamConfig:
    """Redis stream configuration"""

    pattern: str = "mqtt_stream:*"
    dead_letter_stream: str = "mqtt_stream:failed"
    max_retries: int = 3
@@ -65,76 +70,70 @@ class StreamConfig:
@dataclass
class LogConfig:
    """Logging configuration"""

    level: str = "INFO"
    format: str = "json"  # json or console


class Config:
    """Main configuration class"""

    def __init__(self):
        self.redis = RedisConfig(
-            host=os.getenv('REDIS_HOST', 'localhost'),
-            port=int(os.getenv('REDIS_PORT', 6379)),
-            db=int(os.getenv('REDIS_DB', 0)),
-            password=os.getenv('REDIS_PASSWORD', None) or None
+            host=os.getenv("REDIS_HOST", "localhost"),
+            port=int(os.getenv("REDIS_PORT", 6379)),
+            db=int(os.getenv("REDIS_DB", 0)),
+            password=os.getenv("REDIS_PASSWORD", None) or None,
        )

        self.database = DatabaseConfig(
-            url=os.getenv('DATABASE_URL', None),
-            host=os.getenv('DB_HOST', None),
-            port=int(os.getenv('DB_PORT', 5432)),
-            name=os.getenv('DB_NAME', None),
-            user=os.getenv('DB_USER', None),
-            password=os.getenv('DB_PASSWORD', None),
-            table_name=os.getenv('TABLE_NAME', 'sensor_readings'),
-            enable_timescale=os.getenv('ENABLE_TIMESCALE', 'false').lower() == 'true'
+            url=os.getenv("DATABASE_URL", None),
+            host=os.getenv("DB_HOST", None),
+            port=int(os.getenv("DB_PORT", 5432)),
+            name=os.getenv("DB_NAME", None),
+            user=os.getenv("DB_USER", None),
+            password=os.getenv("DB_PASSWORD", None),
+            table_name=os.getenv("TABLE_NAME", "sensor_readings"),
+            enable_timescale=os.getenv("ENABLE_TIMESCALE", "false").lower() == "true",
        )

        self.consumer = ConsumerConfig(
-            group_name=os.getenv('CONSUMER_GROUP_NAME', 'db_writer'),
-            consumer_name=os.getenv('CONSUMER_NAME', 'worker-01'),
-            batch_size=int(os.getenv('BATCH_SIZE', 100)),
-            batch_timeout_sec=int(os.getenv('BATCH_TIMEOUT_SEC', 5)),
-            processing_interval_sec=float(os.getenv('PROCESSING_INTERVAL_SEC', 1.0)),
-            block_time_ms=int(os.getenv('BLOCK_TIME_MS', 5000))
+            group_name=os.getenv("CONSUMER_GROUP_NAME", "db_writer"),
+            consumer_name=os.getenv("CONSUMER_NAME", "worker-01"),
+            batch_size=int(os.getenv("BATCH_SIZE", 100)),
+            batch_timeout_sec=int(os.getenv("BATCH_TIMEOUT_SEC", 5)),
+            processing_interval_sec=float(os.getenv("PROCESSING_INTERVAL_SEC", 1.0)),
+            block_time_ms=int(os.getenv("BLOCK_TIME_MS", 5000)),
        )

        self.stream = StreamConfig(
-            pattern=os.getenv('STREAM_PATTERN', 'mqtt_stream:*'),
-            dead_letter_stream=os.getenv('DEAD_LETTER_STREAM', 'mqtt_stream:failed'),
-            max_retries=int(os.getenv('MAX_RETRIES', 3)),
-            trim_maxlen=int(os.getenv('TRIM_MAXLEN', 10000))
+            max_retries=int(os.getenv("MAX_RETRIES", 3)),
+            trim_maxlen=int(os.getenv("TRIM_MAXLEN", 10000)),
        )

        self.log = LogConfig(
-            level=os.getenv('LOG_LEVEL', 'INFO'),
-            format=os.getenv('LOG_FORMAT', 'json')
+            level=os.getenv("LOG_LEVEL", "INFO"), format=os.getenv("LOG_FORMAT", "json")
        )

    def validate(self):
        """Validate configuration"""
        errors = []

        # Validate Redis config
        if not self.redis.host:
            errors.append("REDIS_HOST is required")

        # Validate database config
        try:
            self.database.get_connection_string()
        except ValueError as e:
            errors.append(str(e))

        # Validate consumer config
        if self.consumer.batch_size < 1:
            errors.append("BATCH_SIZE must be >= 1")

        if errors:
            raise ValueError(f"Configuration errors: {', '.join(errors)}")

        return True


# Global config instance
config = Config()
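Because the module ends with config = Config(), settings are read once at import time, so the environment must be set before the first import. A minimal usage sketch (values illustrative):

import os

os.environ["REDIS_HOST"] = "redis.local"
os.environ["DATABASE_URL"] = "postgresql://app:secret@db.local:5432/iot"

from config import config  # Config() is constructed here, reading the env

config.validate()                  # raises ValueError listing every problem at once
print(config.redis.host)           # redis.local
print(config.consumer.batch_size)  # 100 unless BATCH_SIZE is set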
@@ -5,75 +5,65 @@ from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool

from config import config
-from schema import SensorReading
-from models import SensorReading as SensorReadingModel
+from schema import TelemetryReading
+from models import Telemetry


class DatabaseWriter:
    """
-    Database writer using SQLAlchemy.
-
-    Schema is defined in models.py and should be managed using Alembic migrations.
-    This class only handles data insertion, NOT schema creation.
-
-    To manage schema:
-    1. Edit models.py to define your schema
-    2. Generate migration: alembic revision --autogenerate -m "description"
-    3. Apply migration: alembic upgrade head
+    Database writer for telemetry data.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)

        # Initialize SQLAlchemy engine with connection pooling
        connection_string = config.database.get_connection_string()

        self.engine = create_engine(
            connection_string,
            poolclass=QueuePool,
            pool_size=5,
            max_overflow=10,
-            pool_pre_ping=True
+            pool_pre_ping=True,
        )

        # Create session factory
        self.SessionLocal = sessionmaker(bind=self.engine)

        self.logger.info("Database writer initialized")

-    def write_batch(self, readings: List[SensorReading]) -> bool:
-        """Write a batch of sensor readings to the database"""
+    def write_batch(self, readings: List[TelemetryReading]) -> bool:
+        """Write a batch of telemetry readings to the database"""
        if not readings:
            return True

        session = self.SessionLocal()
        try:
-            # Convert to database objects
+            # Convert to database objects using the correct field mapping
            db_objects = [
-                SensorReadingModel(
-                    timestamp=reading.timestamp,
+                Telemetry(
+                    time=reading.time,
                    device_id=reading.device_id,
-                    sensor_type=reading.sensor_type,
+                    metric=reading.metric,
                    value=reading.value,
-                    metadata=reading.metadata
+                    unit=reading.unit,
                )
                for reading in readings
            ]

            # Bulk insert
            session.bulk_save_objects(db_objects)
            session.commit()

            self.logger.debug(f"Wrote {len(readings)} readings to database")
            return True

        except Exception as e:
            self.logger.error(f"Failed to write batch: {e}")
            session.rollback()
            return False
        finally:
            session.close()

    def health_check(self) -> bool:
        """Check if database connection is healthy"""
        try:
@@ -83,9 +73,9 @@ class DatabaseWriter:
        except Exception as e:
            self.logger.error(f"Database health check failed: {e}")
            return False

    def close(self):
        """Close database engine and all connections"""
-        if hasattr(self, 'engine') and self.engine:
+        if hasattr(self, "engine") and self.engine:
            self.engine.dispose()
            self.logger.info("Database engine closed")
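For reference, a standalone sketch of driving the writer, assuming the telemetry table has already been created by migrations; the device values are made up:

from datetime import datetime, timezone

from db_writer import DatabaseWriter
from schema import TelemetryReading

writer = DatabaseWriter()
batch = [
    TelemetryReading(
        time=datetime.now(timezone.utc),
        device_id="dev-42",       # illustrative device id
        metric="temperature",
        value=21.5,
        unit="C",
    )
]
if writer.write_batch(batch):     # returns False (after rollback) on failure
    print("batch committed")
writer.close()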
@@ -2,6 +2,7 @@
Main entry point for the database writer service.
Orchestrates the read → transform → write cycle with error handling.
"""

import logging
import signal
import sys
@@ -12,12 +13,12 @@ from typing import List
from config import config
from redis_reader import RedisReader
from db_writer import DatabaseWriter
-from schema import SchemaHandler, StreamMessage, SensorReading
+from schema import SchemaHandler, StreamMessage, TelemetryReading


def configure_logging():
    """Configure structured logging"""
-    if config.log.format == 'json':
+    if config.log.format == "json":
        structlog.configure(
            processors=[
                structlog.stdlib.filter_by_level,
@@ -27,129 +28,127 @@ def configure_logging():
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
-                structlog.processors.JSONRenderer()
+                structlog.processors.JSONRenderer(),
            ],
            wrapper_class=structlog.stdlib.BoundLogger,
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            cache_logger_on_first_use=True,
        )

    # Configure standard logging
    logging.basicConfig(
        level=getattr(logging, config.log.level.upper(), logging.INFO),
-        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )


class DatabaseWriterService:
    """Main service class that orchestrates the data pipeline"""

    def __init__(self):
        self.running = False
        self.redis_reader: RedisReader = None
        self.db_writer: DatabaseWriter = None
        self.schema_handler: SchemaHandler = None
        self.logger = logging.getLogger(__name__)

        # Setup signal handlers for graceful shutdown
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

        # Statistics
        self.stats = {
-            'messages_read': 0,
-            'messages_written': 0,
-            'messages_failed': 0,
-            'batches_processed': 0,
-            'errors': 0
+            "messages_read": 0,
+            "messages_written": 0,
+            "messages_failed": 0,
+            "batches_processed": 0,
+            "errors": 0,
        }

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals"""
        self.logger.info(f"Received signal {signum}, initiating graceful shutdown...")
        self.stop()

    def start(self) -> bool:
        """Start the service"""
        self.logger.info("Starting Database Writer Service...")

        try:
            # Validate configuration
            config.validate()
            self.logger.info("Configuration validated successfully")

            # Initialize components
            self.schema_handler = SchemaHandler()
            self.logger.info("Schema handler initialized")

            self.redis_reader = RedisReader()
            self.logger.info("Redis reader initialized")

            self.db_writer = DatabaseWriter()
            self.logger.info("Database writer initialized")

            # Start the processing loop
            self.running = True
            self.logger.info("Service started successfully, entering processing loop")

            self._processing_loop()

            return True

        except Exception as e:
            self.logger.error(f"Service startup failed: {e}", exc_info=True)
            return False

    def _processing_loop(self):
        """Main processing loop"""
        consecutive_errors = 0
        max_consecutive_errors = 5

        while self.running:
            try:
                # Read batch from Redis
                messages = self.redis_reader.read_batch()

                if not messages:
                    # No messages, sleep briefly
                    time.sleep(config.consumer.processing_interval_sec)
                    continue

-                self.stats['messages_read'] += len(messages)
+                self.stats["messages_read"] += len(messages)
                self.logger.debug(f"Read {len(messages)} messages from Redis")

                # Transform messages to sensor readings
                readings = self._transform_messages(messages)

                if not readings:
                    self.logger.warning("No valid readings after transformation")
                    # Acknowledge the messages anyway (they were invalid)
                    self.redis_reader.acknowledge_batch(messages)
                    continue

                # Write to database
-                result = self.db_writer.write_batch(readings)
-
-                if result.success:
+                success = self.db_writer.write_batch(readings)
+
+                if success:
                    # Successfully written, acknowledge the messages
                    ack_count = self.redis_reader.acknowledge_batch(messages)
-                    self.stats['messages_written'] += result.rows_written
-                    self.stats['batches_processed'] += 1
+                    self.stats["messages_written"] += len(readings)
+                    self.stats["batches_processed"] += 1
                    consecutive_errors = 0

                    self.logger.info(
-                        f"Processed batch: {result.rows_written} readings written, "
+                        f"Processed batch: {len(readings)} readings written, "
                        f"{ack_count} messages acknowledged"
                    )
                else:
-                    # Write failed, send to dead letter queue
-                    self.logger.error(f"Failed to write batch: {result.error}")
-                    self._handle_failed_batch(messages, result.error)
-                    self.stats['messages_failed'] += len(messages)
-                    self.stats['errors'] += 1
+                    # Write failed, log error and acknowledge to prevent blocking
+                    self.logger.error(
+                        f"Failed to write batch of {len(readings)} readings"
+                    )
+                    # Acknowledge anyway so they don't block the queue
+                    self.redis_reader.acknowledge_batch(messages)
+                    self.stats["messages_failed"] += len(messages)
+                    self.stats["errors"] += 1
                    consecutive_errors += 1

                # Check for too many consecutive errors
                if consecutive_errors >= max_consecutive_errors:
                    self.logger.error(
@@ -158,26 +157,28 @@ class DatabaseWriterService:
                    )
                    time.sleep(30)
                    consecutive_errors = 0

                # Brief pause between batches
                if config.consumer.processing_interval_sec > 0:
                    time.sleep(config.consumer.processing_interval_sec)

            except KeyboardInterrupt:
                self.logger.info("Keyboard interrupt received")
                break
            except Exception as e:
                self.logger.error(f"Error in processing loop: {e}", exc_info=True)
-                self.stats['errors'] += 1
+                self.stats["errors"] += 1
                consecutive_errors += 1
                time.sleep(5)  # Back off on errors

        self.logger.info("Processing loop terminated")
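Stripped of logging and bookkeeping, the loop above reduces to this at-least-once delivery cycle. A sketch, not a drop-in replacement; note that with _handle_failed_batch removed in this commit, failed batches are now acknowledged too:

while running:
    messages = reader.read_batch()          # XREADGROUP; may block up to block_time_ms
    if not messages:
        continue
    readings = [r for m in messages if (r := handler.transform_message(m)) is not None]
    if db.write_batch(readings):
        reader.acknowledge_batch(messages)  # XACK only after the commit succeeds
    else:
        # acknowledged anyway: keeps the consumer group unblocked, but a
        # database outage now drops data instead of parking it in the
        # dead-letter stream as the old path did
        reader.acknowledge_batch(messages)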
-    def _transform_messages(self, messages: List[StreamMessage]) -> List[SensorReading]:
+    def _transform_messages(
+        self, messages: List[StreamMessage]
+    ) -> List[TelemetryReading]:
        """Transform stream messages to sensor readings"""
        readings = []

        for msg in messages:
            reading = self.schema_handler.transform_message(msg)
            if reading:
@@ -186,28 +187,17 @@ class DatabaseWriterService:
                self.logger.warning(
                    f"Failed to transform message {msg.message_id} from {msg.stream_key}"
                )

        return readings

-    def _handle_failed_batch(self, messages: List[StreamMessage], error: str):
-        """Handle a batch that failed to write to database"""
-        # Send all messages to dead letter queue
-        for msg in messages:
-            self.redis_reader.send_to_dead_letter(msg, error)
-
-        # Acknowledge them so they don't block the consumer group
-        self.redis_reader.acknowledge_batch(messages)
-
-        self.logger.warning(f"Sent {len(messages)} messages to dead letter queue")

    def stop(self):
        """Stop the service gracefully"""
        if not self.running:
            return

        self.logger.info("Stopping service...")
        self.running = False

        # Print final statistics
        self.logger.info(
            f"Final statistics: "
@@ -217,31 +207,31 @@ class DatabaseWriterService:
            f"batches_processed={self.stats['batches_processed']}, "
            f"errors={self.stats['errors']}"
        )

        # Close connections
        if self.redis_reader:
            self.redis_reader.close()

        if self.db_writer:
            self.db_writer.close()

        self.logger.info("Service stopped")

    def health_check(self) -> dict:
        """Check service health"""
        health = {
-            'running': self.running,
-            'redis': False,
-            'database': False,
-            'stats': self.stats
+            "running": self.running,
+            "redis": False,
+            "database": False,
+            "stats": self.stats,
        }

        if self.redis_reader:
-            health['redis'] = self.redis_reader.health_check()
+            health["redis"] = self.redis_reader.health_check()

        if self.db_writer:
-            health['database'] = self.db_writer.health_check()
+            health["database"] = self.db_writer.health_check()

        return health


@@ -250,7 +240,7 @@ def main():
    # Configure logging
    configure_logging()
    logger = logging.getLogger(__name__)

    logger.info("=" * 60)
    logger.info("Database Writer Service")
    logger.info(f"Consumer Group: {config.consumer.group_name}")
@@ -258,9 +248,9 @@ def main():
    logger.info(f"Batch Size: {config.consumer.batch_size}")
    logger.info(f"Stream Pattern: {config.stream.pattern}")
    logger.info("=" * 60)

    service = DatabaseWriterService()

    try:
        success = service.start()
        if not success:
@@ -273,9 +263,9 @@ def main():
            sys.exit(1)
    finally:
        service.stop()

    logger.info("Service exited")


-if __name__ == '__main__':
+if __name__ == "__main__":
    main()
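health_check() returns a plain dict, so it is easy to surface from a sidecar probe or a small HTTP endpoint. Before start() has run it looks roughly like this:

service = DatabaseWriterService()
print(service.health_check())
# {'running': False, 'redis': False, 'database': False,
#  'stats': {'messages_read': 0, 'messages_written': 0, 'messages_failed': 0,
#            'batches_processed': 0, 'errors': 0}}
# 'redis' and 'database' flip to live probe results once start() has
# constructed the reader and writer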
@@ -1,4 +1,3 @@
-
import redis
import logging
from typing import List, Optional, Dict
@@ -8,123 +7,127 @@ from schema import SchemaHandler, StreamMessage

class RedisReader:
    """Redis stream consumer with consumer groups for reliability"""

-    def __init__(self, streams: Optional[List[str]] = None):
+    def __init__(self, stream_name: str = "mqtt:ingestion"):
        self.logger = logging.getLogger(__name__)
        self.schema_handler = SchemaHandler()

        self.redis_client = redis.StrictRedis(
            host=config.redis.host,
            port=config.redis.port,
            db=config.redis.db,
            password=config.redis.password,
-            decode_responses=False
+            decode_responses=False,
        )

        self.redis_client.ping()
-        self.logger.info(f"Connected to Redis at {config.redis.host}:{config.redis.port}")
-
-        if streams:
-            self.streams = streams
-        else:
-            pattern = config.stream.pattern
-            keys = self.redis_client.keys(pattern)
-            self.streams = [k.decode('utf-8') if isinstance(k, bytes) else k for k in keys]
-
-        self.logger.info(f"Monitoring {len(self.streams)} streams")
-
-        # Initialize consumer groups
-        self._setup_consumer_groups()
-
-    def _setup_consumer_groups(self):
-        """Create consumer groups for streams"""
+        self.logger.info(
+            f"Connected to Redis at {config.redis.host}:{config.redis.port}"
+        )
+
+        # Use single stream instead of pattern matching
+        self.stream_name = stream_name
+        self.logger.info(f"Monitoring stream: {self.stream_name}")
+
+        # Initialize consumer group for the single stream
+        self._setup_consumer_group()
+
+    def _setup_consumer_group(self):
+        """Create consumer group for the single stream"""
        group_name = config.consumer.group_name

-        for stream in self.streams:
-            try:
-                self.redis_client.xgroup_create(stream, group_name, id='0', mkstream=True)
-                self.logger.info(f"Created consumer group '{group_name}' for '{stream}'")
-            except redis.exceptions.ResponseError as e:
-                if 'BUSYGROUP' in str(e):
-                    self.logger.debug(f"Consumer group '{group_name}' already exists for '{stream}'")
-                else:
-                    self.logger.error(f"Error creating consumer group for {stream}: {e}")
-
-    def read_batch(self, batch_size: Optional[int] = None,
-                   timeout_ms: Optional[int] = None) -> List[StreamMessage]:
-        """Read a batch of messages from streams using consumer group"""
+        try:
+            self.redis_client.xgroup_create(
+                self.stream_name, group_name, id="0", mkstream=True
+            )
+            self.logger.info(
+                f"Created consumer group '{group_name}' for '{self.stream_name}'"
+            )
+        except redis.exceptions.ResponseError as e:
+            if "BUSYGROUP" in str(e):
+                self.logger.debug(
+                    f"Consumer group '{group_name}' already exists for '{self.stream_name}'"
+                )
+            else:
+                self.logger.error(f"Error creating consumer group: {e}")
+
+    def read_batch(
+        self, batch_size: Optional[int] = None, timeout_ms: Optional[int] = None
+    ) -> List[StreamMessage]:
+        """Read a batch of messages from single stream using consumer group"""
        if batch_size is None:
            batch_size = config.consumer.batch_size
        if timeout_ms is None:
            timeout_ms = config.consumer.block_time_ms

-        if not self.streams:
-            return []
-
-        # Prepare stream dict for XREADGROUP
-        stream_dict = {stream: '>' for stream in self.streams}
+        # Read from single stream - much simpler!
+        stream_dict = {self.stream_name: ">"}

        try:
            results = self.redis_client.xreadgroup(
                groupname=config.consumer.group_name,
                consumername=config.consumer.consumer_name,
                streams=stream_dict,
                count=batch_size,
-                block=timeout_ms
+                block=timeout_ms,
            )

            if not results:
                return []

            # Parse results into StreamMessage objects
            messages = []
            for stream_key, entries in results:
-                stream_name = stream_key.decode('utf-8') if isinstance(stream_key, bytes) else stream_key
-
                for message_id, fields in entries:
-                    msg_id = message_id.decode('utf-8') if isinstance(message_id, bytes) else message_id
-
-                    stream_msg = self.schema_handler.parse_stream_entry(stream_name, msg_id, fields)
+                    msg_id = (
+                        message_id.decode("utf-8")
+                        if isinstance(message_id, bytes)
+                        else message_id
+                    )
+
+                    # Parse with new format (device_id and metric in payload)
+                    stream_msg = self.schema_handler.parse_stream_entry_new_format(
+                        self.stream_name, msg_id, fields
+                    )
                    if stream_msg:
                        messages.append(stream_msg)

            if messages:
                self.logger.debug(f"Read {len(messages)} messages")

            return messages

        except Exception as e:
            self.logger.error(f"Error reading from Redis: {e}")
            return []

    def acknowledge_batch(self, messages: List[StreamMessage]) -> int:
        """Acknowledge multiple messages at once"""
        ack_count = 0

        # Group messages by stream
        by_stream: Dict[str, List[str]] = {}
        for msg in messages:
            if msg.stream_key not in by_stream:
                by_stream[msg.stream_key] = []
            by_stream[msg.stream_key].append(msg.message_id)

        # Acknowledge each stream's messages
        for stream_key, message_ids in by_stream.items():
            try:
                result = self.redis_client.xack(
-                    stream_key,
-                    config.consumer.group_name,
-                    *message_ids
+                    stream_key, config.consumer.group_name, *message_ids
                )
                ack_count += result
            except Exception as e:
-                self.logger.error(f"Failed to acknowledge messages from {stream_key}: {e}")
+                self.logger.error(
+                    f"Failed to acknowledge messages from {stream_key}: {e}"
+                )

        if ack_count > 0:
            self.logger.debug(f"Acknowledged {ack_count} messages")

        return ack_count

    def health_check(self) -> bool:
        """Check if Redis connection is healthy"""
        try:
@@ -132,7 +135,7 @@ class RedisReader:
            return True
        except redis.RedisError:
            return False

    def close(self):
        """Close Redis connection"""
        try:
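The reader wraps the standard Redis consumer-group cycle. The same protocol in bare redis-py, independent of this class (stream, group, and consumer names mirror the defaults above):

import redis

r = redis.StrictRedis(host="localhost", port=6379)

# Create the group once; BUSYGROUP just means it already exists
try:
    r.xgroup_create("mqtt:ingestion", "db_writer", id="0", mkstream=True)
except redis.exceptions.ResponseError as e:
    if "BUSYGROUP" not in str(e):
        raise

# ">" asks for entries never delivered to this group before
results = r.xreadgroup(
    groupname="db_writer",
    consumername="worker-01",
    streams={"mqtt:ingestion": ">"},
    count=100,
    block=5000,  # milliseconds
)
for stream_key, entries in results or []:
    for message_id, fields in entries:
        print(message_id, fields)                    # fields is a bytes->bytes dict
        r.xack(stream_key, "db_writer", message_id)  # clear from the pending entries list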
@@ -2,6 +2,7 @@
Schema definitions and data transformation logic.
Handles conversion between Redis stream messages and database records.
"""

from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Dict, Any
@@ -11,6 +12,7 @@ import json
@dataclass
class StreamMessage:
    """Represents a message from Redis stream"""

    stream_key: str
    message_id: str
    device_id: str
@@ -18,7 +20,7 @@ class StreamMessage:
    value: float
    timestamp: str
    metadata: Optional[Dict[str, Any]] = None

    @property
    def stream_name(self) -> str:
        """Return the stream name without prefix"""
@@ -26,173 +28,176 @@ class StreamMessage:

@dataclass
-class SensorReading:
-    """Represents a sensor reading ready for database insertion"""
-    timestamp: datetime
+class TelemetryReading:
+    """Represents a telemetry reading ready for database insertion - matches Telemetry model"""
+
+    time: datetime
    device_id: str
-    sensor_type: str
+    metric: str  # renamed from sensor_type
    value: float
-    metadata: Optional[Dict[str, Any]] = None
+    unit: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for database insertion"""
        return {
-            'timestamp': self.timestamp,
-            'device_id': self.device_id,
-            'sensor_type': self.sensor_type,
-            'value': self.value,
-            'metadata': json.dumps(self.metadata) if self.metadata else None
+            "time": self.time,
+            "device_id": self.device_id,
+            "metric": self.metric,
+            "value": self.value,
+            "unit": self.unit,
        }


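The dataclass now maps field-for-field onto the Telemetry ORM model, so a reading round-trips cleanly (values here are illustrative):

from datetime import datetime, timezone
from schema import TelemetryReading

reading = TelemetryReading(
    time=datetime.now(timezone.utc),
    device_id="dev-42",
    metric="humidity",
    value=48.2,
    unit="%",
)
print(reading.to_dict())
# {'time': datetime(...), 'device_id': 'dev-42', 'metric': 'humidity',
#  'value': 48.2, 'unit': '%'}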
@dataclass
class ValidationResult:
    """Result of data validation"""

    valid: bool
    error: Optional[str] = None


class SchemaHandler:
    """Handles schema transformation and validation"""

    def __init__(self):
        self.logger = self._get_logger()

    def _get_logger(self):
        """Get logger instance"""
        import logging

        return logging.getLogger(__name__)

-    def transform_message(self, stream_message: StreamMessage) -> Optional[SensorReading]:
+    def transform_message(
+        self, stream_message: StreamMessage
+    ) -> Optional[TelemetryReading]:
        """
-        Transform a Redis stream message into a SensorReading.
+        Transform a Redis stream message into a TelemetryReading.
        Returns None if transformation fails.
        """
        try:
            # Parse timestamp
            timestamp = self._parse_timestamp(stream_message.timestamp)

-            # Create sensor reading
-            reading = SensorReading(
-                timestamp=timestamp,
+            reading = TelemetryReading(
+                time=timestamp,
                device_id=stream_message.device_id,
-                sensor_type=stream_message.sensor_type,
+                metric=stream_message.sensor_type,  # sensor_type maps to metric
                value=float(stream_message.value),
-                metadata=stream_message.metadata
+                unit=stream_message.metadata.get("unit")
+                if stream_message.metadata
+                else None,
            )

            # Validate the reading
            validation = self.validate_reading(reading)
            if not validation.valid:
                self.logger.error(f"Invalid reading: {validation.error}")
                return None

            return reading

        except Exception as e:
            self.logger.error(f"Failed to transform message: {e}", exc_info=True)
            return None

-    def validate_reading(self, reading: SensorReading) -> ValidationResult:
-        """Validate a sensor reading"""
+    def validate_reading(self, reading: TelemetryReading) -> ValidationResult:
+        """Validate a telemetry reading"""
        try:
            # Check required fields
            if not reading.device_id:
                return ValidationResult(False, "device_id is required")

-            if not reading.sensor_type:
-                return ValidationResult(False, "sensor_type is required")
+            if not reading.metric:
+                return ValidationResult(False, "metric is required")

            if reading.value is None:
                return ValidationResult(False, "value is required")

-            # Validate timestamp
-            if not isinstance(reading.timestamp, datetime):
-                return ValidationResult(False, "timestamp must be a datetime object")
-
-            # Validate value is numeric
+            if not isinstance(reading.time, datetime):
+                return ValidationResult(False, "time must be a datetime object")

            if not isinstance(reading.value, (int, float)):
                return ValidationResult(False, "value must be numeric")

            # Check for reasonable value ranges (can be customized)
            if reading.value < -1000000 or reading.value > 1000000:
                self.logger.warning(f"Value {reading.value} is outside typical range")

            return ValidationResult(True)

        except Exception as e:
            return ValidationResult(False, f"Validation error: {str(e)}")

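The validation path in action (illustrative values; an empty metric string is falsy and so rejected):

from datetime import datetime, timezone
from schema import SchemaHandler, TelemetryReading

handler = SchemaHandler()

good = TelemetryReading(time=datetime.now(timezone.utc), device_id="dev-42",
                        metric="temperature", value=21.5, unit="C")
print(handler.validate_reading(good).valid)  # True

bad = TelemetryReading(time=datetime.now(timezone.utc), device_id="dev-42",
                       metric="", value=21.5)
result = handler.validate_reading(bad)
print(result.valid, result.error)            # False metric is required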
    def _parse_timestamp(self, timestamp_str: str) -> datetime:
        """Parse timestamp string into datetime object"""
        # Try ISO format first
        try:
-            return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
+            return datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
        except ValueError:
            pass

        # Try common formats
        formats = [
-            '%Y-%m-%dT%H:%M:%S.%fZ',
-            '%Y-%m-%dT%H:%M:%SZ',
-            '%Y-%m-%d %H:%M:%S.%f',
-            '%Y-%m-%d %H:%M:%S',
+            "%Y-%m-%dT%H:%M:%S.%fZ",
+            "%Y-%m-%dT%H:%M:%SZ",
+            "%Y-%m-%d %H:%M:%S.%f",
+            "%Y-%m-%d %H:%M:%S",
        ]

        for fmt in formats:
            try:
                return datetime.strptime(timestamp_str, fmt)
            except ValueError:
                continue

        # If all else fails, use current time and log warning
-        self.logger.warning(f"Could not parse timestamp '{timestamp_str}', using current time")
+        self.logger.warning(
+            f"Could not parse timestamp '{timestamp_str}', using current time"
+        )
        return datetime.utcnow()

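The Z replacement exists because datetime.fromisoformat() only accepts a trailing Z from Python 3.11 onward. One wrinkle worth noting: the ISO branch yields timezone-aware datetimes while the strptime fallbacks (and the datetime.utcnow() last resort) yield naive ones, so stored rows can mix the two:

from datetime import datetime

iso = "2026-04-05T17:16:26Z"
print(datetime.fromisoformat(iso.replace("Z", "+00:00")))  # aware: ...+00:00

legacy = "2026-04-05 17:16:26"
print(datetime.strptime(legacy, "%Y-%m-%d %H:%M:%S"))      # naive, no tzinfo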
-    def parse_stream_entry(self, stream_key: str, message_id: str, fields: Dict[bytes, bytes]) -> Optional[StreamMessage]:
+    def parse_stream_entry_new_format(
+        self, stream_key: str, message_id: str, fields: Dict[bytes, bytes]
+    ) -> Optional[StreamMessage]:
        """
-        Parse a raw Redis stream entry into a StreamMessage.
-        Expected stream key format: mqtt_stream:{device_id}:{sensor_type}
-        Expected fields: value, timestamp (and optionally metadata)
+        Parse a raw Redis stream entry with NEW single-stream format.
+        Expected fields: device_id, metric, value, timestamp
        """
        try:
-            # Extract device_id and sensor_type from stream key
-            # Format: mqtt_stream:{device_id}:{sensor_type}
-            parts = stream_key.split(':')
-            if len(parts) < 3:
-                self.logger.error(f"Invalid stream key format: {stream_key}")
-                return None
-
-            device_id = parts[1]
-            sensor_type = ':'.join(parts[2:])  # Handle sensor types with colons
-
-            # Extract fields from message
-            value_bytes = fields.get(b'value')
-            timestamp_bytes = fields.get(b'timestamp') or fields.get(b'time')
-
-            if not value_bytes or not timestamp_bytes:
+            # Extract fields from message (device_id and metric are IN the payload now!)
+            device_id_bytes = fields.get(b"device_id")
+            metric_bytes = fields.get(b"metric")
+            value_bytes = fields.get(b"value")
+            timestamp_bytes = fields.get(b"timestamp") or fields.get(b"time")
+
+            if not all([device_id_bytes, metric_bytes, value_bytes, timestamp_bytes]):
                self.logger.error(f"Missing required fields in message: {fields}")
                return None

            # Parse metadata if present
            metadata = None
-            metadata_bytes = fields.get(b'metadata')
+            metadata_bytes = fields.get(b"metadata")
            if metadata_bytes:
                try:
-                    metadata = json.loads(metadata_bytes.decode('utf-8'))
+                    metadata = json.loads(metadata_bytes.decode("utf-8"))
                except json.JSONDecodeError:
                    self.logger.warning(f"Could not parse metadata: {metadata_bytes}")

            return StreamMessage(
                stream_key=stream_key,
                message_id=message_id,
-                device_id=device_id,
-                sensor_type=sensor_type,
-                value=float(value_bytes.decode('utf-8')),
-                timestamp=timestamp_bytes.decode('utf-8'),
-                metadata=metadata
+                device_id=device_id_bytes.decode("utf-8"),
+                sensor_type=metric_bytes.decode("utf-8"),
+                value=float(value_bytes.decode("utf-8")),
+                timestamp=timestamp_bytes.decode("utf-8"),
+                metadata=metadata,
            )

        except Exception as e:
            self.logger.error(f"Failed to parse stream entry: {e}", exc_info=True)
            return None

+    def parse_stream_entry(
+        self, stream_key: str, message_id: str, fields: Dict[bytes, bytes]
+    ) -> Optional[StreamMessage]:
+        """
+        DEPRECATED: Old format with stream key containing device_id.
+        Kept for backward compatibility. Use parse_stream_entry_new_format() instead.
+        """
+        return self.parse_stream_entry_new_format(stream_key, message_id, fields)

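End to end, a raw entry in the shape redis-py delivers becomes a StreamMessage and then a TelemetryReading (field values made up; the unit rides along in the optional metadata JSON):

from schema import SchemaHandler

handler = SchemaHandler()
fields = {
    b"device_id": b"dev-42",
    b"metric": b"temperature",
    b"value": b"21.5",
    b"timestamp": b"2026-04-05T17:16:26Z",
    b"metadata": b'{"unit": "C"}',
}
msg = handler.parse_stream_entry_new_format("mqtt:ingestion", "1728000000000-0", fields)
reading = handler.transform_message(msg)
print(reading.metric, reading.value, reading.unit)  # temperature 21.5 C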