import { Pool, PoolClient, QueryResult } from 'pg';

// PostgreSQL connection pool.
// Every setting falls back to a local-development default when the
// corresponding environment variable is unset or empty.
const pool = new Pool({
  host: process.env.POSTGRES_HOST || 'localhost',
  port: parseInt(process.env.POSTGRES_PORT || '5432', 10),
  database: process.env.POSTGRES_DB || 'iot_data',
  user: process.env.POSTGRES_USER || 'postgres',
  password: process.env.POSTGRES_PASSWORD || 'example',
  max: 20,
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 2000,
});

// Log errors emitted by idle pooled clients instead of leaving them unhandled.
pool.on('error', (err) => {
  console.error('Unexpected error on idle client', err);
});

/** Extracts a printable message from an arbitrary thrown value. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Maps a missing or NaN number to SQL NULL while preserving a legitimate 0
 * (e.g. a stopped vehicle, a bearing of 0 degrees, or an exactly on-time arrival).
 */
function numberOrNull(value: number | null | undefined): number | null {
  return value == null || Number.isNaN(value) ? null : value;
}

/**
 * Converts `table` into a hypertable partitioned on `time` with 1-day chunks.
 * Failures are logged as warnings (unless they say the table is already a
 * hypertable) and never thrown, so initialisation continues.
 * The table name is interpolated into SQL, hence the closed literal type.
 */
async function ensureHypertable(
  client: PoolClient,
  table: 'vehicle_positions' | 'arrival_records',
): Promise<void> {
  try {
    await client.query(`
      SELECT create_hypertable('${table}', 'time',
        if_not_exists => TRUE,
        chunk_time_interval => INTERVAL '1 day'
      )
    `);
  } catch (e: unknown) {
    const message = errorMessage(e);
    if (!message.includes('already a hypertable')) {
      console.warn(`Note: ${table} hypertable setup:`, message);
    }
  }
}

/**
 * Creates the schema: the TimescaleDB extension, the `vehicle_positions` and
 * `arrival_records` hypertables with their indexes, the `service_metrics` and
 * `route_stats` tables, the `hourly_route_delays` continuous aggregate, and
 * 90-day retention policies.
 *
 * Table and index creation errors propagate to the caller. Hypertable,
 * continuous-aggregate and retention-policy errors are only logged as warnings.
 */
export async function initDatabase(): Promise<void> {
  const client = await pool.connect();
  try {
    // Enable TimescaleDB extension
    await client.query('CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE');

    // Vehicle positions time-series
    await client.query(`
      CREATE TABLE IF NOT EXISTS vehicle_positions (
        time TIMESTAMPTZ NOT NULL,
        vehicle_id TEXT NOT NULL,
        route_id TEXT NOT NULL,
        trip_id TEXT,
        latitude DOUBLE PRECISION NOT NULL,
        longitude DOUBLE PRECISION NOT NULL,
        speed DOUBLE PRECISION,
        bearing DOUBLE PRECISION,
        current_status TEXT,
        PRIMARY KEY (time, vehicle_id)
      )
    `);
    await ensureHypertable(client, 'vehicle_positions');

    await client.query(`
      CREATE INDEX IF NOT EXISTS idx_vehicle_positions_vehicle
      ON vehicle_positions(vehicle_id, time DESC)
    `);
    await client.query(`
      CREATE INDEX IF NOT EXISTS idx_vehicle_positions_route
      ON vehicle_positions(route_id, time DESC)
    `);

    // Arrival records time-series
    await client.query(`
      CREATE TABLE IF NOT EXISTS arrival_records (
        time TIMESTAMPTZ NOT NULL,
        stop_id TEXT NOT NULL,
        route_id TEXT NOT NULL,
        trip_id TEXT,
        scheduled_time TIMESTAMPTZ NOT NULL,
        predicted_time TIMESTAMPTZ,
        actual_time TIMESTAMPTZ,
        delay_seconds INTEGER,
        is_realtime BOOLEAN,
        headsign TEXT,
        PRIMARY KEY (time, stop_id, route_id, scheduled_time)
      )
    `);
    await ensureHypertable(client, 'arrival_records');

    await client.query(`
      CREATE INDEX IF NOT EXISTS idx_arrival_records_stop
      ON arrival_records(stop_id, time DESC)
    `);
    await client.query(`
      CREATE INDEX IF NOT EXISTS idx_arrival_records_route
      ON arrival_records(route_id, time DESC)
    `);

    // Service metrics (aggregated hourly)
    await client.query(`
      CREATE TABLE IF NOT EXISTS service_metrics (
        hour_timestamp TIMESTAMPTZ NOT NULL,
        route_id TEXT NOT NULL,
        stop_id TEXT NOT NULL DEFAULT '',
        total_arrivals INTEGER DEFAULT 0,
        on_time_arrivals INTEGER DEFAULT 0,
        late_arrivals INTEGER DEFAULT 0,
        avg_delay_seconds DOUBLE PRECISION DEFAULT 0,
        max_delay_seconds INTEGER DEFAULT 0,
        active_vehicles INTEGER DEFAULT 0,
        PRIMARY KEY (hour_timestamp, route_id, stop_id)
      )
    `);

    // Route performance summary
    await client.query(`
      CREATE TABLE IF NOT EXISTS route_stats (
        route_id TEXT PRIMARY KEY,
        route_name TEXT,
        last_updated TIMESTAMPTZ,
        total_trips INTEGER DEFAULT 0,
        avg_delay_seconds DOUBLE PRECISION DEFAULT 0,
        reliability_score DOUBLE PRECISION DEFAULT 0
      )
    `);

    // Continuous aggregate of hourly per-route delays, plus its refresh policy.
    try {
      await client.query(`
        CREATE MATERIALIZED VIEW IF NOT EXISTS hourly_route_delays
        WITH (timescaledb.continuous) AS
        SELECT
          time_bucket('1 hour', time) AS bucket,
          route_id,
          COUNT(*) as arrival_count,
          AVG(delay_seconds) as avg_delay,
          MAX(delay_seconds) as max_delay,
          COUNT(*) FILTER (WHERE ABS(delay_seconds) <= 120) as on_time_count
        FROM arrival_records
        WHERE delay_seconds IS NOT NULL
        GROUP BY bucket, route_id
        WITH NO DATA
      `);

      await client.query(`
        SELECT add_continuous_aggregate_policy('hourly_route_delays',
          start_offset => INTERVAL '3 hours',
          end_offset => INTERVAL '1 hour',
          schedule_interval => INTERVAL '1 hour',
          if_not_exists => TRUE
        )
      `);
    } catch (e: unknown) {
      const message = errorMessage(e);
      if (!message.includes('already exists')) {
        console.warn('Note: continuous aggregate setup:', message);
      }
    }

    // Retention policies (keep 90 days)
    try {
      await client.query(`
        SELECT add_retention_policy('vehicle_positions', INTERVAL '90 days',
          if_not_exists => TRUE
        )
      `);
      await client.query(`
        SELECT add_retention_policy('arrival_records', INTERVAL '90 days',
          if_not_exists => TRUE
        )
      `);
    } catch (e: unknown) {
      console.warn('Note: retention policy setup:', errorMessage(e));
    }

    console.log('[OK] TimescaleDB initialized successfully');
  } finally {
    client.release();
  }
}

/** A single vehicle position sample. `timestamp` is in epoch milliseconds. */
export interface VehiclePosition {
  timestamp: number;
  vehicleId: string;
  routeId: string;
  tripId?: string;
  latitude: number;
  longitude: number;
  speed?: number;
  bearing?: number;
  currentStatus?: string;
}

// Shared by the single-row and batch insert paths.
// Rows that collide on (time, vehicle_id) are silently skipped.
const INSERT_VEHICLE_POSITION_SQL = `
  INSERT INTO vehicle_positions
    (time, vehicle_id, route_id, trip_id, latitude, longitude, speed, bearing, current_status)
  VALUES (to_timestamp($1::double precision / 1000), $2, $3, $4, $5, $6, $7, $8, $9)
  ON CONFLICT (time, vehicle_id) DO NOTHING
`;

/** Builds the positional parameter list for INSERT_VEHICLE_POSITION_SQL. */
function vehiclePositionParams(position: VehiclePosition): unknown[] {
  return [
    position.timestamp,
    position.vehicleId,
    position.routeId,
    position.tripId || null,
    position.latitude,
    position.longitude,
    // A speed or bearing of 0 is a real reading and must not be stored as NULL.
    numberOrNull(position.speed),
    numberOrNull(position.bearing),
    position.currentStatus || null,
  ];
}

/**
 * Inserts one vehicle position. Best-effort: failures are logged and swallowed,
 * so this never rejects.
 */
export async function logVehiclePosition(position: VehiclePosition): Promise<void> {
  try {
    await pool.query(INSERT_VEHICLE_POSITION_SQL, vehiclePositionParams(position));
  } catch (error) {
    console.error('Failed to log vehicle position:', error);
  }
}

/**
 * Inserts a batch of vehicle positions in a single transaction. If any insert
 * fails, the whole batch is rolled back and the error is logged, not thrown.
 * An empty batch is a no-op.
 *
 * Acquiring the pooled client happens outside the error handler, so a
 * connection failure still rejects.
 */
export async function logVehiclePositions(positions: VehiclePosition[]): Promise<void> {
  if (positions.length === 0) return;

  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    // Statements must run sequentially: they share one client and transaction.
    for (const pos of positions) {
      await client.query(INSERT_VEHICLE_POSITION_SQL, vehiclePositionParams(pos));
    }
    await client.query('COMMIT');
  } catch (error) {
    // ROLLBACK can itself fail (e.g. the connection dropped). Guard it so the
    // original error is still reported and nothing escapes this function.
    try {
      await client.query('ROLLBACK');
    } catch (rollbackError) {
      console.error('Failed to roll back vehicle position batch:', rollbackError);
    }
    console.error('Failed to batch log vehicle positions:', error);
  } finally {
    client.release();
  }
}

/** A single arrival observation. All time fields are in epoch milliseconds. */
export interface ArrivalRecord {
  timestamp: number;
  stopId: string;
  routeId: string;
  tripId?: string;
  scheduledTime: number;
  predictedTime?: number;
  actualTime?: number;
  delaySeconds?: number;
  isRealtime: boolean;
  headsign?: string;
}

/**
 * Upserts one arrival record keyed on (time, stop_id, route_id, scheduled_time).
 * On conflict only predicted_time, delay_seconds and is_realtime are refreshed.
 * Best-effort: failures are logged and swallowed.
 *
 * NOTE(review): actual_time and headsign are not updated on conflict — confirm
 * that this is intended.
 */
export async function logArrival(arrival: ArrivalRecord): Promise<void> {
  try {
    await pool.query(`
      INSERT INTO arrival_records
        (time, stop_id, route_id, trip_id, scheduled_time, predicted_time,
         actual_time, delay_seconds, is_realtime, headsign)
      VALUES (
        to_timestamp($1::double precision / 1000), $2, $3, $4,
        to_timestamp($5::double precision / 1000),
        to_timestamp($6::double precision / 1000),
        to_timestamp($7::double precision / 1000),
        $8, $9, $10
      )
      ON CONFLICT (time, stop_id, route_id, scheduled_time) DO UPDATE SET
        predicted_time = EXCLUDED.predicted_time,
        delay_seconds = EXCLUDED.delay_seconds,
        is_realtime = EXCLUDED.is_realtime
    `, [
      arrival.timestamp,
      arrival.stopId,
      arrival.routeId,
      arrival.tripId || null,
      arrival.scheduledTime,
      // A missing or 0 timestamp is stored as NULL.
      arrival.predictedTime || null,
      arrival.actualTime || null,
      // A delay of 0 means exactly on time and must be stored, not dropped to NULL.
      numberOrNull(arrival.delaySeconds),
      arrival.isRealtime,
      arrival.headsign || null,
    ]);
  } catch (error) {
    console.error('Failed to log arrival:', error);
  }
}

/**
 * Returns the positions of one vehicle between `startTime` and `endTime`
 * (epoch milliseconds, inclusive), oldest first. Query errors propagate.
 */
export async function getVehicleHistory(vehicleId: string, startTime: number, endTime: number) {
  const result = await pool.query(`
    SELECT
      EXTRACT(EPOCH FROM time) * 1000 as timestamp,
      latitude, longitude, speed, bearing, current_status
    FROM vehicle_positions
    WHERE vehicle_id = $1
      AND time BETWEEN to_timestamp($2::double precision / 1000)
                   AND to_timestamp($3::double precision / 1000)
    ORDER BY time ASC
  `, [vehicleId, startTime, endTime]);

  return result.rows;
}

/**
 * Returns the positions of every vehicle on a route between `startTime` and
 * `endTime` (epoch milliseconds, inclusive), oldest first. Query errors propagate.
 */
export async function getRouteVehiclePositions(routeId: string, startTime: number, endTime: number) {
  const result = await pool.query(`
    SELECT
      EXTRACT(EPOCH FROM time) * 1000 as timestamp,
      vehicle_id, latitude, longitude, speed, bearing, current_status
    FROM vehicle_positions
    WHERE route_id = $1
      AND time BETWEEN to_timestamp($2::double precision / 1000)
                   AND to_timestamp($3::double precision / 1000)
    ORDER BY time ASC
  `, [routeId, startTime, endTime]);

  return result.rows;
}

/**
 * Returns the arrival records for one stop and route between `startTime` and
 * `endTime` (epoch milliseconds, inclusive), oldest first. Time columns are
 * returned as epoch milliseconds. Query errors propagate.
 */
export async function getStopArrivalHistory(
  stopId: string,
  routeId: string,
  startTime: number,
  endTime: number,
) {
  const result = await pool.query(`
    SELECT
      EXTRACT(EPOCH FROM time) * 1000 as timestamp,
      EXTRACT(EPOCH FROM scheduled_time) * 1000 as scheduled_time,
      EXTRACT(EPOCH FROM predicted_time) * 1000 as predicted_time,
      EXTRACT(EPOCH FROM actual_time) * 1000 as actual_time,
      delay_seconds, is_realtime, headsign
    FROM arrival_records
    WHERE stop_id = $1 AND route_id = $2
      AND time BETWEEN to_timestamp($3::double precision / 1000)
                   AND to_timestamp($4::double precision / 1000)
    ORDER BY time ASC
  `, [stopId, routeId, startTime, endTime]);

  return result.rows;
}

/**
 * Returns one row of delay statistics for a route over the last `hours` hours.
 * An arrival counts as on time when |delay| <= 120 s, late above 120 s and
 * early below -120 s.
 *
 * NOTE(review): unlike getStopDelayStats, rows with NULL delay_seconds are not
 * filtered out, so total_arrivals includes them — confirm this is intended.
 */
export async function getRouteDelayStats(routeId: string, hours: number = 24) {
  const result = await pool.query(`
    SELECT
      COUNT(*) as total_arrivals,
      AVG(delay_seconds) as avg_delay,
      MIN(delay_seconds) as min_delay,
      MAX(delay_seconds) as max_delay,
      COUNT(*) FILTER (WHERE ABS(delay_seconds) <= 120) as on_time_count,
      COUNT(*) FILTER (WHERE delay_seconds > 120) as late_count,
      COUNT(*) FILTER (WHERE delay_seconds < -120) as early_count
    FROM arrival_records
    WHERE route_id = $1
      AND time >= NOW() - INTERVAL '1 hour' * $2
  `, [routeId, hours]);

  return result.rows[0];
}

/**
 * Returns per-route delay statistics for a stop over the last `hours` hours,
 * considering only rows that have a delay value.
 */
export async function getStopDelayStats(stopId: string, hours: number = 24) {
  const result = await pool.query(`
    SELECT
      route_id,
      COUNT(*) as total_arrivals,
      AVG(delay_seconds) as avg_delay,
      COUNT(*) FILTER (WHERE ABS(delay_seconds) <= 120) as on_time_count
    FROM arrival_records
    WHERE stop_id = $1
      AND time >= NOW() - INTERVAL '1 hour' * $2
      AND delay_seconds IS NOT NULL
    GROUP BY route_id
  `, [stopId, hours]);

  return result.rows;
}

/**
 * Returns arrival count, average delay and distinct vehicle count per hour of
 * day for a route over the last `days` days.
 *
 * NOTE(review): the LEFT JOIN emits one row per matching vehicle position
 * within 60 s of an arrival, so COUNT(*) and AVG(delay_seconds) are weighted by
 * the number of matches rather than computed per arrival — verify whether
 * callers rely on this.
 */
export async function getRouteHourlyPattern(routeId: string, days: number = 7) {
  const result = await pool.query(`
    SELECT
      EXTRACT(HOUR FROM ar.time) as hour,
      COUNT(*) as arrival_count,
      AVG(ar.delay_seconds) as avg_delay,
      COUNT(DISTINCT vp.vehicle_id) as unique_vehicles
    FROM arrival_records ar
    LEFT JOIN vehicle_positions vp
      ON ar.trip_id = vp.trip_id
      AND ABS(EXTRACT(EPOCH FROM (ar.time - vp.time))) < 60
    WHERE ar.route_id = $1
      AND ar.time >= NOW() - INTERVAL '1 day' * $2
    GROUP BY hour
    ORDER BY hour
  `, [routeId, days]);

  return result.rows;
}

/**
 * Manually deletes vehicle positions and arrival records older than
 * `daysToKeep` days and logs how many rows were removed. The retention policies
 * normally handle this. The two deletes are not wrapped in a transaction.
 */
export async function cleanupOldData(daysToKeep: number = 90): Promise<void> {
  const client = await pool.connect();
  try {
    const deletedPositions = await client.query(`
      DELETE FROM vehicle_positions
      WHERE time < NOW() - INTERVAL '1 day' * $1
    `, [daysToKeep]);

    const deletedArrivals = await client.query(`
      DELETE FROM arrival_records
      WHERE time < NOW() - INTERVAL '1 day' * $1
    `, [daysToKeep]);

    console.log(`Cleaned up old data: ${deletedPositions.rowCount} positions, ${deletedArrivals.rowCount} arrivals`);
  } finally {
    client.release();
  }
}

/**
 * Returns row counts for both time-series tables, the oldest and newest
 * vehicle position timestamps, and the pool's host and database name.
 */
export async function getDatabaseStats() {
  const client = await pool.connect();
  try {
    const positionCount = await client.query('SELECT COUNT(*) as count FROM vehicle_positions');
    const arrivalCount = await client.query('SELECT COUNT(*) as count FROM arrival_records');
    const oldestPosition = await client.query('SELECT MIN(time) as oldest FROM vehicle_positions');
    const newestPosition = await client.query('SELECT MAX(time) as newest FROM vehicle_positions');

    return {
      // The count values are passed through parseInt to get numbers.
      vehiclePositions: parseInt(positionCount.rows[0].count, 10),
      arrivalRecords: parseInt(arrivalCount.rows[0].count, 10),
      oldestRecord: oldestPosition.rows[0].oldest,
      newestRecord: newestPosition.rows[0].newest,
      dbType: 'TimescaleDB',
      host: pool.options.host,
      database: pool.options.database
    };
  } finally {
    client.release();
  }
}

/** Calls `pool.end()` to shut down the connection pool. */
export async function closeDatabase(): Promise<void> {
  await pool.end();
}