Files
openjsp/lib/storage.ts
2026-03-13 01:16:35 +01:00

669 lines
23 KiB
TypeScript

import { promises as fs } from 'fs';
import * as fsSync from 'fs';
import * as path from 'path';
import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3';
import { ByteWriter, ParquetWriter } from 'hyparquet-writer';
import type { SchemaElement } from 'hyparquet';
import type { Writer } from 'hyparquet-writer';
/** Process environment, read once while the module loads. */
const env = process.env;

/** Interprets an environment flag; only the case-insensitive text "true" switches it on. */
const readFlag = (raw: string | undefined, fallback: 'true' | 'false'): boolean =>
  (raw ?? fallback).toLowerCase() === 'true';

/** Parses a base-10 integer, using `fallback` when unset, unparsable or zero, then clamps to `floor`. */
const readIntAtLeast = (raw: string | undefined, fallback: number, floor: number): number =>
  Math.max(floor, Number.parseInt(raw ?? String(fallback), 10) || fallback);

// --- Local parquet output -------------------------------------------------
const DATA_DIR = env.PARQUET_DIR ?? path.join(process.cwd(), 'data');
const ROLL_MINUTES = Number.parseInt(env.PARQUET_ROLL_MINUTES ?? '5', 10);
// Unparsable values fall back to 5 minutes; anything below one minute is raised to one.
const ROLL_INTERVAL_MS = 60_000 * Math.max(1, Number.isNaN(ROLL_MINUTES) ? 5 : ROLL_MINUTES);

// --- Object storage (S3-compatible) ---------------------------------------
const OBJECT_STORAGE_BUCKET = env.S3_BUCKET;
const OBJECT_STORAGE_REGION = env.S3_REGION ?? 'us-east-1';
const OBJECT_STORAGE_ENDPOINT = env.S3_ENDPOINT;
// Leading and trailing slashes are stripped so the prefix can be joined with "/".
const OBJECT_STORAGE_PREFIX = (env.S3_PREFIX ?? 'parquet').replace(/^\/+|\/+$/g, '');
const OBJECT_STORAGE_FORCE_PATH_STYLE = readFlag(env.S3_FORCE_PATH_STYLE, 'true');
const OBJECT_STORAGE_ACCESS_KEY = env.S3_ACCESS_KEY_ID;
const OBJECT_STORAGE_SECRET_KEY = env.S3_SECRET_ACCESS_KEY;
// Uploads need the explicit opt-in AND a bucket plus both credential halves.
const OBJECT_STORAGE_ENABLED =
  readFlag(env.S3_ENABLED, 'false') &&
  [OBJECT_STORAGE_BUCKET, OBJECT_STORAGE_ACCESS_KEY, OBJECT_STORAGE_SECRET_KEY].every(Boolean);
const DELETE_LOCAL_AFTER_UPLOAD = readFlag(env.S3_DELETE_LOCAL_AFTER_UPLOAD, 'false');
const OBJECT_STORAGE_UPLOAD_RETRIES = readIntAtLeast(env.S3_UPLOAD_RETRIES, 3, 1);
const OBJECT_STORAGE_UPLOAD_RETRY_BASE_MS = readIntAtLeast(env.S3_UPLOAD_RETRY_BASE_MS, 1000, 100);
/** Column type names this module passes in the `type` field of each column it writes. */
type BasicType = 'BOOLEAN' | 'INT32' | 'INT64' | 'FLOAT' | 'DOUBLE' | 'STRING' | 'TIMESTAMP';
/** One column of a write batch: a name, the per-row values, and its declared type. */
type ColumnSource = {
name: string;
// One entry per row; null marks a missing value in an optional column.
data: Array<string | number | bigint | boolean | Date | null>;
type: BasicType;
// The column builders in this file set `false` for required columns and omit the key otherwise.
nullable?: boolean;
};
/**
 * The subset of the parquet writer API this module relies on: append a batch
 * of columns, then finish the file. `createParquetWriter` casts to this shape.
 */
type IncrementalWriter = {
write: (options: { columnData: ColumnSource[]; rowGroupSize?: number | number[]; pageSize?: number }) => void;
finish: () => void;
};
/** Snapshot of a segment's identity, file paths and row counts taken when it is finalized. */
type SegmentMeta = {
segmentId: string;
vehicleFile: string;
arrivalFile: string;
snapshotFile: string;
vehicleRows: number;
arrivalRows: number;
snapshotRows: number;
};
// Parquet schema for vehicle position rows: a root element followed by its 9 columns.
// Column names and optionality mirror the VehiclePosition interface.
const VEHICLE_SCHEMA: SchemaElement[] = [
{ name: 'root', num_children: 9 },
{ name: 'timestamp', type: 'INT64', repetition_type: 'REQUIRED' },
{ name: 'vehicleId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' },
{ name: 'routeId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' },
{ name: 'tripId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
{ name: 'latitude', type: 'DOUBLE', repetition_type: 'REQUIRED' },
{ name: 'longitude', type: 'DOUBLE', repetition_type: 'REQUIRED' },
{ name: 'speed', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
{ name: 'bearing', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
{ name: 'currentStatus', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
];
// Parquet schema for arrival rows: a root element followed by its 10 columns.
// Column names and optionality mirror the ArrivalRecord interface.
const ARRIVAL_SCHEMA: SchemaElement[] = [
{ name: 'root', num_children: 10 },
{ name: 'timestamp', type: 'INT64', repetition_type: 'REQUIRED' },
{ name: 'stopId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' },
{ name: 'routeId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' },
{ name: 'tripId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
{ name: 'scheduledTime', type: 'INT64', repetition_type: 'REQUIRED' },
{ name: 'predictedTime', type: 'INT64', repetition_type: 'OPTIONAL' },
{ name: 'actualTime', type: 'INT64', repetition_type: 'OPTIONAL' },
{ name: 'delaySeconds', type: 'INT32', repetition_type: 'OPTIONAL' },
{ name: 'isRealtime', type: 'BOOLEAN', repetition_type: 'REQUIRED' },
{ name: 'headsign', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
];
// Parquet schema for raw feed snapshot rows: a root element followed by its 10 columns.
// Only `timestamp` and `rawJson` are required, matching the VehicleFeedSnapshot interface.
const SNAPSHOT_SCHEMA: SchemaElement[] = [
{ name: 'root', num_children: 10 },
{ name: 'timestamp', type: 'INT64', repetition_type: 'REQUIRED' },
{ name: 'sourceTimestamp', type: 'INT64', repetition_type: 'OPTIONAL' },
{ name: 'vehicleId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
{ name: 'inventoryNumber', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
{ name: 'latitude', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
{ name: 'longitude', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
{ name: 'speed', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
{ name: 'bearing', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
{ name: 'status', type: 'INT32', repetition_type: 'OPTIONAL' },
{ name: 'rawJson', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' },
];
// True between a successful ensureInitialized() and closeStorage().
let initialized = false;
// Writers for the currently open segment; finalizeSegment() clears them while it runs.
let vehicleWriter: IncrementalWriter | undefined;
let arrivalWriter: IncrementalWriter | undefined;
let snapshotWriter: IncrementalWriter | undefined;
// Per-dataset promise chains that serialize writes (see queuedWrite).
let vehicleWriteQueue: Promise<void> = Promise.resolve();
let arrivalWriteQueue: Promise<void> = Promise.resolve();
let snapshotWriteQueue: Promise<void> = Promise.resolve();
// Promise chain that serializes segment rotations; the log* functions await it before writing.
let rotationQueue: Promise<void> = Promise.resolve();
// Interval handle that triggers periodic rotation (see startRollTimer).
let rollTimer: ReturnType<typeof setInterval> | undefined;
// Identity and file paths of the currently open segment.
let currentSegmentId = '';
let currentVehicleFile = '';
let currentArrivalFile = '';
let currentSnapshotFile = '';
// Rows written to the open segment so far; reset whenever a new segment starts.
let currentSegmentRows = { vehicle: 0, arrival: 0, snapshot: 0 };
// Running totals across all segments; never reset in this file. Reported by getStorageStats().
const writes = {
vehiclePositions: 0,
arrivalRecords: 0,
vehicleSnapshots: 0,
};
// S3 client, constructed only when object storage is enabled; undefined otherwise.
const s3Client = OBJECT_STORAGE_ENABLED
? new S3Client({
region: OBJECT_STORAGE_REGION,
endpoint: OBJECT_STORAGE_ENDPOINT,
forcePathStyle: OBJECT_STORAGE_FORCE_PATH_STYLE,
// NOTE(review): checksums limited to 'WHEN_REQUIRED' — presumably for S3-compatible
// endpoints that reject the SDK's default checksums; confirm against the target store.
requestChecksumCalculation: 'WHEN_REQUIRED',
responseChecksumValidation: 'WHEN_REQUIRED',
credentials: {
// The casts are safe: OBJECT_STORAGE_ENABLED is only true when both keys are non-empty.
accessKeyId: OBJECT_STORAGE_ACCESS_KEY as string,
secretAccessKey: OBJECT_STORAGE_SECRET_KEY as string,
},
})
: undefined;
/**
 * One vehicle position row. Optional fields are written as nulls.
 * `timestamp` is truncated to an integer and stored as INT64
 * (presumably epoch milliseconds — TODO confirm with callers).
 */
export interface VehiclePosition {
timestamp: number;
vehicleId: string;
routeId: string;
tripId?: string;
latitude: number;
longitude: number;
speed?: number;
bearing?: number;
currentStatus?: string;
}
/**
 * One arrival row. The time fields are truncated to integers and stored as INT64;
 * `delaySeconds` is truncated and stored as INT32. Optional fields are written as nulls.
 */
export interface ArrivalRecord {
timestamp: number;
stopId: string;
routeId: string;
tripId?: string;
scheduledTime: number;
predictedTime?: number;
actualTime?: number;
delaySeconds?: number;
isRealtime: boolean;
headsign?: string;
}
/**
 * One raw feed snapshot row. Only `timestamp` and `rawJson` are required;
 * every other field is written as null when absent.
 */
export interface VehicleFeedSnapshot {
timestamp: number;
sourceTimestamp?: number;
vehicleId?: string;
inventoryNumber?: string;
latitude?: number;
longitude?: number;
speed?: number;
bearing?: number;
status?: number;
rawJson: string;
}
/**
 * Builds a segment identifier from the current UTC time: the ISO-8601
 * timestamp with every "." and ":" replaced by "-".
 */
function segmentId(): string {
  const isoNow = new Date().toISOString();
  return isoNow.replace(/[.:]/g, '-');
}
/**
 * Derives the `year=YYYY/month=MM/day=DD` partition directory for a segment.
 *
 * The date is taken from the leading `YYYY-MM-DDT` of the id. Ids that do not
 * start that way fall back to today's UTC date.
 */
function partitionPathFromSegmentId(id: string): string {
  const parsed = /^(\d{4})-(\d{2})-(\d{2})T/.exec(id);
  if (!parsed) {
    // Unrecognized id: partition by the current UTC day instead.
    const today = new Date();
    const yyyy = today.getUTCFullYear().toString();
    const mm = String(today.getUTCMonth() + 1).padStart(2, '0');
    const dd = String(today.getUTCDate()).padStart(2, '0');
    return path.join(`year=${yyyy}`, `month=${mm}`, `day=${dd}`);
  }
  const [, yyyy, mm, dd] = parsed;
  return path.join(`year=${yyyy}`, `month=${mm}`, `day=${dd}`);
}
/**
 * Builds the local path of a segment file:
 * `<DATA_DIR>/<prefix>/<date partition>/<prefix>-<id>.parquet`.
 */
function buildPartitionedFilePath(prefix: string, id: string): string {
  const partitionDir = partitionPathFromSegmentId(id);
  const fileName = `${prefix}-${id}.parquet`;
  return path.join(DATA_DIR, prefix, partitionDir, fileName);
}
/** Path of the vehicle-positions parquet file for segment `id`. */
function buildVehicleFile(id: string): string {
  const dataset = 'vehicle_positions';
  return buildPartitionedFilePath(dataset, id);
}

/** Path of the arrival-records parquet file for segment `id`. */
function buildArrivalFile(id: string): string {
  const dataset = 'arrival_records';
  return buildPartitionedFilePath(dataset, id);
}

/** Path of the vehicle-snapshots parquet file for segment `id`. */
function buildSnapshotFile(id: string): string {
  const dataset = 'vehicle_snapshots';
  return buildPartitionedFilePath(dataset, id);
}
/**
 * Creates a Writer that buffers bytes in a ByteWriter and appends them to
 * `filename` with synchronous file writes.
 *
 * The parent directory is created if needed and the file is truncated (or
 * created) up front. `ensure` and `finish` are overridden so buffered bytes are
 * flushed to disk; `getBuffer` and `getBytes` throw because the full content is
 * never held in memory.
 *
 * NOTE(review): relies on ByteWriter exposing mutable `buffer`, `view` and
 * `index` members — confirm against the installed hyparquet-writer version.
 */
function createFileWriter(filename: string): Writer {
const writer = new ByteWriter() as unknown as Writer & { index: number };
// Buffered-byte threshold above which the next ensure() flushes to disk first.
const chunkSize = 1_000_000;
fsSync.mkdirSync(path.dirname(filename), { recursive: true });
// Create the file, or truncate an existing one, before any appends.
fsSync.writeFileSync(filename, '', { flag: 'w' });
// Appends the first `index` bytes of the buffer to the file and rewinds the buffer.
const flush = () => {
const chunk = new Uint8Array(writer.buffer, 0, writer.index);
fsSync.writeFileSync(filename, chunk, { flag: 'a' });
writer.index = 0;
};
writer.ensure = (size: number) => {
// Flush before growing so the in-memory buffer stays near chunkSize.
if (writer.index > chunkSize) {
flush();
}
// Grow the buffer (at least doubling) when `size` more bytes would not fit,
// copying the existing contents across.
if (writer.index + size > writer.buffer.byteLength) {
const newSize = Math.max(writer.buffer.byteLength * 2, writer.index + size);
const newBuffer = new ArrayBuffer(newSize);
new Uint8Array(newBuffer).set(new Uint8Array(writer.buffer));
writer.buffer = newBuffer;
writer.view = new DataView(writer.buffer);
}
};
writer.getBuffer = () => {
throw new Error('getBuffer not supported for file writer');
};
writer.getBytes = () => {
throw new Error('getBytes not supported for file writer');
};
// Finishing writes whatever is still buffered to the file.
writer.finish = () => {
flush();
};
return writer;
}
/**
 * Opens a parquet writer for `filename` using the given schema, backed by the
 * file writer from `createFileWriter`.
 */
function createParquetWriter(filename: string, schema: SchemaElement[]): IncrementalWriter {
  const writer = createFileWriter(filename);
  const parquetWriter = new ParquetWriter({ writer, schema });
  // Only write() and finish() are used by this module, hence the narrowed view.
  return parquetWriter as unknown as IncrementalWriter;
}
/**
 * Chains `task` after `queue` and returns the new tail of the chain.
 *
 * Failures are logged and swallowed, so the returned promise always resolves
 * and one failed write cannot block the writes queued behind it.
 */
function queuedWrite(queue: Promise<void>, task: () => Promise<void>): Promise<void> {
  const reportFailure = (error: unknown): void => {
    console.error('Parquet write failed:', error);
  };
  return queue.then(() => task()).catch(reportFailure);
}
/** Converts a number to a bigint after dropping any fractional part (truncation toward zero). */
function toInt64(value: number): bigint {
  const whole = Math.trunc(value);
  return BigInt(whole);
}

/** Like `toInt64`, but maps `undefined` and `null` to `null`. */
function toNullableInt64(value: number | undefined): bigint | null {
  if (value == null) {
    return null;
  }
  return toInt64(value);
}
/**
 * Pivots vehicle position rows into column form for the parquet writer.
 * Required columns carry `nullable: false`; optional ones substitute `null`
 * for missing values.
 */
function toVehicleColumns(positions: VehiclePosition[]): ColumnSource[] {
  type Cell = ColumnSource['data'][number];
  const column = (
    name: string,
    type: BasicType,
    read: (position: VehiclePosition) => Cell,
    required = false
  ): ColumnSource => {
    const data = positions.map((position) => read(position));
    return required ? { name, data, type, nullable: false } : { name, data, type };
  };

  return [
    column('timestamp', 'INT64', (position) => toInt64(position.timestamp), true),
    column('vehicleId', 'STRING', (position) => position.vehicleId, true),
    column('routeId', 'STRING', (position) => position.routeId, true),
    column('tripId', 'STRING', (position) => position.tripId ?? null),
    column('latitude', 'DOUBLE', (position) => position.latitude, true),
    column('longitude', 'DOUBLE', (position) => position.longitude, true),
    column('speed', 'DOUBLE', (position) => position.speed ?? null),
    column('bearing', 'DOUBLE', (position) => position.bearing ?? null),
    column('currentStatus', 'STRING', (position) => position.currentStatus ?? null),
  ];
}
/**
 * Pivots arrival rows into column form for the parquet writer.
 * Time fields become bigint and `delaySeconds` is truncated to an integer.
 * Required columns carry `nullable: false`; optional ones use `null`.
 */
function toArrivalColumns(arrivals: ArrivalRecord[]): ColumnSource[] {
  type Cell = ColumnSource['data'][number];
  const column = (
    name: string,
    type: BasicType,
    read: (arrival: ArrivalRecord) => Cell,
    required = false
  ): ColumnSource => {
    const data = arrivals.map((arrival) => read(arrival));
    return required ? { name, data, type, nullable: false } : { name, data, type };
  };

  return [
    column('timestamp', 'INT64', (arrival) => toInt64(arrival.timestamp), true),
    column('stopId', 'STRING', (arrival) => arrival.stopId, true),
    column('routeId', 'STRING', (arrival) => arrival.routeId, true),
    column('tripId', 'STRING', (arrival) => arrival.tripId ?? null),
    column('scheduledTime', 'INT64', (arrival) => toInt64(arrival.scheduledTime), true),
    column('predictedTime', 'INT64', (arrival) => toNullableInt64(arrival.predictedTime)),
    column('actualTime', 'INT64', (arrival) => toNullableInt64(arrival.actualTime)),
    column('delaySeconds', 'INT32', (arrival) =>
      arrival.delaySeconds == null ? null : Math.trunc(arrival.delaySeconds)
    ),
    column('isRealtime', 'BOOLEAN', (arrival) => arrival.isRealtime, true),
    column('headsign', 'STRING', (arrival) => arrival.headsign ?? null),
  ];
}
/**
 * Pivots raw feed snapshot rows into column form for the parquet writer.
 * Only `timestamp` and `rawJson` are required; every other column uses
 * `null` for missing values, and `status` is truncated to an integer.
 */
function toSnapshotColumns(snapshots: VehicleFeedSnapshot[]): ColumnSource[] {
  type Cell = ColumnSource['data'][number];
  const column = (
    name: string,
    type: BasicType,
    read: (snapshot: VehicleFeedSnapshot) => Cell,
    required = false
  ): ColumnSource => {
    const data = snapshots.map((snapshot) => read(snapshot));
    return required ? { name, data, type, nullable: false } : { name, data, type };
  };

  return [
    column('timestamp', 'INT64', (snapshot) => toInt64(snapshot.timestamp), true),
    column('sourceTimestamp', 'INT64', (snapshot) => toNullableInt64(snapshot.sourceTimestamp)),
    column('vehicleId', 'STRING', (snapshot) => snapshot.vehicleId ?? null),
    column('inventoryNumber', 'STRING', (snapshot) => snapshot.inventoryNumber ?? null),
    column('latitude', 'DOUBLE', (snapshot) => snapshot.latitude ?? null),
    column('longitude', 'DOUBLE', (snapshot) => snapshot.longitude ?? null),
    column('speed', 'DOUBLE', (snapshot) => snapshot.speed ?? null),
    column('bearing', 'DOUBLE', (snapshot) => snapshot.bearing ?? null),
    column('status', 'INT32', (snapshot) => (snapshot.status == null ? null : Math.trunc(snapshot.status))),
    column('rawJson', 'STRING', (snapshot) => snapshot.rawJson, true),
  ];
}
/**
 * Makes `id` the current segment: records its three file paths, zeroes the
 * per-segment row counters and opens a fresh parquet writer for each dataset.
 */
function startNewSegment(id: string): void {
  const vehiclePath = buildVehicleFile(id);
  const arrivalPath = buildArrivalFile(id);
  const snapshotPath = buildSnapshotFile(id);

  currentSegmentId = id;
  currentVehicleFile = vehiclePath;
  currentArrivalFile = arrivalPath;
  currentSnapshotFile = snapshotPath;
  currentSegmentRows = { vehicle: 0, arrival: 0, snapshot: 0 };

  // Writers are opened one after another; each call creates its file on disk.
  vehicleWriter = createParquetWriter(vehiclePath, VEHICLE_SCHEMA);
  arrivalWriter = createParquetWriter(arrivalPath, ARRIVAL_SCHEMA);
  snapshotWriter = createParquetWriter(snapshotPath, SNAPSHOT_SCHEMA);
}
/**
 * Uploads one finished parquet file to the configured object storage bucket.
 *
 * The object key is the configured prefix plus the file's path relative to
 * DATA_DIR (with "/" separators). Files outside DATA_DIR are keyed by their
 * base name only.
 *
 * @param filePath Local path of the file to upload.
 * @returns `true` when the upload succeeded or object storage is disabled
 *   (nothing to do); `false` when the file could not be read or every upload
 *   attempt failed. This function never rejects for those failures, so callers
 *   can rely on the boolean alone.
 */
async function uploadFileToObjectStorage(filePath: string): Promise<boolean> {
  if (!OBJECT_STORAGE_ENABLED || !s3Client || !OBJECT_STORAGE_BUCKET) {
    return true;
  }
  const keyPrefix = OBJECT_STORAGE_PREFIX ? `${OBJECT_STORAGE_PREFIX}/` : '';
  const relativePath = path.relative(DATA_DIR, filePath);
  const normalizedRelativePath =
    !relativePath.startsWith('..') && !path.isAbsolute(relativePath)
      ? relativePath.split(path.sep).join('/')
      : path.basename(filePath);
  const key = `${keyPrefix}${normalizedRelativePath}`;

  // A read failure is reported through the boolean result like any other
  // upload failure, instead of rejecting and aborting the caller's Promise.all.
  const body = await fs.readFile(filePath).catch((error: unknown) => {
    console.error(`[ERROR] Could not read ${filePath} for upload:`, error);
    return undefined;
  });
  if (body === undefined) {
    return false;
  }

  for (let attempt = 1; attempt <= OBJECT_STORAGE_UPLOAD_RETRIES; attempt += 1) {
    try {
      await s3Client.send(
        new PutObjectCommand({
          Bucket: OBJECT_STORAGE_BUCKET,
          Key: key,
          Body: body,
          ContentLength: body.byteLength,
          ContentType: 'application/octet-stream',
        })
      );
      console.log(`[OK] Uploaded parquet to object storage: s3://${OBJECT_STORAGE_BUCKET}/${key}`);
      return true;
    } catch (error) {
      const status = (error as { $metadata?: { httpStatusCode?: number } })?.$metadata?.httpStatusCode;
      // Retry transient failures: no HTTP status at all (the request never got
      // a response, e.g. connection reset or timeout), throttling (429) and
      // server-side errors (5xx). Other 4xx responses will not succeed on retry.
      const shouldRetry = status === undefined || status === 429 || status >= 500;
      const canRetry = shouldRetry && attempt < OBJECT_STORAGE_UPLOAD_RETRIES;
      console.error(
        `[ERROR] Upload failed for ${filePath} (attempt ${attempt}/${OBJECT_STORAGE_UPLOAD_RETRIES}):`,
        error
      );
      if (!canRetry) {
        break;
      }
      // Linear backoff: base delay multiplied by the attempt number.
      const backoffMs = OBJECT_STORAGE_UPLOAD_RETRY_BASE_MS * attempt;
      await new Promise((resolve) => setTimeout(resolve, backoffMs));
    }
  }
  return false;
}
/**
 * Removes a local parquet file on a best-effort basis.
 * A missing file (ENOENT) is ignored silently; any other failure is logged as
 * a warning. The returned promise never rejects.
 */
async function deleteLocalFile(filePath: string): Promise<void> {
  try {
    await fs.unlink(filePath);
  } catch (error: unknown) {
    const { code } = error as { code?: string };
    if (code === 'ENOENT') {
      return;
    }
    console.warn(`[WARN] Failed to delete local parquet file ${filePath}:`, error);
  }
}
/**
 * Closes the current segment: waits for queued writes, finishes the three
 * parquet writers, uploads the files (when object storage is enabled) and
 * optionally deletes the local copies, then opens a new segment if asked to.
 *
 * The writers are detached from module state before `finish()` is called and
 * finishing happens inside the `try`. A failing `finish()` therefore can never
 * leave a finished or half-finished writer installed for later writes, and the
 * `finally` still reopens a segment. Every writer gets its `finish()` attempt
 * even if an earlier one throws.
 *
 * @param reason Label used in the rotation log line (e.g. "interval", "shutdown").
 * @param reopen When true, a new segment is started afterwards, even on failure.
 * @throws The first `finish()` error, or any upload/delete rejection. The
 *   files are left on disk in that case.
 */
async function finalizeSegment(reason: string, reopen: boolean): Promise<void> {
  if (!initialized) {
    return;
  }
  await Promise.all([vehicleWriteQueue, arrivalWriteQueue, snapshotWriteQueue]);
  const meta: SegmentMeta = {
    segmentId: currentSegmentId,
    vehicleFile: currentVehicleFile,
    arrivalFile: currentArrivalFile,
    snapshotFile: currentSnapshotFile,
    vehicleRows: currentSegmentRows.vehicle,
    arrivalRows: currentSegmentRows.arrival,
    snapshotRows: currentSegmentRows.snapshot,
  };

  // Detach first so no later write can reach a writer that is being finished.
  const detachedWriters = [vehicleWriter, arrivalWriter, snapshotWriter];
  vehicleWriter = undefined;
  arrivalWriter = undefined;
  snapshotWriter = undefined;

  try {
    const finishErrors: unknown[] = [];
    for (const writer of detachedWriters) {
      try {
        writer?.finish();
      } catch (error) {
        console.error(`[ERROR] Failed to finish parquet writer for segment ${meta.segmentId}:`, error);
        finishErrors.push(error);
      }
    }
    if (finishErrors.length > 0) {
      // Do not upload or delete files that may be incomplete.
      throw finishErrors[0];
    }

    const hasRows = meta.vehicleRows > 0 || meta.arrivalRows > 0 || meta.snapshotRows > 0;
    if (hasRows) {
      const uploadResults = await Promise.all([
        uploadFileToObjectStorage(meta.vehicleFile),
        uploadFileToObjectStorage(meta.arrivalFile),
        uploadFileToObjectStorage(meta.snapshotFile),
      ]);
      const allUploaded = uploadResults.every(Boolean);
      if (OBJECT_STORAGE_ENABLED && DELETE_LOCAL_AFTER_UPLOAD && allUploaded) {
        await Promise.all([
          deleteLocalFile(meta.vehicleFile),
          deleteLocalFile(meta.arrivalFile),
          deleteLocalFile(meta.snapshotFile),
        ]);
        console.log(`[OK] Deleted local parquet segment after upload: ${meta.segmentId}`);
      } else if (OBJECT_STORAGE_ENABLED && DELETE_LOCAL_AFTER_UPLOAD && !allUploaded) {
        console.warn(`[WARN] Kept local parquet segment due to upload errors: ${meta.segmentId}`);
      }
    } else if (OBJECT_STORAGE_ENABLED && DELETE_LOCAL_AFTER_UPLOAD) {
      // Empty segment: nothing worth uploading, just remove the empty files.
      await Promise.all([
        deleteLocalFile(meta.vehicleFile),
        deleteLocalFile(meta.arrivalFile),
        deleteLocalFile(meta.snapshotFile),
      ]);
    }
  } finally {
    if (reopen) {
      startNewSegment(segmentId());
      console.log(`[OK] Rotated parquet segment (${reason}) from ${meta.segmentId}`);
    }
  }
}
/**
 * Appends a segment rotation to the rotation chain and returns the new tail.
 * Rotation failures are logged and swallowed so the chain keeps working.
 */
function enqueueRotation(reason: string, reopen: boolean): Promise<void> {
  const runRotation = (): Promise<void> => finalizeSegment(reason, reopen);
  const reportFailure = (error: unknown): void => {
    console.error('Parquet rotation failed:', error);
  };
  rotationQueue = rotationQueue.then(runRotation).catch(reportFailure);
  return rotationQueue;
}
/**
 * (Re)starts the interval that rotates the segment every ROLL_INTERVAL_MS,
 * cancelling any interval that is already running.
 */
function startRollTimer(): void {
  if (rollTimer) {
    clearInterval(rollTimer);
  }
  const rotateOnTick = (): void => {
    // Fire-and-forget: enqueueRotation logs its own failures.
    void enqueueRotation('interval', true);
  };
  rollTimer = setInterval(rotateOnTick, ROLL_INTERVAL_MS);
}
/** Initialization currently in progress, shared by concurrent callers; cleared once it settles. */
let initializationInFlight: Promise<void> | undefined;

/**
 * Lazily prepares storage: creates DATA_DIR, opens the first segment and
 * starts the roll timer. Does nothing when already initialized.
 *
 * Concurrent callers share one in-flight initialization. Without that, every
 * caller that passed the `initialized` check before the directory creation
 * finished would open its own segment, replacing writers that may already hold
 * rows and resetting the per-segment row counters.
 *
 * @throws Whatever the directory creation or segment start throws. The
 *   in-flight promise is cleared on failure, so a later call retries.
 */
async function ensureInitialized(): Promise<void> {
  if (initialized) {
    return;
  }
  if (!initializationInFlight) {
    initializationInFlight = (async (): Promise<void> => {
      await fs.mkdir(DATA_DIR, { recursive: true });
      startNewSegment(segmentId());
      startRollTimer();
      initialized = true;
      const storageInfo = OBJECT_STORAGE_ENABLED
        ? `, object storage enabled (bucket=${OBJECT_STORAGE_BUCKET}, deleteLocal=${DELETE_LOCAL_AFTER_UPLOAD})`
        : ', object storage disabled';
      console.log(`[OK] Hyparquet storage ready at ${DATA_DIR}, roll=${ROLL_INTERVAL_MS / 60000}m${storageInfo}`);
    })().finally(() => {
      // From here on the `initialized` flag alone decides, which also lets
      // storage be initialized again after closeStorage().
      initializationInFlight = undefined;
    });
  }
  await initializationInFlight;
}
/** Initializes parquet storage; does nothing when it is already initialized. */
export async function initStorage(): Promise<void> {
  return ensureInitialized();
}

/** Alias of `initStorage`, kept under the database-style name. */
export async function initDatabase(): Promise<void> {
  return initStorage();
}
/** Records a single vehicle position by delegating to `logVehiclePositions`. */
export async function logVehiclePosition(position: VehiclePosition): Promise<void> {
  const batch = [position];
  await logVehiclePositions(batch);
}
/**
 * Appends a batch of vehicle positions to the current segment as one row group.
 *
 * Empty batches are ignored. The write waits for any rotation in progress and
 * is serialized behind earlier vehicle writes. Write failures are logged by
 * the queue rather than thrown, and rows are dropped when no writer is open.
 */
export async function logVehiclePositions(positions: VehiclePosition[]): Promise<void> {
  if (positions.length === 0) {
    return;
  }
  await ensureInitialized();
  await rotationQueue;

  const appendRows = async (): Promise<void> => {
    const target = vehicleWriter;
    if (!target) {
      return;
    }
    const columnData = toVehicleColumns(positions);
    const rowCount = positions.length;
    target.write({ columnData, rowGroupSize: rowCount });
    currentSegmentRows.vehicle += rowCount;
    writes.vehiclePositions += rowCount;
  };

  vehicleWriteQueue = queuedWrite(vehicleWriteQueue, appendRows);
  await vehicleWriteQueue;
}
/**
 * Appends one arrival record to the current segment as a single-row row group.
 *
 * The write waits for any rotation in progress and is serialized behind
 * earlier arrival writes. Write failures are logged by the queue rather than
 * thrown, and the row is dropped when no writer is open.
 */
export async function logArrival(arrival: ArrivalRecord): Promise<void> {
  await ensureInitialized();
  await rotationQueue;

  const appendRow = async (): Promise<void> => {
    const target = arrivalWriter;
    if (!target) {
      return;
    }
    const columnData = toArrivalColumns([arrival]);
    target.write({ columnData, rowGroupSize: 1 });
    currentSegmentRows.arrival += 1;
    writes.arrivalRecords += 1;
  };

  arrivalWriteQueue = queuedWrite(arrivalWriteQueue, appendRow);
  await arrivalWriteQueue;
}
/**
 * Appends a batch of raw feed snapshots to the current segment as one row group.
 *
 * Empty batches are ignored. The write waits for any rotation in progress and
 * is serialized behind earlier snapshot writes. Write failures are logged by
 * the queue rather than thrown, and rows are dropped when no writer is open.
 */
export async function logVehicleFeedSnapshot(snapshots: VehicleFeedSnapshot[]): Promise<void> {
  if (snapshots.length === 0) {
    return;
  }
  await ensureInitialized();
  await rotationQueue;

  const appendRows = async (): Promise<void> => {
    const target = snapshotWriter;
    if (!target) {
      return;
    }
    const columnData = toSnapshotColumns(snapshots);
    const rowCount = snapshots.length;
    target.write({ columnData, rowGroupSize: rowCount });
    currentSegmentRows.snapshot += rowCount;
    writes.vehicleSnapshots += rowCount;
  };

  snapshotWriteQueue = queuedWrite(snapshotWriteQueue, appendRows);
  await snapshotWriteQueue;
}
// ---------------------------------------------------------------------------
// Query API stubs. This module only writes parquet segments and does not read
// them back, so every query below resolves to an empty result whatever its
// arguments are.
// ---------------------------------------------------------------------------

/** Stub: always resolves to an empty list. */
export async function getVehicleHistory(_vehicleId: string, _startTime: number, _endTime: number) {
  return [];
}

/** Stub: always resolves to an empty list. */
export async function getRouteVehiclePositions(_routeId: string, _startTime: number, _endTime: number) {
  return [];
}

/** Stub: always resolves to an empty list. */
export async function getStopArrivalHistory(
  _stopId: string,
  _routeId: string,
  _startTime: number,
  _endTime: number
) {
  return [];
}

/** Stub: always resolves to zeroed counters with null delay aggregates. */
export async function getRouteDelayStats(_routeId: string, _hours: number = 24) {
  const emptyStats = {
    total_arrivals: 0,
    avg_delay: null,
    min_delay: null,
    max_delay: null,
    on_time_count: 0,
    late_count: 0,
    early_count: 0,
  };
  return emptyStats;
}

/** Stub: always resolves to an empty list. */
export async function getStopDelayStats(_stopId: string, _hours: number = 24) {
  return [];
}

/** Stub: always resolves to an empty list. */
export async function getRouteHourlyPattern(_routeId: string, _days: number = 7) {
  return [];
}

/** No-op in parquet segment mode; only logs that cleanup was skipped. */
export async function cleanupOldData(_daysToKeep: number = 90): Promise<void> {
  const notice = 'cleanupOldData skipped: parquet segment mode';
  console.log(notice);
}
/**
 * Reports write totals, rolling and object-storage configuration, and the
 * size and modification time of the current segment's three files.
 *
 * Initializes storage first if needed. A file that cannot be stat'ed is
 * reported with 0 bytes and a null modification time.
 */
export async function getStorageStats() {
  await ensureInitialized();

  const statOrNull = (file: string) => fs.stat(file).catch(() => null);
  const describeFile = (filePath: string, info: { size: number; mtime: Date } | null) => ({
    path: filePath,
    bytes: info?.size ?? 0,
    modifiedAt: info?.mtime ?? null,
  });

  const [vehicleInfo, arrivalInfo, snapshotInfo] = await Promise.all([
    statOrNull(currentVehicleFile),
    statOrNull(currentArrivalFile),
    statOrNull(currentSnapshotFile),
  ]);

  const rolling = {
    minutes: ROLL_INTERVAL_MS / 60000,
    currentSegmentId,
  };
  const objectStorage = {
    enabled: OBJECT_STORAGE_ENABLED,
    bucket: OBJECT_STORAGE_BUCKET ?? null,
    region: OBJECT_STORAGE_REGION,
    endpoint: OBJECT_STORAGE_ENDPOINT ?? null,
    prefix: OBJECT_STORAGE_PREFIX,
    deleteLocalAfterUpload: DELETE_LOCAL_AFTER_UPLOAD,
  };
  // Paths are read after the stat calls settle, so they reflect the segment
  // that is current at that point.
  const files = {
    vehicle: describeFile(currentVehicleFile, vehicleInfo),
    arrivals: describeFile(currentArrivalFile, arrivalInfo),
    snapshots: describeFile(currentSnapshotFile, snapshotInfo),
  };

  return {
    vehiclePositions: writes.vehiclePositions,
    arrivalRecords: writes.arrivalRecords,
    vehicleSnapshots: writes.vehicleSnapshots,
    oldestRecord: null,
    newestRecord: null,
    storageType: 'hyparquet(rolling-write)',
    host: OBJECT_STORAGE_ENABLED ? 'object-storage+local' : 'local-filesystem',
    storagePath: DATA_DIR,
    rolling,
    objectStorage,
    files,
  };
}

/** Alias of `getStorageStats`, kept under the database-style name. */
export async function getDatabaseStats() {
  const stats = await getStorageStats();
  return stats;
}
/**
 * Shuts storage down: stops the roll timer, waits for queued writes, finalizes
 * the current segment without reopening a new one, and marks the module as
 * uninitialized. Does nothing when storage was never initialized.
 */
export async function closeStorage(): Promise<void> {
  if (!initialized) {
    return;
  }
  if (rollTimer) {
    clearInterval(rollTimer);
    rollTimer = undefined;
  }

  const pendingWrites = [vehicleWriteQueue, arrivalWriteQueue, snapshotWriteQueue];
  await Promise.all(pendingWrites);

  // Final rotation: flush and upload the open segment, but do not reopen.
  await enqueueRotation('shutdown', false);
  initialized = false;
}

/** Alias of `closeStorage`, kept under the database-style name. */
export async function closeDatabase(): Promise<void> {
  return closeStorage();
}