// Rolling parquet storage for transit telemetry: buffers vehicle positions,
// arrival records, and raw feed snapshots into time-rolled parquet segments
// on local disk, optionally mirroring finished segments to S3-compatible
// object storage.
import { promises as fs } from 'fs';
|
|
import * as fsSync from 'fs';
|
|
import * as path from 'path';
|
|
import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3';
|
|
import { ByteWriter, ParquetWriter } from 'hyparquet-writer';
|
|
import type { SchemaElement } from 'hyparquet';
|
|
import type { Writer } from 'hyparquet-writer';
|
|
|
|
const env = process.env;

// Local parquet output root and how often (in minutes) segments are rolled.
const DATA_DIR = env.PARQUET_DIR ?? path.join(process.cwd(), 'data');
const ROLL_MINUTES = Number.parseInt(env.PARQUET_ROLL_MINUTES ?? '5', 10);
// Clamp to at least 1 minute; fall back to 5 when the env var is non-numeric.
const ROLL_INTERVAL_MS = Math.max(1, Number.isNaN(ROLL_MINUTES) ? 5 : ROLL_MINUTES) * 60_000;

// S3-compatible object storage settings (custom endpoint + path-style
// addressing support non-AWS implementations such as MinIO).
const OBJECT_STORAGE_BUCKET = env.S3_BUCKET;
const OBJECT_STORAGE_REGION = env.S3_REGION ?? 'us-east-1';
const OBJECT_STORAGE_ENDPOINT = env.S3_ENDPOINT;
// Key prefix with any leading/trailing slashes stripped.
const OBJECT_STORAGE_PREFIX = (env.S3_PREFIX ?? 'parquet').replace(/^\/+|\/+$/g, '');
const OBJECT_STORAGE_FORCE_PATH_STYLE = (env.S3_FORCE_PATH_STYLE ?? 'true').toLowerCase() === 'true';
const OBJECT_STORAGE_ACCESS_KEY = env.S3_ACCESS_KEY_ID;
const OBJECT_STORAGE_SECRET_KEY = env.S3_SECRET_ACCESS_KEY;
// Uploads happen only when explicitly enabled AND bucket + both credentials
// are present — partial configuration silently disables object storage.
const OBJECT_STORAGE_ENABLED =
  (env.S3_ENABLED ?? 'false').toLowerCase() === 'true' &&
  !!OBJECT_STORAGE_BUCKET &&
  !!OBJECT_STORAGE_ACCESS_KEY &&
  !!OBJECT_STORAGE_SECRET_KEY;
const DELETE_LOCAL_AFTER_UPLOAD = (env.S3_DELETE_LOCAL_AFTER_UPLOAD ?? 'false').toLowerCase() === 'true';
// Upload retry count / base delay; `|| fallback` also rejects 0 and NaN.
const OBJECT_STORAGE_UPLOAD_RETRIES = Math.max(1, Number.parseInt(env.S3_UPLOAD_RETRIES ?? '3', 10) || 3);
const OBJECT_STORAGE_UPLOAD_RETRY_BASE_MS = Math.max(100, Number.parseInt(env.S3_UPLOAD_RETRY_BASE_MS ?? '1000', 10) || 1000);
|
|
|
// Logical column types accepted by the column-mapper helpers below.
type BasicType = 'BOOLEAN' | 'INT32' | 'INT64' | 'FLOAT' | 'DOUBLE' | 'FLOAT' extends never ? never : 'BOOLEAN' | 'INT32' | 'INT64' | 'FLOAT' | 'DOUBLE' | 'STRING' | 'TIMESTAMP';

// One column's worth of values passed to hyparquet-writer's write().
type ColumnSource = {
  name: string;
  data: Array<string | number | bigint | boolean | Date | null>;
  type: BasicType;
  // Omitted for OPTIONAL columns; explicitly false for REQUIRED ones.
  nullable?: boolean;
};

// Minimal surface of ParquetWriter this module relies on: incremental
// row-group writes followed by a single finish() that seals the footer.
type IncrementalWriter = {
  write: (options: { columnData: ColumnSource[]; rowGroupSize?: number | number[]; pageSize?: number }) => void;
  finish: () => void;
};

// Snapshot of a segment's identity and row counts captured at rotation time,
// before the per-segment state is reset.
type SegmentMeta = {
  segmentId: string;
  vehicleFile: string;
  arrivalFile: string;
  snapshotFile: string;
  vehicleRows: number;
  arrivalRows: number;
  snapshotRows: number;
};
|
|
|
|
// Parquet schema for vehicle position rows.
// NOTE: root.num_children must equal the number of column entries that follow.
const VEHICLE_SCHEMA: SchemaElement[] = [
  { name: 'root', num_children: 9 },
  { name: 'timestamp', type: 'INT64', repetition_type: 'REQUIRED' },
  { name: 'vehicleId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' },
  { name: 'routeId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' },
  { name: 'tripId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
  { name: 'latitude', type: 'DOUBLE', repetition_type: 'REQUIRED' },
  { name: 'longitude', type: 'DOUBLE', repetition_type: 'REQUIRED' },
  { name: 'speed', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
  { name: 'bearing', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
  { name: 'currentStatus', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
];

// Parquet schema for stop-arrival rows (scheduled vs predicted/actual times).
const ARRIVAL_SCHEMA: SchemaElement[] = [
  { name: 'root', num_children: 10 },
  { name: 'timestamp', type: 'INT64', repetition_type: 'REQUIRED' },
  { name: 'stopId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' },
  { name: 'routeId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' },
  { name: 'tripId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
  { name: 'scheduledTime', type: 'INT64', repetition_type: 'REQUIRED' },
  { name: 'predictedTime', type: 'INT64', repetition_type: 'OPTIONAL' },
  { name: 'actualTime', type: 'INT64', repetition_type: 'OPTIONAL' },
  { name: 'delaySeconds', type: 'INT32', repetition_type: 'OPTIONAL' },
  { name: 'isRealtime', type: 'BOOLEAN', repetition_type: 'REQUIRED' },
  { name: 'headsign', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
];

// Parquet schema for raw feed snapshots; rawJson keeps the original payload
// verbatim so parsed fields can stay OPTIONAL.
const SNAPSHOT_SCHEMA: SchemaElement[] = [
  { name: 'root', num_children: 10 },
  { name: 'timestamp', type: 'INT64', repetition_type: 'REQUIRED' },
  { name: 'sourceTimestamp', type: 'INT64', repetition_type: 'OPTIONAL' },
  { name: 'vehicleId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
  { name: 'inventoryNumber', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
  { name: 'latitude', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
  { name: 'longitude', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
  { name: 'speed', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
  { name: 'bearing', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
  { name: 'status', type: 'INT32', repetition_type: 'OPTIONAL' },
  { name: 'rawJson', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' },
];
|
|
|
|
// ---- Mutable module state: exactly one segment is open at a time ----
let initialized = false;
// Open writers for the current segment; undefined between finish() and the
// next startNewSegment(), which makes late writes silent no-ops.
let vehicleWriter: IncrementalWriter | undefined;
let arrivalWriter: IncrementalWriter | undefined;
let snapshotWriter: IncrementalWriter | undefined;
// Per-stream promise chains that serialize writes; rotationQueue serializes
// segment rolls against each other.
let vehicleWriteQueue: Promise<void> = Promise.resolve();
let arrivalWriteQueue: Promise<void> = Promise.resolve();
let snapshotWriteQueue: Promise<void> = Promise.resolve();
let rotationQueue: Promise<void> = Promise.resolve();
let rollTimer: ReturnType<typeof setInterval> | undefined;

// Identity, file paths, and row counts of the currently open segment.
let currentSegmentId = '';
let currentVehicleFile = '';
let currentArrivalFile = '';
let currentSnapshotFile = '';
let currentSegmentRows = { vehicle: 0, arrival: 0, snapshot: 0 };

// Process-lifetime write counters reported by getStorageStats().
const writes = {
  vehiclePositions: 0,
  arrivalRecords: 0,
  vehicleSnapshots: 0,
};
|
|
|
|
// Shared S3 client, created only when object storage is fully configured.
// Checksum calculation/validation is limited to WHEN_REQUIRED for
// compatibility with non-AWS S3 implementations.
const s3Client = OBJECT_STORAGE_ENABLED
  ? new S3Client({
      region: OBJECT_STORAGE_REGION,
      endpoint: OBJECT_STORAGE_ENDPOINT,
      forcePathStyle: OBJECT_STORAGE_FORCE_PATH_STYLE,
      requestChecksumCalculation: 'WHEN_REQUIRED',
      responseChecksumValidation: 'WHEN_REQUIRED',
      credentials: {
        // Presence is guaranteed by OBJECT_STORAGE_ENABLED's checks above.
        accessKeyId: OBJECT_STORAGE_ACCESS_KEY as string,
        secretAccessKey: OBJECT_STORAGE_SECRET_KEY as string,
      },
    })
  : undefined;
|
|
|
|
/** One realtime vehicle position sample. */
export interface VehiclePosition {
  // Epoch timestamp of the observation — units (s vs ms) are set by callers; TODO confirm.
  timestamp: number;
  vehicleId: string;
  routeId: string;
  tripId?: string;
  latitude: number;
  longitude: number;
  speed?: number;
  bearing?: number;
  currentStatus?: string;
}

/** One stop-arrival observation (schedule vs realtime prediction/actual). */
export interface ArrivalRecord {
  timestamp: number;
  stopId: string;
  routeId: string;
  tripId?: string;
  scheduledTime: number;
  predictedTime?: number;
  actualTime?: number;
  // NOTE(review): presumably positive = late, negative = early — confirm against producer.
  delaySeconds?: number;
  // True when the record came from a realtime feed rather than the static schedule.
  isRealtime: boolean;
  headsign?: string;
}

/** Raw vehicle-feed entry, kept verbatim in rawJson alongside parsed fields. */
export interface VehicleFeedSnapshot {
  timestamp: number;
  sourceTimestamp?: number;
  vehicleId?: string;
  inventoryNumber?: string;
  latitude?: number;
  longitude?: number;
  speed?: number;
  bearing?: number;
  status?: number;
  // Original feed payload serialized as JSON.
  rawJson: string;
}
|
|
|
|
function segmentId(): string {
|
|
return new Date().toISOString().replace(/[.:]/g, '-');
|
|
}
|
|
|
|
function partitionPathFromSegmentId(id: string): string {
|
|
const match = /^(\d{4})-(\d{2})-(\d{2})T/.exec(id);
|
|
if (match) {
|
|
const [, year, month, day] = match;
|
|
return path.join(`year=${year}`, `month=${month}`, `day=${day}`);
|
|
}
|
|
|
|
const now = new Date();
|
|
const year = now.getUTCFullYear().toString();
|
|
const month = String(now.getUTCMonth() + 1).padStart(2, '0');
|
|
const day = String(now.getUTCDate()).padStart(2, '0');
|
|
return path.join(`year=${year}`, `month=${month}`, `day=${day}`);
|
|
}
|
|
|
|
function buildPartitionedFilePath(prefix: string, id: string): string {
|
|
return path.join(DATA_DIR, prefix, partitionPathFromSegmentId(id), `${prefix}-${id}.parquet`);
|
|
}
|
|
|
|
// Per-stream convenience wrappers around buildPartitionedFilePath; the
// prefix doubles as the subdirectory name and the file-name stem.
function buildVehicleFile(id: string): string {
  return buildPartitionedFilePath('vehicle_positions', id);
}

function buildArrivalFile(id: string): string {
  return buildPartitionedFilePath('arrival_records', id);
}

function buildSnapshotFile(id: string): string {
  return buildPartitionedFilePath('vehicle_snapshots', id);
}
|
|
|
|
/**
 * Build a hyparquet `Writer` that streams output to `filename` instead of
 * accumulating the whole parquet file in memory: whenever the ByteWriter's
 * internal buffer passes ~1 MB it is appended to the file and rewound.
 *
 * NOTE(review): this monkey-patches ByteWriter internals (`buffer`, `index`,
 * `ensure`) and assumes hyparquet-writer calls `ensure` before every write —
 * verify this contract when upgrading the library.
 */
function createFileWriter(filename: string): Writer {
  const writer = new ByteWriter() as unknown as Writer & { index: number };
  const chunkSize = 1_000_000;

  // Create parent directories and truncate any pre-existing file.
  fsSync.mkdirSync(path.dirname(filename), { recursive: true });
  fsSync.writeFileSync(filename, '', { flag: 'w' });

  // Append the buffered bytes [0, index) to disk, then rewind so the
  // in-memory buffer is reused.
  const flush = () => {
    const chunk = new Uint8Array(writer.buffer, 0, writer.index);
    fsSync.writeFileSync(filename, chunk, { flag: 'a' });
    writer.index = 0;
  };

  writer.ensure = (size: number) => {
    // Opportunistically flush once enough data has accumulated.
    if (writer.index > chunkSize) {
      flush();
    }
    // Grow the in-memory buffer when the pending write still doesn't fit.
    if (writer.index + size > writer.buffer.byteLength) {
      const newSize = Math.max(writer.buffer.byteLength * 2, writer.index + size);
      const newBuffer = new ArrayBuffer(newSize);
      new Uint8Array(newBuffer).set(new Uint8Array(writer.buffer));
      writer.buffer = newBuffer;
      writer.view = new DataView(writer.buffer);
    }
  };

  // Whole-buffer accessors are meaningless once bytes have been flushed to disk.
  writer.getBuffer = () => {
    throw new Error('getBuffer not supported for file writer');
  };
  writer.getBytes = () => {
    throw new Error('getBytes not supported for file writer');
  };
  // Flush whatever remains after the parquet footer has been written.
  writer.finish = () => {
    flush();
  };

  return writer;
}
|
|
|
|
/**
 * Open a ParquetWriter over a streaming file writer. The double cast narrows
 * the library's writer to the small IncrementalWriter surface used here.
 */
function createParquetWriter(filename: string, schema: SchemaElement[]): IncrementalWriter {
  return new ParquetWriter({ writer: createFileWriter(filename), schema }) as unknown as IncrementalWriter;
}
|
|
|
|
function queuedWrite(queue: Promise<void>, task: () => Promise<void>): Promise<void> {
|
|
return queue.then(task).catch((error) => {
|
|
console.error('Parquet write failed:', error);
|
|
});
|
|
}
|
|
|
|
function toInt64(value: number): bigint {
|
|
return BigInt(Math.trunc(value));
|
|
}
|
|
|
|
function toNullableInt64(value: number | undefined): bigint | null {
|
|
return value == null ? null : toInt64(value);
|
|
}
|
|
|
|
function toVehicleColumns(positions: VehiclePosition[]): ColumnSource[] {
|
|
return [
|
|
{ name: 'timestamp', data: positions.map((p) => toInt64(p.timestamp)), type: 'INT64', nullable: false },
|
|
{ name: 'vehicleId', data: positions.map((p) => p.vehicleId), type: 'STRING', nullable: false },
|
|
{ name: 'routeId', data: positions.map((p) => p.routeId), type: 'STRING', nullable: false },
|
|
{ name: 'tripId', data: positions.map((p) => p.tripId ?? null), type: 'STRING' },
|
|
{ name: 'latitude', data: positions.map((p) => p.latitude), type: 'DOUBLE', nullable: false },
|
|
{ name: 'longitude', data: positions.map((p) => p.longitude), type: 'DOUBLE', nullable: false },
|
|
{ name: 'speed', data: positions.map((p) => p.speed ?? null), type: 'DOUBLE' },
|
|
{ name: 'bearing', data: positions.map((p) => p.bearing ?? null), type: 'DOUBLE' },
|
|
{ name: 'currentStatus', data: positions.map((p) => p.currentStatus ?? null), type: 'STRING' },
|
|
];
|
|
}
|
|
|
|
function toArrivalColumns(arrivals: ArrivalRecord[]): ColumnSource[] {
|
|
return [
|
|
{ name: 'timestamp', data: arrivals.map((a) => toInt64(a.timestamp)), type: 'INT64', nullable: false },
|
|
{ name: 'stopId', data: arrivals.map((a) => a.stopId), type: 'STRING', nullable: false },
|
|
{ name: 'routeId', data: arrivals.map((a) => a.routeId), type: 'STRING', nullable: false },
|
|
{ name: 'tripId', data: arrivals.map((a) => a.tripId ?? null), type: 'STRING' },
|
|
{ name: 'scheduledTime', data: arrivals.map((a) => toInt64(a.scheduledTime)), type: 'INT64', nullable: false },
|
|
{ name: 'predictedTime', data: arrivals.map((a) => toNullableInt64(a.predictedTime)), type: 'INT64' },
|
|
{ name: 'actualTime', data: arrivals.map((a) => toNullableInt64(a.actualTime)), type: 'INT64' },
|
|
{ name: 'delaySeconds', data: arrivals.map((a) => (a.delaySeconds == null ? null : Math.trunc(a.delaySeconds))), type: 'INT32' },
|
|
{ name: 'isRealtime', data: arrivals.map((a) => a.isRealtime), type: 'BOOLEAN', nullable: false },
|
|
{ name: 'headsign', data: arrivals.map((a) => a.headsign ?? null), type: 'STRING' },
|
|
];
|
|
}
|
|
|
|
function toSnapshotColumns(snapshots: VehicleFeedSnapshot[]): ColumnSource[] {
|
|
return [
|
|
{ name: 'timestamp', data: snapshots.map((s) => toInt64(s.timestamp)), type: 'INT64', nullable: false },
|
|
{ name: 'sourceTimestamp', data: snapshots.map((s) => toNullableInt64(s.sourceTimestamp)), type: 'INT64' },
|
|
{ name: 'vehicleId', data: snapshots.map((s) => s.vehicleId ?? null), type: 'STRING' },
|
|
{ name: 'inventoryNumber', data: snapshots.map((s) => s.inventoryNumber ?? null), type: 'STRING' },
|
|
{ name: 'latitude', data: snapshots.map((s) => (s.latitude == null ? null : s.latitude)), type: 'DOUBLE' },
|
|
{ name: 'longitude', data: snapshots.map((s) => (s.longitude == null ? null : s.longitude)), type: 'DOUBLE' },
|
|
{ name: 'speed', data: snapshots.map((s) => (s.speed == null ? null : s.speed)), type: 'DOUBLE' },
|
|
{ name: 'bearing', data: snapshots.map((s) => (s.bearing == null ? null : s.bearing)), type: 'DOUBLE' },
|
|
{ name: 'status', data: snapshots.map((s) => (s.status == null ? null : Math.trunc(s.status))), type: 'INT32' },
|
|
{ name: 'rawJson', data: snapshots.map((s) => s.rawJson), type: 'STRING', nullable: false },
|
|
];
|
|
}
|
|
|
|
/**
 * Open a fresh segment: reset per-segment paths and row counters, then
 * create one parquet writer per stream. The caller is responsible for
 * finishing the previous segment's writers first (see finalizeSegment).
 */
function startNewSegment(id: string): void {
  currentSegmentId = id;
  currentVehicleFile = buildVehicleFile(id);
  currentArrivalFile = buildArrivalFile(id);
  currentSnapshotFile = buildSnapshotFile(id);
  currentSegmentRows = { vehicle: 0, arrival: 0, snapshot: 0 };
  vehicleWriter = createParquetWriter(currentVehicleFile, VEHICLE_SCHEMA);
  arrivalWriter = createParquetWriter(currentArrivalFile, ARRIVAL_SCHEMA);
  snapshotWriter = createParquetWriter(currentSnapshotFile, SNAPSHOT_SCHEMA);
}
|
|
|
|
async function uploadFileToObjectStorage(filePath: string): Promise<boolean> {
|
|
if (!OBJECT_STORAGE_ENABLED || !s3Client || !OBJECT_STORAGE_BUCKET) {
|
|
return true;
|
|
}
|
|
|
|
const keyPrefix = OBJECT_STORAGE_PREFIX ? `${OBJECT_STORAGE_PREFIX}/` : '';
|
|
const relativePath = path.relative(DATA_DIR, filePath);
|
|
const normalizedRelativePath =
|
|
!relativePath.startsWith('..') && !path.isAbsolute(relativePath)
|
|
? relativePath.split(path.sep).join('/')
|
|
: path.basename(filePath);
|
|
const key = `${keyPrefix}${normalizedRelativePath}`;
|
|
|
|
const body = await fs.readFile(filePath);
|
|
|
|
for (let attempt = 1; attempt <= OBJECT_STORAGE_UPLOAD_RETRIES; attempt += 1) {
|
|
try {
|
|
await s3Client.send(
|
|
new PutObjectCommand({
|
|
Bucket: OBJECT_STORAGE_BUCKET,
|
|
Key: key,
|
|
Body: body,
|
|
ContentLength: body.byteLength,
|
|
ContentType: 'application/octet-stream',
|
|
})
|
|
);
|
|
|
|
console.log(`[OK] Uploaded parquet to object storage: s3://${OBJECT_STORAGE_BUCKET}/${key}`);
|
|
return true;
|
|
} catch (error) {
|
|
const status = (error as { $metadata?: { httpStatusCode?: number } })?.$metadata?.httpStatusCode;
|
|
const shouldRetry = !!status && status >= 500;
|
|
const canRetry = shouldRetry && attempt < OBJECT_STORAGE_UPLOAD_RETRIES;
|
|
|
|
console.error(
|
|
`[ERROR] Upload failed for ${filePath} (attempt ${attempt}/${OBJECT_STORAGE_UPLOAD_RETRIES}):`,
|
|
error
|
|
);
|
|
|
|
if (!canRetry) {
|
|
break;
|
|
}
|
|
|
|
const backoffMs = OBJECT_STORAGE_UPLOAD_RETRY_BASE_MS * attempt;
|
|
await new Promise((resolve) => setTimeout(resolve, backoffMs));
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
async function deleteLocalFile(filePath: string): Promise<void> {
|
|
await fs.unlink(filePath).catch((error: unknown) => {
|
|
const code = (error as { code?: string }).code;
|
|
if (code !== 'ENOENT') {
|
|
console.warn(`[WARN] Failed to delete local parquet file ${filePath}:`, error);
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
 * Seal the current segment: wait for in-flight writes, finish all three
 * parquet writers (flushing footers), upload non-empty segments to object
 * storage, and optionally delete the local copies. When `reopen` is true a
 * new segment is started afterwards even if uploads failed.
 *
 * Must only run through the rotation queue (enqueueRotation) so rotations
 * never overlap.
 */
async function finalizeSegment(reason: string, reopen: boolean): Promise<void> {
  if (!initialized) {
    return;
  }

  // Drain writes already queued against the writers we are about to close.
  await Promise.all([vehicleWriteQueue, arrivalWriteQueue, snapshotWriteQueue]);

  // Capture segment identity/counters before startNewSegment() resets them.
  const meta: SegmentMeta = {
    segmentId: currentSegmentId,
    vehicleFile: currentVehicleFile,
    arrivalFile: currentArrivalFile,
    snapshotFile: currentSnapshotFile,
    vehicleRows: currentSegmentRows.vehicle,
    arrivalRows: currentSegmentRows.arrival,
    snapshotRows: currentSegmentRows.snapshot,
  };

  // finish() seals each file; clearing the writer makes late writes no-ops.
  if (vehicleWriter) {
    vehicleWriter.finish();
    vehicleWriter = undefined;
  }
  if (arrivalWriter) {
    arrivalWriter.finish();
    arrivalWriter = undefined;
  }
  if (snapshotWriter) {
    snapshotWriter.finish();
    snapshotWriter = undefined;
  }

  const hasRows = meta.vehicleRows > 0 || meta.arrivalRows > 0 || meta.snapshotRows > 0;

  try {
    if (hasRows) {
      const uploadResults = await Promise.all([
        uploadFileToObjectStorage(meta.vehicleFile),
        uploadFileToObjectStorage(meta.arrivalFile),
        uploadFileToObjectStorage(meta.snapshotFile),
      ]);
      const allUploaded = uploadResults.every(Boolean);

      // Local copies are removed only when every upload succeeded.
      if (OBJECT_STORAGE_ENABLED && DELETE_LOCAL_AFTER_UPLOAD && allUploaded) {
        await Promise.all([
          deleteLocalFile(meta.vehicleFile),
          deleteLocalFile(meta.arrivalFile),
          deleteLocalFile(meta.snapshotFile),
        ]);
        console.log(`[OK] Deleted local parquet segment after upload: ${meta.segmentId}`);
      } else if (OBJECT_STORAGE_ENABLED && DELETE_LOCAL_AFTER_UPLOAD && !allUploaded) {
        console.warn(`[WARN] Kept local parquet segment due to upload errors: ${meta.segmentId}`);
      }
    } else if (OBJECT_STORAGE_ENABLED && DELETE_LOCAL_AFTER_UPLOAD) {
      // Empty segment: nothing to upload, just drop the (header-only) files.
      await Promise.all([
        deleteLocalFile(meta.vehicleFile),
        deleteLocalFile(meta.arrivalFile),
        deleteLocalFile(meta.snapshotFile),
      ]);
    }
  } finally {
    // Reopen even after upload failures so ingestion never stalls.
    if (reopen) {
      startNewSegment(segmentId());
      console.log(`[OK] Rotated parquet segment (${reason}) from ${meta.segmentId}`);
    }
  }
}
|
|
|
|
function enqueueRotation(reason: string, reopen: boolean): Promise<void> {
|
|
rotationQueue = rotationQueue.then(() => finalizeSegment(reason, reopen)).catch((error) => {
|
|
console.error('Parquet rotation failed:', error);
|
|
});
|
|
return rotationQueue;
|
|
}
|
|
|
|
function startRollTimer(): void {
|
|
if (rollTimer) {
|
|
clearInterval(rollTimer);
|
|
}
|
|
|
|
rollTimer = setInterval(() => {
|
|
void enqueueRotation('interval', true);
|
|
}, ROLL_INTERVAL_MS);
|
|
}
|
|
|
|
/**
 * Idempotent lazy init: create DATA_DIR, open the first segment, and start
 * the roll timer. All public entry points call this first.
 *
 * NOTE(review): two concurrent first calls can both pass the `initialized`
 * guard before the awaited mkdir completes, opening two writer sets and
 * leaking the first — confirm whether callers serialize startup.
 */
async function ensureInitialized(): Promise<void> {
  if (initialized) {
    return;
  }

  await fs.mkdir(DATA_DIR, { recursive: true });
  startNewSegment(segmentId());
  startRollTimer();
  initialized = true;

  const storageInfo = OBJECT_STORAGE_ENABLED
    ? `, object storage enabled (bucket=${OBJECT_STORAGE_BUCKET}, deleteLocal=${DELETE_LOCAL_AFTER_UPLOAD})`
    : ', object storage disabled';

  console.log(`[OK] Hyparquet storage ready at ${DATA_DIR}, roll=${ROLL_INTERVAL_MS / 60000}m${storageInfo}`);
}
|
|
|
|
/** Public entry point: prepare the data directory, open the first segment, start rolling. */
export async function initStorage(): Promise<void> {
  await ensureInitialized();
}

/** Back-compat alias kept from the previous database-backed implementation. */
export async function initDatabase(): Promise<void> {
  await initStorage();
}
|
|
|
|
/** Convenience single-row wrapper over logVehiclePositions(). */
export async function logVehiclePosition(position: VehiclePosition): Promise<void> {
  await logVehiclePositions([position]);
}
|
|
|
|
export async function logVehiclePositions(positions: VehiclePosition[]): Promise<void> {
|
|
if (positions.length === 0) {
|
|
return;
|
|
}
|
|
|
|
await ensureInitialized();
|
|
await rotationQueue;
|
|
|
|
vehicleWriteQueue = queuedWrite(vehicleWriteQueue, async () => {
|
|
const writer = vehicleWriter;
|
|
if (!writer) {
|
|
return;
|
|
}
|
|
|
|
writer.write({
|
|
columnData: toVehicleColumns(positions),
|
|
rowGroupSize: positions.length,
|
|
});
|
|
|
|
currentSegmentRows.vehicle += positions.length;
|
|
writes.vehiclePositions += positions.length;
|
|
});
|
|
|
|
await vehicleWriteQueue;
|
|
}
|
|
|
|
export async function logArrival(arrival: ArrivalRecord): Promise<void> {
|
|
await ensureInitialized();
|
|
await rotationQueue;
|
|
|
|
arrivalWriteQueue = queuedWrite(arrivalWriteQueue, async () => {
|
|
const writer = arrivalWriter;
|
|
if (!writer) {
|
|
return;
|
|
}
|
|
|
|
writer.write({
|
|
columnData: toArrivalColumns([arrival]),
|
|
rowGroupSize: 1,
|
|
});
|
|
|
|
currentSegmentRows.arrival += 1;
|
|
writes.arrivalRecords += 1;
|
|
});
|
|
|
|
await arrivalWriteQueue;
|
|
}
|
|
|
|
export async function logVehicleFeedSnapshot(snapshots: VehicleFeedSnapshot[]): Promise<void> {
|
|
if (snapshots.length === 0) {
|
|
return;
|
|
}
|
|
|
|
await ensureInitialized();
|
|
await rotationQueue;
|
|
|
|
snapshotWriteQueue = queuedWrite(snapshotWriteQueue, async () => {
|
|
const writer = snapshotWriter;
|
|
if (!writer) {
|
|
return;
|
|
}
|
|
|
|
writer.write({
|
|
columnData: toSnapshotColumns(snapshots),
|
|
rowGroupSize: snapshots.length,
|
|
});
|
|
|
|
currentSegmentRows.snapshot += snapshots.length;
|
|
writes.vehicleSnapshots += snapshots.length;
|
|
});
|
|
|
|
await snapshotWriteQueue;
|
|
}
|
|
|
|
// ---- Read-side API stubs ----
// Historical queries are not implemented in parquet segment mode; these keep
// the public interface of the previous database-backed module so callers
// compile and receive empty results.

export async function getVehicleHistory(_vehicleId: string, _startTime: number, _endTime: number) {
  return [];
}

export async function getRouteVehiclePositions(_routeId: string, _startTime: number, _endTime: number) {
  return [];
}

export async function getStopArrivalHistory(_stopId: string, _routeId: string, _startTime: number, _endTime: number) {
  return [];
}

// Returns a zeroed stats object shaped like the old SQL aggregate result.
export async function getRouteDelayStats(_routeId: string, _hours: number = 24) {
  return {
    total_arrivals: 0,
    avg_delay: null,
    min_delay: null,
    max_delay: null,
    on_time_count: 0,
    late_count: 0,
    early_count: 0,
  };
}

export async function getStopDelayStats(_stopId: string, _hours: number = 24) {
  return [];
}

export async function getRouteHourlyPattern(_routeId: string, _days: number = 7) {
  return [];
}

// Retention is handled by segment rotation/upload, not by record deletion.
export async function cleanupOldData(_daysToKeep: number = 90): Promise<void> {
  console.log('cleanupOldData skipped: parquet segment mode');
}
|
|
|
|
/**
 * Report process-lifetime write counters plus size/mtime of the current
 * segment's files. Missing files (e.g. just rotated or deleted after
 * upload) report 0 bytes and a null mtime.
 */
export async function getStorageStats() {
  await ensureInitialized();

  const [vehicleInfo, arrivalInfo, snapshotInfo] = await Promise.all([
    fs.stat(currentVehicleFile).catch(() => null),
    fs.stat(currentArrivalFile).catch(() => null),
    fs.stat(currentSnapshotFile).catch(() => null),
  ]);

  return {
    vehiclePositions: writes.vehiclePositions,
    arrivalRecords: writes.arrivalRecords,
    vehicleSnapshots: writes.vehicleSnapshots,
    // Record-level time bounds are not tracked in parquet segment mode.
    oldestRecord: null,
    newestRecord: null,
    storageType: 'hyparquet(rolling-write)',
    host: OBJECT_STORAGE_ENABLED ? 'object-storage+local' : 'local-filesystem',
    storagePath: DATA_DIR,
    rolling: {
      minutes: ROLL_INTERVAL_MS / 60000,
      currentSegmentId,
    },
    objectStorage: {
      enabled: OBJECT_STORAGE_ENABLED,
      bucket: OBJECT_STORAGE_BUCKET ?? null,
      region: OBJECT_STORAGE_REGION,
      endpoint: OBJECT_STORAGE_ENDPOINT ?? null,
      prefix: OBJECT_STORAGE_PREFIX,
      deleteLocalAfterUpload: DELETE_LOCAL_AFTER_UPLOAD,
    },
    files: {
      vehicle: {
        path: currentVehicleFile,
        bytes: vehicleInfo?.size ?? 0,
        modifiedAt: vehicleInfo?.mtime ?? null,
      },
      arrivals: {
        path: currentArrivalFile,
        bytes: arrivalInfo?.size ?? 0,
        modifiedAt: arrivalInfo?.mtime ?? null,
      },
      snapshots: {
        path: currentSnapshotFile,
        bytes: snapshotInfo?.size ?? 0,
        modifiedAt: snapshotInfo?.mtime ?? null,
      },
    },
  };
}
|
|
|
|
/** Back-compat alias for getStorageStats(). */
export async function getDatabaseStats() {
  return getStorageStats();
}
|
|
|
|
export async function closeStorage(): Promise<void> {
|
|
if (!initialized) {
|
|
return;
|
|
}
|
|
|
|
if (rollTimer) {
|
|
clearInterval(rollTimer);
|
|
rollTimer = undefined;
|
|
}
|
|
|
|
await Promise.all([vehicleWriteQueue, arrivalWriteQueue, snapshotWriteQueue]);
|
|
await enqueueRotation('shutdown', false);
|
|
initialized = false;
|
|
}
|
|
|
|
/** Back-compat alias for closeStorage(). */
export async function closeDatabase(): Promise<void> {
  await closeStorage();
}
|