From 3882e3b0a66423ac7305ba16a55e5d1165d95b8c Mon Sep 17 00:00:00 2001 From: Andrej Mickov Date: Fri, 13 Mar 2026 00:01:14 +0100 Subject: [PATCH] Cleanup and rename --- .gitignore | 2 +- README.md | 18 +++++------ index.ts | 50 +++++++++++------------------- lib/{database.ts => storage.ts} | 54 +++++++++++++++++++++++++++------ package.json | 8 ++--- 5 files changed, 75 insertions(+), 57 deletions(-) rename lib/{database.ts => storage.ts} (93%) diff --git a/.gitignore b/.gitignore index 230f6ce..3be94d0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ node_modules/ .env dist/ -data/*.parquet +data/**/*.parquet *.log \ No newline at end of file diff --git a/README.md b/README.md index 4154aa7..2f2ef37 100644 --- a/README.md +++ b/README.md @@ -5,8 +5,8 @@ Real-time Skopje public transport tracking with Bun, GTFS/GTFS-RT ingestion, par ## What Is In This Repo - `bus-tracker-json.ts`: terminal tracker for one stop + one route. -- `background-tracker.ts`: continuous collector for multiple routes/stops. -- `lib/database.ts`: parquet write layer with rolling segments and optional S3 upload. +- `index.ts`: continuous collector for multiple routes/stops. +- `lib/storage.ts`: parquet write layer with rolling segments and optional S3 upload. - `lib/gtfs.ts`: GTFS CSV loading helpers. - `config.ts`: API base URL, defaults, and tracker timing. @@ -25,19 +25,19 @@ bun run typecheck Run single stop/route terminal tracker: ```bash -bun run tracker +bun run bus-tracker-json.ts ``` Run with custom stop and route IDs: ```bash -bun run tracker -- --stop 1571 --route 125 +bun run bus-tracker-json.ts --stop 1571 --route 125 ``` Run background collection pipeline: ```bash -bun run track +bun run start ``` ## Environment @@ -58,13 +58,11 @@ Key variables: ## Scripts -- `bun run start`: alias for the terminal tracker. -- `bun run tracker`: terminal tracker. -- `bun run track`: background collector. +- `bun run start`: collector entrypoint (`index.ts`). - `bun run typecheck`: TypeScript no-emit check. ## Notes -- Generated parquet files are intentionally ignored by git (`data/*.parquet`). -- The background tracker rotates segments and uploads each closed segment when S3 is enabled. +- Generated parquet files are intentionally ignored by git (`data/**/*.parquet`). +- The collector rotates segments and uploads each closed segment when S3 is enabled. - On process shutdown (`SIGINT`/`SIGTERM`), writers are flushed so the current segment is finalized. diff --git a/index.ts b/index.ts index 27ebb59..b9be4c3 100644 --- a/index.ts +++ b/index.ts @@ -1,21 +1,15 @@ -#!/usr/bin/env bun -/** - * Background tracker for popular bus routes in Skopje - * Continuously monitors GTFS-RT feeds and stores data as parquet segments - */ - import GtfsRealtimeBindings from 'gtfs-realtime-bindings'; import { config } from './config'; import { GtfsRoute, GtfsStop, loadGtfsRoutes, loadGtfsStops } from './lib/gtfs'; import { - initDatabase, + initStorage, logVehiclePositions, logVehicleFeedSnapshot, logArrival, - closeDatabase, + closeStorage, VehicleFeedSnapshot, VehiclePosition -} from './lib/database'; +} from './lib/storage'; // Popular routes to track const TRACKED_ROUTES = [ @@ -53,6 +47,7 @@ const REFRESH_INTERVAL = 30000; // 30 seconds const ARRIVAL_STOP_CAP = 150; // Max stops to query per cycle const SAVE_ALL_VEHICLE_SNAPSHOTS = (process.env.SAVE_ALL_VEHICLE_SNAPSHOTS ?? 'true').toLowerCase() === 'true'; const SAVE_ALL_VEHICLE_POSITIONS = (process.env.SAVE_ALL_VEHICLE_POSITIONS ?? 'true').toLowerCase() === 'true'; +const VERBOSE_TRACKER_LOGS = (process.env.VERBOSE_TRACKER_LOGS ?? 'false').toLowerCase() === 'true'; let stats = { cycles: 0, @@ -109,7 +104,9 @@ async function trackVehicles() { } const allVehicles = await vehiclesResponse.json() as any[]; - console.log(` Found ${allVehicles.length} total vehicles`); + if (VERBOSE_TRACKER_LOGS) { + console.log(` Found ${allVehicles.length} total vehicles`); + } if (SAVE_ALL_VEHICLE_SNAPSHOTS && allVehicles.length > 0) { const captureTime = Date.now(); @@ -187,19 +184,8 @@ async function trackVehicles() { } } - console.log(` Matched ${vehicleRouteMap.size} vehicles to GTFS routes`); - - // Debug: Show sample vehicle IDs from both sources - if (vehicleRouteMap.size > 0) { - const sampleGtfsIds = Array.from(vehicleRouteMap.keys()).slice(0, 5); - console.log(` Sample GTFS-RT vehicle IDs: ${sampleGtfsIds.join(', ')}`); - } - - if (allVehicles.length > 0) { - const sampleJsonIds = allVehicles.slice(0, 5).map(v => - `${v.identificationNumber || v.inventoryNumber || 'unknown'}` - ); - console.log(` Sample JSON API vehicle IDs: ${sampleJsonIds.join(', ')}`); + if (VERBOSE_TRACKER_LOGS) { + console.log(` Matched ${vehicleRouteMap.size} vehicles to GTFS routes`); } // Prepare vehicle positions. @@ -231,7 +217,7 @@ async function trackVehicles() { }); } - // Log to database + // Persist current cycle positions if (positions.length > 0) { await logVehiclePositions(positions); console.log(` [OK] Logged ${positions.length} vehicle positions${SAVE_ALL_VEHICLE_POSITIONS ? ' (all vehicles mode)' : ''}`); @@ -350,10 +336,10 @@ async function trackArrivals() { console.log(` [OK] Logged ${arrivalsLogged} new arrivals (${duplicates} duplicates skipped, ${arrivalsFound} total found)`); stats.arrivalsLogged += arrivalsLogged; } else { - console.log(` [INFO] Found ${arrivalsFound} arrivals but all were duplicates (already in database)`); + console.log(` [INFO] Found ${arrivalsFound} arrivals but all were duplicates (already recorded)`); } - if (matchedRouteCounts.size > 0) { + if (VERBOSE_TRACKER_LOGS && matchedRouteCounts.size > 0) { const matchedSummary = Array.from(matchedRouteCounts.entries()) .sort((a, b) => b[1] - a[1]) .slice(0, 10) @@ -362,7 +348,7 @@ async function trackArrivals() { console.log(` [DEBUG] Matched route IDs: ${matchedSummary}`); } - if (unmatchedRoutes.size > 0) { + if (VERBOSE_TRACKER_LOGS && unmatchedRoutes.size > 0) { const topUnmatched = Array.from(unmatchedRoutes.entries()) .sort((a, b) => b[1] - a[1]) .slice(0, 8) @@ -399,11 +385,11 @@ function printStats() { async function main() { console.log('\nStarting Background Bus Tracker for Popular Routes & Stops\n'); - // Initialize database + // Initialize storage try { - await initDatabase(); + await initStorage(); } catch (error) { - console.error('Failed to initialize database:', error); + console.error('Failed to initialize storage:', error); console.log('Continuing without data logging...'); } @@ -429,14 +415,14 @@ async function main() { process.on('SIGINT', async () => { console.log('\n\nShutting down tracker...'); printStats(); - await closeDatabase(); + await closeStorage(); process.exit(0); }); process.on('SIGTERM', async () => { console.log('\n\nReceived SIGTERM, closing tracker...'); printStats(); - await closeDatabase(); + await closeStorage(); process.exit(0); }); } diff --git a/lib/database.ts b/lib/storage.ts similarity index 93% rename from lib/database.ts rename to lib/storage.ts index 29ee757..5c25cda 100644 --- a/lib/database.ts +++ b/lib/storage.ts @@ -170,22 +170,41 @@ function segmentId(): string { return new Date().toISOString().replace(/[.:]/g, '-'); } +function partitionPathFromSegmentId(id: string): string { + const match = /^(\d{4})-(\d{2})-(\d{2})T/.exec(id); + if (match) { + const [, year, month, day] = match; + return path.join(`year=${year}`, `month=${month}`, `day=${day}`); + } + + const now = new Date(); + const year = now.getUTCFullYear().toString(); + const month = String(now.getUTCMonth() + 1).padStart(2, '0'); + const day = String(now.getUTCDate()).padStart(2, '0'); + return path.join(`year=${year}`, `month=${month}`, `day=${day}`); +} + +function buildPartitionedFilePath(prefix: string, id: string): string { + return path.join(DATA_DIR, partitionPathFromSegmentId(id), `${prefix}-${id}.parquet`); +} + function buildVehicleFile(id: string): string { - return path.join(DATA_DIR, `vehicle_positions-${id}.parquet`); + return buildPartitionedFilePath('vehicle_positions', id); } function buildArrivalFile(id: string): string { - return path.join(DATA_DIR, `arrival_records-${id}.parquet`); + return buildPartitionedFilePath('arrival_records', id); } function buildSnapshotFile(id: string): string { - return path.join(DATA_DIR, `vehicle_snapshots-${id}.parquet`); + return buildPartitionedFilePath('vehicle_snapshots', id); } function createFileWriter(filename: string): Writer { const writer = new ByteWriter() as unknown as Writer & { index: number }; const chunkSize = 1_000_000; + fsSync.mkdirSync(path.dirname(filename), { recursive: true }); fsSync.writeFileSync(filename, '', { flag: 'w' }); const flush = () => { @@ -299,7 +318,12 @@ async function uploadFileToObjectStorage(filePath: string): Promise { } const keyPrefix = OBJECT_STORAGE_PREFIX ? `${OBJECT_STORAGE_PREFIX}/` : ''; - const key = `${keyPrefix}${path.basename(filePath)}`; + const relativePath = path.relative(DATA_DIR, filePath); + const normalizedRelativePath = + !relativePath.startsWith('..') && !path.isAbsolute(relativePath) + ? relativePath.split(path.sep).join('/') + : path.basename(filePath); + const key = `${keyPrefix}${normalizedRelativePath}`; const body = await fs.readFile(filePath); @@ -448,10 +472,14 @@ async function ensureInitialized(): Promise { console.log(`[OK] Hyparquet storage ready at ${DATA_DIR}, roll=${ROLL_INTERVAL_MS / 60000}m${storageInfo}`); } -export async function initDatabase(): Promise { +export async function initStorage(): Promise { await ensureInitialized(); } +export async function initDatabase(): Promise { + await initStorage(); +} + export async function logVehiclePosition(position: VehiclePosition): Promise { await logVehiclePositions([position]); } @@ -566,7 +594,7 @@ export async function cleanupOldData(_daysToKeep: number = 90): Promise { console.log('cleanupOldData skipped: parquet segment mode'); } -export async function getDatabaseStats() { +export async function getStorageStats() { await ensureInitialized(); const [vehicleInfo, arrivalInfo, snapshotInfo] = await Promise.all([ @@ -581,9 +609,9 @@ export async function getDatabaseStats() { vehicleSnapshots: writes.vehicleSnapshots, oldestRecord: null, newestRecord: null, - dbType: 'hyparquet(rolling-write)', + storageType: 'hyparquet(rolling-write)', host: OBJECT_STORAGE_ENABLED ? 'object-storage+local' : 'local-filesystem', - database: DATA_DIR, + storagePath: DATA_DIR, rolling: { minutes: ROLL_INTERVAL_MS / 60000, currentSegmentId, @@ -616,7 +644,11 @@ export async function getDatabaseStats() { }; } -export async function closeDatabase(): Promise { +export async function getDatabaseStats() { + return getStorageStats(); +} + +export async function closeStorage(): Promise { if (!initialized) { return; } @@ -630,3 +662,7 @@ export async function closeDatabase(): Promise { await enqueueRotation('shutdown', false); initialized = false; } + +export async function closeDatabase(): Promise { + await closeStorage(); +} diff --git a/package.json b/package.json index 4a92eca..a2e013f 100644 --- a/package.json +++ b/package.json @@ -1,12 +1,10 @@ { - "name": "skopje-bus-tracker", + "name": "openjsp", "version": "1.0.0", "description": "Real-time bus tracking for Skopje public transport", - "main": "server.ts", + "main": "index.ts", "scripts": { - "start": "bun run bus-tracker-json.ts", - "tracker": "bun run bus-tracker-json.ts", - "track": "bun run background-tracker.ts", + "start": "bun run index.ts", "typecheck": "bunx tsc --noEmit" }, "keywords": [