Saving GTFS-RT data to Parquet
This commit is contained in:
444
index.ts
Normal file
444
index.ts
Normal file
@@ -0,0 +1,444 @@
|
||||
#!/usr/bin/env bun
|
||||
/**
|
||||
* Background tracker for popular bus routes in Skopje
|
||||
* Continuously monitors GTFS-RT feeds and stores data as parquet segments
|
||||
*/
|
||||
|
||||
import GtfsRealtimeBindings from 'gtfs-realtime-bindings';
|
||||
import { config } from './config';
|
||||
import { GtfsRoute, GtfsStop, loadGtfsRoutes, loadGtfsStops } from './lib/gtfs';
|
||||
import {
|
||||
initDatabase,
|
||||
logVehiclePositions,
|
||||
logVehicleFeedSnapshot,
|
||||
logArrival,
|
||||
closeDatabase,
|
||||
VehicleFeedSnapshot,
|
||||
VehiclePosition
|
||||
} from './lib/database';
|
||||
|
||||
/**
 * Popular routes to track.
 *
 * `id` is the route ID used for lookups in the loaded GTFS routes and for matching
 * GTFS-RT trip updates; `name` is the human-readable label printed in the statistics.
 */
const TRACKED_ROUTES: ReadonlyArray<{ id: string; name: string }> = [
  { id: '245', name: 'Route 2' },
  { id: '121', name: 'Route 4' },
  { id: '123', name: 'Route 5' },
  { id: '125', name: 'Route 7' },
  { id: '129', name: 'Route 15' },
  { id: '134', name: 'Route 21' },
  { id: '136', name: 'Route 22' },
  { id: '138', name: 'Route 24' },
  // Private routes (П)
  { id: '203', name: 'Route 22 П' },
  { id: '204', name: 'Route 12 П' },
];
|
||||
|
||||
/**
 * Stop keys whose arrivals are polled every cycle.
 *
 * Each key is resolved against the loaded GTFS stops first by stop ID and then by
 * stop code; keys that resolve to nothing are skipped by the arrivals tracker.
 */
const MONITORED_STOPS: readonly string[] = [
  // Central area and major transit hubs
  '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15',
  '16', '17', '18', '19', '20', '21',
  // Transit centers and bus stations
  '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60',
  '61', '62', '63', '64', '65', '66', '67', '68',
  // Kisela Voda area
  '174', '175', '176', '177', '178', '179', '180', '181',
  // Gjorche Petrov corridor
  '246', '247', '248', '249', '250', '251', '252', '253', '254', '255', '256', '257',
  '258', '259', '260', '261', '262', '263', '264', '265', '266', '267', '268', '269',
  // Karposh areas
  '270', '271', '272', '273', '274', '275', '276', '277', '278', '279', '280', '281',
  '282', '283', '284', '285', '286', '287', '288', '289',
];
|
||||
|
||||
/** Delay between tracking cycles, in milliseconds. */
const REFRESH_INTERVAL = 30000; // 30 seconds

/** Max stops to query per cycle. */
const ARRIVAL_STOP_CAP = 150;

/**
 * Reads a boolean feature flag from the environment.
 *
 * @param name - Environment variable name.
 * @param defaultValue - Value used when the variable is not set at all.
 * @returns `true` only when the variable equals "true" (case-insensitive); any other
 *          value that is set, including the empty string, yields `false`.
 */
function readBooleanEnv(name: string, defaultValue: boolean): boolean {
  const raw = process.env[name];
  return raw === undefined ? defaultValue : raw.toLowerCase() === 'true';
}

/** When enabled, every vehicle returned by the JSON API is logged as a raw snapshot. */
const SAVE_ALL_VEHICLE_SNAPSHOTS = readBooleanEnv('SAVE_ALL_VEHICLE_SNAPSHOTS', true);

/** When enabled, positions are logged for all vehicles, not only those on tracked routes. */
const SAVE_ALL_VEHICLE_POSITIONS = readBooleanEnv('SAVE_ALL_VEHICLE_POSITIONS', true);
|
||||
|
||||
/**
 * Running counters for this process. The object is mutated in place by the tracking
 * functions and printed by the statistics report; the binding itself is never reassigned.
 */
const stats = {
  cycles: 0,
  lastUpdate: new Date(),
  vehiclesTracked: 0,
  vehicleSnapshots: 0,
  arrivalsLogged: 0,
  errors: 0,
};
|
||||
|
||||
/**
 * Narrows an arbitrary value to a finite number.
 *
 * @param value - Any value, typically a field read from an untyped JSON payload.
 * @returns The value itself when it is a finite `number`; `undefined` for everything
 *          else (strings, `null`, `NaN`, `Infinity`, ...). No string-to-number coercion
 *          is attempted.
 */
function toOptionalNumber(value: unknown): number | undefined {
  return typeof value === 'number' && Number.isFinite(value) ? value : undefined;
}
|
||||
|
||||
// Load GTFS stops data
const stops = loadGtfsStops();

// Secondary index so a monitored stop key can be resolved by stop_code as well as by
// the key used in `stops`. Stops without a stop_code are left out.
const stopsByCode = new Map<string, GtfsStop>();

for (const stop of stops.values()) {
  if (stop.stop_code) {
    stopsByCode.set(stop.stop_code, stop);
  }
}
|
||||
|
||||
// Load GTFS routes data to map route short names back to IDs
const routes = loadGtfsRoutes();

// IDs of the routes listed in TRACKED_ROUTES, for O(1) membership checks.
const trackedRouteIds = new Set(TRACKED_ROUTES.map((route) => route.id));

// Short name -> route ID, keyed by the short name exactly as it appears in the GTFS data.
const routeIdByShortName = new Map<string, string>();

// Short name -> route ID, keyed by the whitespace-normalized short name.
const routeIdByShortNameNormalized = new Map<string, string>();
|
||||
|
||||
/**
 * Canonicalizes a route key for lookup: every run of whitespace is collapsed to a
 * single space and leading/trailing whitespace is removed. Case and non-whitespace
 * characters are left untouched.
 *
 * @param value - Route ID or short name as received.
 * @returns The whitespace-normalized key.
 */
function normalizeRouteKey(value: string): string {
  return value.replace(/\s+/g, ' ').trim();
}
|
||||
|
||||
// Index every tracked route that exists in the GTFS data by its short name, both
// verbatim and whitespace-normalized, so arrivals reported under a short name can be
// mapped back to the route ID. Tracked IDs missing from the GTFS data, or routes with
// no short name, are simply not indexed.
for (const routeId of trackedRouteIds) {
  const route = routes.get(routeId);
  if (route?.route_short_name) {
    const shortName = route.route_short_name;
    routeIdByShortName.set(shortName, routeId);
    routeIdByShortNameNormalized.set(normalizeRouteKey(shortName), routeId);
  }
}
|
||||
|
||||
/**
 * Fetch and process vehicle positions (one tracking cycle).
 *
 * Steps:
 *  1. Fetch the full vehicle list from the JSON API; when SAVE_ALL_VEHICLE_SNAPSHOTS is
 *     enabled, log one raw snapshot per vehicle.
 *  2. Fetch and decode the GTFS-RT trip-updates feed and build a lookup from vehicle
 *     ID / label to route and trip.
 *  3. Log a position for every vehicle (SAVE_ALL_VEHICLE_POSITIONS) or only for
 *     vehicles matched to a tracked route.
 *
 * If the trip-updates feed is unavailable, empty or undecodable, the cycle ends after
 * step 1 without logging positions and without incrementing `stats.cycles`.
 *
 * Errors are caught, logged and counted in `stats.errors`; they are not propagated.
 */
async function trackVehicles(): Promise<void> {
  try {
    console.log(`[${new Date().toISOString()}] Fetching vehicle data...`);

    // Fetch all vehicles from JSON API
    const vehiclesResponse = await fetch(config.apiEndpoints.vehiclesJson);

    if (!vehiclesResponse.ok) {
      throw new Error(`HTTP error! status: ${vehiclesResponse.status}`);
    }

    // Validate the payload shape up front instead of failing later with an opaque
    // TypeError when iterating a non-array.
    const vehiclesPayload: unknown = await vehiclesResponse.json();
    if (!Array.isArray(vehiclesPayload)) {
      throw new Error('Unexpected vehicles payload: expected a JSON array');
    }
    const allVehicles: any[] = vehiclesPayload;
    console.log(` Found ${allVehicles.length} total vehicles`);

    if (SAVE_ALL_VEHICLE_SNAPSHOTS && allVehicles.length > 0) {
      // One capture time for the whole batch so all rows of a snapshot share a timestamp.
      const captureTime = Date.now();
      const snapshots: VehicleFeedSnapshot[] = allVehicles.map((vehicle) => ({
        timestamp: captureTime,
        sourceTimestamp: toOptionalNumber(vehicle.positionModifiedAt),
        vehicleId: vehicle.identificationNumber?.toString() || undefined,
        inventoryNumber: vehicle.inventoryNumber?.toString() || undefined,
        latitude: toOptionalNumber(vehicle.positionLatitude),
        longitude: toOptionalNumber(vehicle.positionLongitude),
        speed: toOptionalNumber(vehicle.positionSpeed),
        bearing: toOptionalNumber(vehicle.positionBearing),
        status: toOptionalNumber(vehicle.status),
        rawJson: JSON.stringify(vehicle),
      }));

      await logVehicleFeedSnapshot(snapshots);
      stats.vehicleSnapshots += snapshots.length;
      console.log(` [OK] Logged ${snapshots.length} full-vehicle snapshots`);
    }

    // Fetch trip updates to match vehicles to routes
    const tripUpdatesResponse = await fetch(config.apiEndpoints.gtfsRtTripUpdates);

    if (!tripUpdatesResponse.ok) {
      console.warn(' Could not fetch trip updates');
      return;
    }

    const buffer = await tripUpdatesResponse.arrayBuffer();

    if (buffer.byteLength === 0) {
      console.warn(' Empty trip updates feed');
      return;
    }

    let feed;
    try {
      feed = GtfsRealtimeBindings.transit_realtime.FeedMessage.decode(
        new Uint8Array(buffer)
      );
    } catch (decodeError) {
      console.error(' Failed to decode GTFS-RT feed:', decodeError);
      stats.errors++;
      return;
    }

    // Build map of vehicle IDs to route IDs from GTFS trip updates.
    const vehicleRouteMap = new Map<string, { routeId: string, tripId: string }>();
    const trackedRouteSet = new Set(TRACKED_ROUTES.map(r => r.id));

    for (const entity of feed.entity) {
      if (!entity.tripUpdate) continue;

      const tripUpdate = entity.tripUpdate;
      const routeId = tripUpdate.trip?.routeId;

      // In "all positions" mode keep every route; otherwise keep tracked routes only.
      if (!routeId) continue;
      if (!SAVE_ALL_VEHICLE_POSITIONS && !trackedRouteSet.has(routeId)) continue;

      // Get vehicle ID and label - store both as keys
      const vehicleId = tripUpdate.vehicle?.id;
      const vehicleLabel = tripUpdate.vehicle?.label;
      const tripInfo = {
        routeId: routeId,
        tripId: tripUpdate.trip.tripId || '',
      };

      if (vehicleId) {
        vehicleRouteMap.set(vehicleId, tripInfo);
      }
      if (vehicleLabel && vehicleLabel !== vehicleId) {
        vehicleRouteMap.set(vehicleLabel, tripInfo);
      }
    }

    console.log(` Matched ${vehicleRouteMap.size} vehicles to GTFS routes`);

    // Debug: Show sample vehicle IDs from both sources
    if (vehicleRouteMap.size > 0) {
      const sampleGtfsIds = Array.from(vehicleRouteMap.keys()).slice(0, 5);
      console.log(` Sample GTFS-RT vehicle IDs: ${sampleGtfsIds.join(', ')}`);
    }

    if (allVehicles.length > 0) {
      const sampleJsonIds = allVehicles.slice(0, 5).map(v =>
        `${v.identificationNumber || v.inventoryNumber || 'unknown'}`
      );
      console.log(` Sample JSON API vehicle IDs: ${sampleJsonIds.join(', ')}`);
    }

    // Prepare vehicle positions.
    const positions: VehiclePosition[] = [];
    const now = Date.now();

    for (const vehicle of allVehicles) {
      // Normalize both identifiers to strings. The route map is keyed by strings, so a
      // numeric identificationNumber from the JSON API would never match a Map key.
      const identificationNumber: string | undefined =
        vehicle.identificationNumber?.toString() || undefined;
      const inventoryNumber: string | undefined =
        vehicle.inventoryNumber?.toString() || undefined;

      const vehicleId = identificationNumber ?? inventoryNumber;
      if (!vehicleId) continue;

      // Try multiple matching strategies: identification number first, then inventory number.
      const routeInfo =
        (identificationNumber ? vehicleRouteMap.get(identificationNumber) : undefined) ??
        (inventoryNumber ? vehicleRouteMap.get(inventoryNumber) : undefined);
      const includeVehicle = SAVE_ALL_VEHICLE_POSITIONS || !!routeInfo;

      if (!includeVehicle) continue;

      positions.push({
        timestamp: now,
        vehicleId,
        routeId: routeInfo?.routeId || 'UNKNOWN',
        tripId: routeInfo?.tripId,
        latitude: vehicle.positionLatitude,
        longitude: vehicle.positionLongitude,
        speed: vehicle.positionSpeed,
        bearing: vehicle.positionBearing,
        // NOTE(review): only status code 2 is mapped; the meaning of the other codes is
        // not visible in this file - confirm against the API documentation.
        currentStatus: vehicle.status === 2 ? 'IN_TRANSIT_TO' : 'UNKNOWN',
      });
    }

    // Log to database
    if (positions.length > 0) {
      await logVehiclePositions(positions);
      console.log(` [OK] Logged ${positions.length} vehicle positions${SAVE_ALL_VEHICLE_POSITIONS ? ' (all vehicles mode)' : ''}`);
      stats.vehiclesTracked += positions.length;
    } else {
      console.log(` [WARN] No vehicles found for tracked routes`);
    }

    stats.cycles++;
    stats.lastUpdate = new Date();

  } catch (error) {
    console.error(' [ERROR] Error tracking vehicles:', error);
    stats.errors++;
  }
}
|
||||
|
||||
/**
 * Track arrivals at monitored stops for traffic pattern analysis.
 *
 * For each of the first ARRIVAL_STOP_CAP monitored stops that resolves to a known GTFS
 * stop, queries the "nearbyTimes" endpoint around the stop's coordinates, keeps the
 * entries for that exact stop, maps each pattern's route key to a tracked route ID
 * (by ID, short name, or whitespace-normalized short name) and logs every arrival
 * predicted between 2 minutes ago and 60 minutes ahead of the cycle start time.
 *
 * Stops are queried sequentially with a 100 ms pause after each request. A failing stop
 * (HTTP error, unexpected payload, exception) is skipped and counted; the count is
 * reported once at the end of the cycle.
 *
 * Errors are caught and logged; they are not propagated.
 */
async function trackArrivals(): Promise<void> {
  try {
    const stopsToQuery = MONITORED_STOPS.slice(0, ARRIVAL_STOP_CAP);
    console.log(`[${new Date().toISOString()}] Tracking arrivals at ${stopsToQuery.length} key stops...`);

    // Reference time for the whole cycle; the arrival window is measured against it.
    const now = new Date();
    let arrivalsLogged = 0;
    let arrivalsFound = 0;
    let duplicates = 0;
    let failedStops = 0;
    const unmatchedRoutes = new Map<string, number>();
    const matchedRouteCounts = new Map<string, number>();

    // Process each monitored stop
    for (const stopKey of stopsToQuery) {
      const stop = stops.get(stopKey) || stopsByCode.get(stopKey);
      if (!stop) continue;

      const stopId = stop.stop_id;
      const stopCode = stop.stop_code;

      try {
        // Fetch arrivals near this stop
        const radius = 50;
        const nearbyUrl = `${config.baseUrl}/transport/planner/stops/nearbyTimes?latitude=${stop.stop_lat}&longitude=${stop.stop_lon}&radius=${radius}`;

        const response = await fetch(nearbyUrl);
        if (!response.ok) {
          failedStops++;
          continue;
        }

        const nearbyData: unknown = await response.json();
        if (!Array.isArray(nearbyData)) {
          failedStops++;
          continue;
        }

        // Process arrivals for tracked routes
        for (const stopData of nearbyData) {
          // The radius query can return neighbouring stops; keep only the monitored one.
          const apiStopId = stopData?.id?.toString();
          if (apiStopId !== stopId && apiStopId !== stopCode) continue;

          // Tolerate entries without a patterns array instead of aborting the stop.
          const patterns: any[] = Array.isArray(stopData.patterns) ? stopData.patterns : [];

          for (const pattern of patterns) {
            const routeKey = pattern?.routeId?.toString();
            if (!routeKey) continue;

            const normalizedRouteKey = normalizeRouteKey(routeKey);

            // Only track our monitored routes (match by route_id or short name)
            const canonicalRouteId = trackedRouteIds.has(routeKey)
              ? routeKey
              : routeIdByShortName.get(routeKey) || routeIdByShortNameNormalized.get(normalizedRouteKey);

            if (!canonicalRouteId) {
              unmatchedRoutes.set(routeKey, (unmatchedRoutes.get(routeKey) || 0) + 1);
              continue;
            }

            matchedRouteCounts.set(
              canonicalRouteId,
              (matchedRouteCounts.get(canonicalRouteId) || 0) + 1
            );

            const stopTimes: any[] = Array.isArray(pattern.stopTimes) ? pattern.stopTimes : [];

            for (const stopTime of stopTimes) {
              // serviceDay is multiplied by 1000 to get epoch milliseconds; the arrival
              // fields are added to it as second offsets.
              const serviceDay = new Date(stopTime.serviceDay * 1000);
              const arrivalTime = new Date(serviceDay.getTime() + stopTime.realtimeArrival * 1000);
              const scheduledTime = new Date(serviceDay.getTime() + stopTime.scheduledArrival * 1000);

              // NaN (missing fields) fails both comparisons below and is skipped.
              const minutesUntil = Math.floor((arrivalTime.getTime() - now.getTime()) / 60000);

              // Log arrivals in the next 60 minutes
              if (minutesUntil >= -2 && minutesUntil <= 60) {
                arrivalsFound++;
                try {
                  await logArrival({
                    timestamp: scheduledTime.getTime(),
                    stopId,
                    routeId: canonicalRouteId,
                    scheduledTime: scheduledTime.getTime(),
                    predictedTime: arrivalTime.getTime(),
                    delaySeconds: stopTime.arrivalDelay,
                    isRealtime: stopTime.realtime,
                    headsign: stopTime.headsign,
                  });
                  arrivalsLogged++;
                } catch (dbError) {
                  // NOTE(review): every logArrival failure is counted as a duplicate.
                  // Presumably the database rejects repeated arrivals; confirm that
                  // other failure modes cannot be hidden here.
                  duplicates++;
                }
              }
            }
          }
        }
      } catch (stopError) {
        // Skip this stop if there's an error, but keep it visible in the cycle summary.
        failedStops++;
      } finally {
        // Small delay to avoid overwhelming the API; also applied after failed requests
        // so a failing endpoint is not hit in a tight loop.
        await new Promise(resolve => setTimeout(resolve, 100));
      }
    }

    if (failedStops > 0) {
      console.warn(` [WARN] ${failedStops} stop queries failed or returned unexpected data`);
    }

    if (arrivalsFound === 0) {
      console.log(` [INFO] No arrivals found in time window`);
    } else if (arrivalsLogged > 0) {
      console.log(` [OK] Logged ${arrivalsLogged} new arrivals (${duplicates} duplicates skipped, ${arrivalsFound} total found)`);
      stats.arrivalsLogged += arrivalsLogged;
    } else {
      console.log(` [INFO] Found ${arrivalsFound} arrivals but all were duplicates (already in database)`);
    }

    if (matchedRouteCounts.size > 0) {
      const matchedSummary = Array.from(matchedRouteCounts.entries())
        .sort((a, b) => b[1] - a[1])
        .slice(0, 10)
        .map(([route, count]) => `${route}:${count}`)
        .join(', ');
      console.log(` [DEBUG] Matched route IDs: ${matchedSummary}`);
    }

    if (unmatchedRoutes.size > 0) {
      const topUnmatched = Array.from(unmatchedRoutes.entries())
        .sort((a, b) => b[1] - a[1])
        .slice(0, 8)
        .map(([route, count]) => `${route}:${count}`)
        .join(', ');
      console.log(` [DEBUG] Unmatched route keys: ${topUnmatched}`);
    }

  } catch (error) {
    console.error(' [ERROR] Error tracking arrivals:', error);
    stats.errors++;
  }
}
|
||||
|
||||
/**
 * Print statistics: the tracked routes, the stop cap, the cumulative counters held in
 * `stats`, and the refresh interval, framed by separator lines. Output goes to stdout.
 */
function printStats(): void {
  const separator = '='.repeat(60);

  console.log('\n' + separator);
  console.log('Background Tracker Statistics');
  console.log(separator);
  console.log(`Tracking ${TRACKED_ROUTES.length} routes:`);
  for (const route of TRACKED_ROUTES) {
    console.log(` - ${route.name} (ID: ${route.id})`);
  }
  console.log(`Monitoring up to ${ARRIVAL_STOP_CAP} key stops for arrival data`);
  console.log(`\nCycles completed: ${stats.cycles}`);
  console.log(`Vehicle positions tracked: ${stats.vehiclesTracked}`);
  console.log(`Full vehicle snapshots logged: ${stats.vehicleSnapshots}`);
  console.log(`Arrival predictions logged: ${stats.arrivalsLogged}`);
  console.log(`Errors: ${stats.errors}`);
  console.log(`Last update: ${stats.lastUpdate.toLocaleString()}`);
  console.log(`Refresh interval: ${REFRESH_INTERVAL / 1000}s`);
  console.log(separator + '\n');
}
|
||||
|
||||
/**
 * Main loop: initializes the database, runs one tracking cycle immediately, then
 * repeats it every REFRESH_INTERVAL milliseconds and prints statistics every 5 minutes.
 *
 * - A database initialization failure is logged and the tracker keeps running.
 * - A cycle that is still running when the next tick fires is not started again; the
 *   tick is skipped, so cycles never overlap.
 * - SIGINT / SIGTERM stop the timers, print the statistics, close the database and exit
 *   (exit code 1 if closing the database fails). The handlers are installed before the
 *   first cycle so an early interrupt still closes the database.
 */
async function main(): Promise<void> {
  console.log('\nStarting Background Bus Tracker for Popular Routes & Stops\n');

  // Initialize database
  try {
    await initDatabase();
  } catch (error) {
    console.error('Failed to initialize database:', error);
    console.log('Continuing without data logging...');
  }

  // Print tracked routes
  printStats();

  let cycleTimer: ReturnType<typeof setInterval> | undefined;
  let statsTimer: ReturnType<typeof setInterval> | undefined;
  let shuttingDown = false;

  // Handle graceful shutdown (shared by both signals; a repeated signal is ignored).
  const shutdown = async (banner: string): Promise<void> => {
    if (shuttingDown) return;
    shuttingDown = true;

    if (cycleTimer !== undefined) clearInterval(cycleTimer);
    if (statsTimer !== undefined) clearInterval(statsTimer);

    console.log(banner);
    printStats();

    let exitCode = 0;
    try {
      await closeDatabase();
    } catch (error) {
      console.error('Failed to close database:', error);
      exitCode = 1;
    }
    process.exit(exitCode);
  };

  process.on('SIGINT', () => {
    void shutdown('\n\nShutting down tracker...');
  });

  process.on('SIGTERM', () => {
    void shutdown('\n\nReceived SIGTERM, closing tracker...');
  });

  // One full cycle = vehicles, then arrivals. The arrivals pass queries stops one by
  // one and may outlast REFRESH_INTERVAL, so guard against overlapping cycles.
  let cycleInProgress = false;
  const runCycle = async (): Promise<void> => {
    if (cycleInProgress) {
      console.warn('Previous tracking cycle still running; skipping this tick');
      return;
    }
    cycleInProgress = true;
    try {
      await trackVehicles();
      await trackArrivals();
    } finally {
      cycleInProgress = false;
    }
  };

  // Initial fetch
  await runCycle();

  // Set up recurring fetch
  cycleTimer = setInterval(() => {
    void runCycle();
  }, REFRESH_INTERVAL);

  // Print stats every 5 minutes
  statsTimer = setInterval(() => {
    printStats();
  }, 5 * 60 * 1000);
}

main().catch((error) => {
  console.error(error);
  // Report the failure through the exit status instead of exiting with success.
  process.exitCode = 1;
});
|
||||
Reference in New Issue
Block a user