Cleanup and rename
This commit is contained in:
668
lib/storage.ts
Normal file
668
lib/storage.ts
Normal file
@@ -0,0 +1,668 @@
|
||||
import { promises as fs } from 'fs';
|
||||
import * as fsSync from 'fs';
|
||||
import * as path from 'path';
|
||||
import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3';
|
||||
import { ByteWriter, ParquetWriter } from 'hyparquet-writer';
|
||||
import type { SchemaElement } from 'hyparquet';
|
||||
import type { Writer } from 'hyparquet-writer';
|
||||
|
||||
// Environment-driven configuration, read once at module load.
const env = process.env;
// Local directory for parquet segments (default: ./data).
const DATA_DIR = env.PARQUET_DIR ?? path.join(process.cwd(), 'data');
// Segment roll (finalize + reopen) period in minutes.
const ROLL_MINUTES = Number.parseInt(env.PARQUET_ROLL_MINUTES ?? '5', 10);
// Roll interval in ms; NaN falls back to 5 minutes, values below 1 are clamped to 1.
const ROLL_INTERVAL_MS = Math.max(1, Number.isNaN(ROLL_MINUTES) ? 5 : ROLL_MINUTES) * 60_000;

// S3-compatible object storage settings. Uploads happen only when S3_ENABLED
// is "true" AND bucket + both credentials are set (see OBJECT_STORAGE_ENABLED).
const OBJECT_STORAGE_BUCKET = env.S3_BUCKET;
const OBJECT_STORAGE_REGION = env.S3_REGION ?? 'us-east-1';
const OBJECT_STORAGE_ENDPOINT = env.S3_ENDPOINT;
// Key prefix with leading/trailing slashes trimmed (default "parquet").
const OBJECT_STORAGE_PREFIX = (env.S3_PREFIX ?? 'parquet').replace(/^\/+|\/+$/g, '');
// Path-style addressing defaults to on — NOTE(review): presumably for
// MinIO-style endpoints; confirm against the deployment target.
const OBJECT_STORAGE_FORCE_PATH_STYLE = (env.S3_FORCE_PATH_STYLE ?? 'true').toLowerCase() === 'true';
const OBJECT_STORAGE_ACCESS_KEY = env.S3_ACCESS_KEY_ID;
const OBJECT_STORAGE_SECRET_KEY = env.S3_SECRET_ACCESS_KEY;
const OBJECT_STORAGE_ENABLED =
  (env.S3_ENABLED ?? 'false').toLowerCase() === 'true' &&
  !!OBJECT_STORAGE_BUCKET &&
  !!OBJECT_STORAGE_ACCESS_KEY &&
  !!OBJECT_STORAGE_SECRET_KEY;
// When true, local parquet files are removed after a successful upload.
const DELETE_LOCAL_AFTER_UPLOAD = (env.S3_DELETE_LOCAL_AFTER_UPLOAD ?? 'false').toLowerCase() === 'true';
// Upload retry policy: at least 1 attempt; backoff base clamped to >= 100ms.
const OBJECT_STORAGE_UPLOAD_RETRIES = Math.max(1, Number.parseInt(env.S3_UPLOAD_RETRIES ?? '3', 10) || 3);
const OBJECT_STORAGE_UPLOAD_RETRY_BASE_MS = Math.max(100, Number.parseInt(env.S3_UPLOAD_RETRY_BASE_MS ?? '1000', 10) || 1000);
||||
// Logical column types accepted by the incremental writer facade.
type BasicType = 'BOOLEAN' | 'INT32' | 'INT64' | 'FLOAT' | 'DOUBLE' | 'STRING' | 'TIMESTAMP';

// One column of values destined for a parquet row group.
type ColumnSource = {
  name: string;
  data: Array<string | number | bigint | boolean | Date | null>;
  type: BasicType;
  // Set explicitly to false for REQUIRED columns; omitted for OPTIONAL ones.
  nullable?: boolean;
};

// Minimal structural view of hyparquet-writer's ParquetWriter that this
// module relies on: incremental row-group writes plus finish().
type IncrementalWriter = {
  write: (options: { columnData: ColumnSource[]; rowGroupSize?: number | number[]; pageSize?: number }) => void;
  finish: () => void;
};

// Snapshot of a finished segment's files and row counts, captured during
// rotation before the writers and counters are reset.
type SegmentMeta = {
  segmentId: string;
  vehicleFile: string;
  arrivalFile: string;
  snapshotFile: string;
  vehicleRows: number;
  arrivalRows: number;
  snapshotRows: number;
};
||||
// Parquet schema for vehicle position rows. The root's num_children must
// equal the number of leaf columns that follow (9 here). converted_type UTF8
// marks BYTE_ARRAY columns as strings.
const VEHICLE_SCHEMA: SchemaElement[] = [
  { name: 'root', num_children: 9 },
  { name: 'timestamp', type: 'INT64', repetition_type: 'REQUIRED' },
  { name: 'vehicleId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' },
  { name: 'routeId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' },
  { name: 'tripId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
  { name: 'latitude', type: 'DOUBLE', repetition_type: 'REQUIRED' },
  { name: 'longitude', type: 'DOUBLE', repetition_type: 'REQUIRED' },
  { name: 'speed', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
  { name: 'bearing', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
  { name: 'currentStatus', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
];

// Parquet schema for stop arrival/prediction rows (10 columns).
const ARRIVAL_SCHEMA: SchemaElement[] = [
  { name: 'root', num_children: 10 },
  { name: 'timestamp', type: 'INT64', repetition_type: 'REQUIRED' },
  { name: 'stopId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' },
  { name: 'routeId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' },
  { name: 'tripId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
  { name: 'scheduledTime', type: 'INT64', repetition_type: 'REQUIRED' },
  { name: 'predictedTime', type: 'INT64', repetition_type: 'OPTIONAL' },
  { name: 'actualTime', type: 'INT64', repetition_type: 'OPTIONAL' },
  { name: 'delaySeconds', type: 'INT32', repetition_type: 'OPTIONAL' },
  { name: 'isRealtime', type: 'BOOLEAN', repetition_type: 'REQUIRED' },
  { name: 'headsign', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
];

// Parquet schema for raw vehicle feed snapshots (10 columns). rawJson keeps
// the full upstream payload so rows can be reprocessed later.
const SNAPSHOT_SCHEMA: SchemaElement[] = [
  { name: 'root', num_children: 10 },
  { name: 'timestamp', type: 'INT64', repetition_type: 'REQUIRED' },
  { name: 'sourceTimestamp', type: 'INT64', repetition_type: 'OPTIONAL' },
  { name: 'vehicleId', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
  { name: 'inventoryNumber', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL' },
  { name: 'latitude', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
  { name: 'longitude', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
  { name: 'speed', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
  { name: 'bearing', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
  { name: 'status', type: 'INT32', repetition_type: 'OPTIONAL' },
  { name: 'rawJson', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' },
];
||||
// ---- Module state ----------------------------------------------------------
// One active segment at a time; all mutation is serialized through the
// promise queues below.
let initialized = false;
let vehicleWriter: IncrementalWriter | undefined;
let arrivalWriter: IncrementalWriter | undefined;
let snapshotWriter: IncrementalWriter | undefined;
// Per-stream write queues: each write chains onto the previous one so rows
// reach a single writer strictly in order.
let vehicleWriteQueue: Promise<void> = Promise.resolve();
let arrivalWriteQueue: Promise<void> = Promise.resolve();
let snapshotWriteQueue: Promise<void> = Promise.resolve();
// Rotations are serialized too; writers await this before writing.
let rotationQueue: Promise<void> = Promise.resolve();
let rollTimer: ReturnType<typeof setInterval> | undefined;

// Identity, file paths, and row counters of the currently open segment.
let currentSegmentId = '';
let currentVehicleFile = '';
let currentArrivalFile = '';
let currentSnapshotFile = '';
let currentSegmentRows = { vehicle: 0, arrival: 0, snapshot: 0 };

// Lifetime write counters (across all segments), reported by getStorageStats.
const writes = {
  vehiclePositions: 0,
  arrivalRecords: 0,
  vehicleSnapshots: 0,
};
||||
// Shared S3 client, constructed only when object storage is fully configured.
// Checksum calculation/validation are relaxed ('WHEN_REQUIRED') — NOTE(review):
// presumably for S3-compatible services without the newer checksum headers;
// confirm against the target endpoint.
const s3Client = OBJECT_STORAGE_ENABLED
  ? new S3Client({
      region: OBJECT_STORAGE_REGION,
      endpoint: OBJECT_STORAGE_ENDPOINT,
      forcePathStyle: OBJECT_STORAGE_FORCE_PATH_STYLE,
      requestChecksumCalculation: 'WHEN_REQUIRED',
      responseChecksumValidation: 'WHEN_REQUIRED',
      credentials: {
        // Non-null by construction: OBJECT_STORAGE_ENABLED requires both keys.
        accessKeyId: OBJECT_STORAGE_ACCESS_KEY as string,
        secretAccessKey: OBJECT_STORAGE_SECRET_KEY as string,
      },
    })
  : undefined;
||||
/** One realtime vehicle position observation, persisted via VEHICLE_SCHEMA. */
export interface VehiclePosition {
  // Epoch timestamp — NOTE(review): unit (seconds vs milliseconds) is not
  // visible in this file; confirm with the producers before relying on it.
  timestamp: number;
  vehicleId: string;
  routeId: string;
  tripId?: string;
  latitude: number;
  longitude: number;
  speed?: number;
  bearing?: number;
  currentStatus?: string;
}

/** One arrival/prediction record for a stop, persisted via ARRIVAL_SCHEMA. */
export interface ArrivalRecord {
  timestamp: number;
  stopId: string;
  routeId: string;
  tripId?: string;
  scheduledTime: number;
  predictedTime?: number;
  actualTime?: number;
  delaySeconds?: number;
  // Presumably distinguishes realtime-feed records from static schedule
  // records — confirm with callers.
  isRealtime: boolean;
  headsign?: string;
}

/** One raw vehicle feed snapshot row, persisted via SNAPSHOT_SCHEMA. */
export interface VehicleFeedSnapshot {
  timestamp: number;
  sourceTimestamp?: number;
  vehicleId?: string;
  inventoryNumber?: string;
  latitude?: number;
  longitude?: number;
  speed?: number;
  bearing?: number;
  status?: number;
  // Full upstream payload as JSON text, kept for later reprocessing.
  rawJson: string;
}
||||
function segmentId(): string {
|
||||
return new Date().toISOString().replace(/[.:]/g, '-');
|
||||
}
|
||||
|
||||
function partitionPathFromSegmentId(id: string): string {
|
||||
const match = /^(\d{4})-(\d{2})-(\d{2})T/.exec(id);
|
||||
if (match) {
|
||||
const [, year, month, day] = match;
|
||||
return path.join(`year=${year}`, `month=${month}`, `day=${day}`);
|
||||
}
|
||||
|
||||
const now = new Date();
|
||||
const year = now.getUTCFullYear().toString();
|
||||
const month = String(now.getUTCMonth() + 1).padStart(2, '0');
|
||||
const day = String(now.getUTCDate()).padStart(2, '0');
|
||||
return path.join(`year=${year}`, `month=${month}`, `day=${day}`);
|
||||
}
|
||||
|
||||
function buildPartitionedFilePath(prefix: string, id: string): string {
|
||||
return path.join(DATA_DIR, partitionPathFromSegmentId(id), `${prefix}-${id}.parquet`);
|
||||
}
|
||||
|
||||
function buildVehicleFile(id: string): string {
|
||||
return buildPartitionedFilePath('vehicle_positions', id);
|
||||
}
|
||||
|
||||
function buildArrivalFile(id: string): string {
|
||||
return buildPartitionedFilePath('arrival_records', id);
|
||||
}
|
||||
|
||||
function buildSnapshotFile(id: string): string {
|
||||
return buildPartitionedFilePath('vehicle_snapshots', id);
|
||||
}
|
||||
|
||||
/**
 * Wrap hyparquet-writer's in-memory ByteWriter so output streams to a file:
 * the buffer is appended to `filename` and rewound whenever it grows past
 * ~1MB, instead of accumulating the whole parquet file in memory.
 *
 * NOTE(review): this monkey-patches ByteWriter internals (`index`, `buffer`,
 * `view`, `ensure`) and assumes all capacity checks go through `ensure`;
 * verify against the pinned hyparquet-writer version.
 */
function createFileWriter(filename: string): Writer {
  const writer = new ByteWriter() as unknown as Writer & { index: number };
  const chunkSize = 1_000_000; // flush threshold in bytes

  // Truncate/create the target file up front so flushes can append.
  fsSync.mkdirSync(path.dirname(filename), { recursive: true });
  fsSync.writeFileSync(filename, '', { flag: 'w' });

  // Append the filled portion of the buffer to disk and rewind the cursor.
  const flush = () => {
    const chunk = new Uint8Array(writer.buffer, 0, writer.index);
    fsSync.writeFileSync(filename, chunk, { flag: 'a' });
    writer.index = 0;
  };

  writer.ensure = (size: number) => {
    // Flush before growing so the buffer rarely exceeds ~chunkSize.
    if (writer.index > chunkSize) {
      flush();
    }
    // Grow geometrically (at least doubling) when the pending write would
    // overflow the current buffer.
    if (writer.index + size > writer.buffer.byteLength) {
      const newSize = Math.max(writer.buffer.byteLength * 2, writer.index + size);
      const newBuffer = new ArrayBuffer(newSize);
      new Uint8Array(newBuffer).set(new Uint8Array(writer.buffer));
      writer.buffer = newBuffer;
      writer.view = new DataView(writer.buffer);
    }
  };

  // Whole-buffer accessors are meaningless once bytes have been flushed.
  writer.getBuffer = () => {
    throw new Error('getBuffer not supported for file writer');
  };
  writer.getBytes = () => {
    throw new Error('getBytes not supported for file writer');
  };
  // Final flush writes any remaining tail (parquet footer included).
  writer.finish = () => {
    flush();
  };

  return writer;
}
||||
function createParquetWriter(filename: string, schema: SchemaElement[]): IncrementalWriter {
|
||||
return new ParquetWriter({ writer: createFileWriter(filename), schema }) as unknown as IncrementalWriter;
|
||||
}
|
||||
|
||||
function queuedWrite(queue: Promise<void>, task: () => Promise<void>): Promise<void> {
|
||||
return queue.then(task).catch((error) => {
|
||||
console.error('Parquet write failed:', error);
|
||||
});
|
||||
}
|
||||
|
||||
function toInt64(value: number): bigint {
|
||||
return BigInt(Math.trunc(value));
|
||||
}
|
||||
|
||||
function toNullableInt64(value: number | undefined): bigint | null {
|
||||
return value == null ? null : toInt64(value);
|
||||
}
|
||||
|
||||
function toVehicleColumns(positions: VehiclePosition[]): ColumnSource[] {
|
||||
return [
|
||||
{ name: 'timestamp', data: positions.map((p) => toInt64(p.timestamp)), type: 'INT64', nullable: false },
|
||||
{ name: 'vehicleId', data: positions.map((p) => p.vehicleId), type: 'STRING', nullable: false },
|
||||
{ name: 'routeId', data: positions.map((p) => p.routeId), type: 'STRING', nullable: false },
|
||||
{ name: 'tripId', data: positions.map((p) => p.tripId ?? null), type: 'STRING' },
|
||||
{ name: 'latitude', data: positions.map((p) => p.latitude), type: 'DOUBLE', nullable: false },
|
||||
{ name: 'longitude', data: positions.map((p) => p.longitude), type: 'DOUBLE', nullable: false },
|
||||
{ name: 'speed', data: positions.map((p) => p.speed ?? null), type: 'DOUBLE' },
|
||||
{ name: 'bearing', data: positions.map((p) => p.bearing ?? null), type: 'DOUBLE' },
|
||||
{ name: 'currentStatus', data: positions.map((p) => p.currentStatus ?? null), type: 'STRING' },
|
||||
];
|
||||
}
|
||||
|
||||
function toArrivalColumns(arrivals: ArrivalRecord[]): ColumnSource[] {
|
||||
return [
|
||||
{ name: 'timestamp', data: arrivals.map((a) => toInt64(a.timestamp)), type: 'INT64', nullable: false },
|
||||
{ name: 'stopId', data: arrivals.map((a) => a.stopId), type: 'STRING', nullable: false },
|
||||
{ name: 'routeId', data: arrivals.map((a) => a.routeId), type: 'STRING', nullable: false },
|
||||
{ name: 'tripId', data: arrivals.map((a) => a.tripId ?? null), type: 'STRING' },
|
||||
{ name: 'scheduledTime', data: arrivals.map((a) => toInt64(a.scheduledTime)), type: 'INT64', nullable: false },
|
||||
{ name: 'predictedTime', data: arrivals.map((a) => toNullableInt64(a.predictedTime)), type: 'INT64' },
|
||||
{ name: 'actualTime', data: arrivals.map((a) => toNullableInt64(a.actualTime)), type: 'INT64' },
|
||||
{ name: 'delaySeconds', data: arrivals.map((a) => (a.delaySeconds == null ? null : Math.trunc(a.delaySeconds))), type: 'INT32' },
|
||||
{ name: 'isRealtime', data: arrivals.map((a) => a.isRealtime), type: 'BOOLEAN', nullable: false },
|
||||
{ name: 'headsign', data: arrivals.map((a) => a.headsign ?? null), type: 'STRING' },
|
||||
];
|
||||
}
|
||||
|
||||
function toSnapshotColumns(snapshots: VehicleFeedSnapshot[]): ColumnSource[] {
|
||||
return [
|
||||
{ name: 'timestamp', data: snapshots.map((s) => toInt64(s.timestamp)), type: 'INT64', nullable: false },
|
||||
{ name: 'sourceTimestamp', data: snapshots.map((s) => toNullableInt64(s.sourceTimestamp)), type: 'INT64' },
|
||||
{ name: 'vehicleId', data: snapshots.map((s) => s.vehicleId ?? null), type: 'STRING' },
|
||||
{ name: 'inventoryNumber', data: snapshots.map((s) => s.inventoryNumber ?? null), type: 'STRING' },
|
||||
{ name: 'latitude', data: snapshots.map((s) => (s.latitude == null ? null : s.latitude)), type: 'DOUBLE' },
|
||||
{ name: 'longitude', data: snapshots.map((s) => (s.longitude == null ? null : s.longitude)), type: 'DOUBLE' },
|
||||
{ name: 'speed', data: snapshots.map((s) => (s.speed == null ? null : s.speed)), type: 'DOUBLE' },
|
||||
{ name: 'bearing', data: snapshots.map((s) => (s.bearing == null ? null : s.bearing)), type: 'DOUBLE' },
|
||||
{ name: 'status', data: snapshots.map((s) => (s.status == null ? null : Math.trunc(s.status))), type: 'INT32' },
|
||||
{ name: 'rawJson', data: snapshots.map((s) => s.rawJson), type: 'STRING', nullable: false },
|
||||
];
|
||||
}
|
||||
|
||||
/**
 * Reset all per-segment module state and open fresh parquet writers for the
 * three streams. Callers must ensure no writes are in flight (this is only
 * invoked from initialization and from the serialized rotation path).
 */
function startNewSegment(id: string): void {
  currentSegmentId = id;
  currentVehicleFile = buildVehicleFile(id);
  currentArrivalFile = buildArrivalFile(id);
  currentSnapshotFile = buildSnapshotFile(id);
  // Row counters are per-segment; lifetime counters live in `writes`.
  currentSegmentRows = { vehicle: 0, arrival: 0, snapshot: 0 };
  vehicleWriter = createParquetWriter(currentVehicleFile, VEHICLE_SCHEMA);
  arrivalWriter = createParquetWriter(currentArrivalFile, ARRIVAL_SCHEMA);
  snapshotWriter = createParquetWriter(currentSnapshotFile, SNAPSHOT_SCHEMA);
}
||||
async function uploadFileToObjectStorage(filePath: string): Promise<boolean> {
|
||||
if (!OBJECT_STORAGE_ENABLED || !s3Client || !OBJECT_STORAGE_BUCKET) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const keyPrefix = OBJECT_STORAGE_PREFIX ? `${OBJECT_STORAGE_PREFIX}/` : '';
|
||||
const relativePath = path.relative(DATA_DIR, filePath);
|
||||
const normalizedRelativePath =
|
||||
!relativePath.startsWith('..') && !path.isAbsolute(relativePath)
|
||||
? relativePath.split(path.sep).join('/')
|
||||
: path.basename(filePath);
|
||||
const key = `${keyPrefix}${normalizedRelativePath}`;
|
||||
|
||||
const body = await fs.readFile(filePath);
|
||||
|
||||
for (let attempt = 1; attempt <= OBJECT_STORAGE_UPLOAD_RETRIES; attempt += 1) {
|
||||
try {
|
||||
await s3Client.send(
|
||||
new PutObjectCommand({
|
||||
Bucket: OBJECT_STORAGE_BUCKET,
|
||||
Key: key,
|
||||
Body: body,
|
||||
ContentLength: body.byteLength,
|
||||
ContentType: 'application/octet-stream',
|
||||
})
|
||||
);
|
||||
|
||||
console.log(`[OK] Uploaded parquet to object storage: s3://${OBJECT_STORAGE_BUCKET}/${key}`);
|
||||
return true;
|
||||
} catch (error) {
|
||||
const status = (error as { $metadata?: { httpStatusCode?: number } })?.$metadata?.httpStatusCode;
|
||||
const shouldRetry = !!status && status >= 500;
|
||||
const canRetry = shouldRetry && attempt < OBJECT_STORAGE_UPLOAD_RETRIES;
|
||||
|
||||
console.error(
|
||||
`[ERROR] Upload failed for ${filePath} (attempt ${attempt}/${OBJECT_STORAGE_UPLOAD_RETRIES}):`,
|
||||
error
|
||||
);
|
||||
|
||||
if (!canRetry) {
|
||||
break;
|
||||
}
|
||||
|
||||
const backoffMs = OBJECT_STORAGE_UPLOAD_RETRY_BASE_MS * attempt;
|
||||
await new Promise((resolve) => setTimeout(resolve, backoffMs));
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
async function deleteLocalFile(filePath: string): Promise<void> {
|
||||
await fs.unlink(filePath).catch((error: unknown) => {
|
||||
const code = (error as { code?: string }).code;
|
||||
if (code !== 'ENOENT') {
|
||||
console.warn(`[WARN] Failed to delete local parquet file ${filePath}:`, error);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Finish the current segment: drain in-flight writes, close all three
 * writers (flushing parquet footers), upload the files to object storage
 * when enabled, optionally delete the local copies, and — when `reopen` is
 * true — open a fresh segment. Only ever invoked through the rotation queue,
 * so at most one finalize runs at a time.
 */
async function finalizeSegment(reason: string, reopen: boolean): Promise<void> {
  if (!initialized) {
    return;
  }

  // Let every already-queued write land in the closing segment first.
  await Promise.all([vehicleWriteQueue, arrivalWriteQueue, snapshotWriteQueue]);

  // Capture identity/paths/counters now; startNewSegment() below resets them.
  const meta: SegmentMeta = {
    segmentId: currentSegmentId,
    vehicleFile: currentVehicleFile,
    arrivalFile: currentArrivalFile,
    snapshotFile: currentSnapshotFile,
    vehicleRows: currentSegmentRows.vehicle,
    arrivalRows: currentSegmentRows.arrival,
    snapshotRows: currentSegmentRows.snapshot,
  };

  // finish() flushes remaining buffered bytes (including parquet footers).
  // Clearing the writer references makes queued writes no-op until reopen.
  if (vehicleWriter) {
    vehicleWriter.finish();
    vehicleWriter = undefined;
  }
  if (arrivalWriter) {
    arrivalWriter.finish();
    arrivalWriter = undefined;
  }
  if (snapshotWriter) {
    snapshotWriter.finish();
    snapshotWriter = undefined;
  }

  const hasRows = meta.vehicleRows > 0 || meta.arrivalRows > 0 || meta.snapshotRows > 0;

  try {
    if (hasRows) {
      const uploadResults = await Promise.all([
        uploadFileToObjectStorage(meta.vehicleFile),
        uploadFileToObjectStorage(meta.arrivalFile),
        uploadFileToObjectStorage(meta.snapshotFile),
      ]);
      const allUploaded = uploadResults.every(Boolean);

      // Only delete local copies once every file of the segment uploaded.
      if (OBJECT_STORAGE_ENABLED && DELETE_LOCAL_AFTER_UPLOAD && allUploaded) {
        await Promise.all([
          deleteLocalFile(meta.vehicleFile),
          deleteLocalFile(meta.arrivalFile),
          deleteLocalFile(meta.snapshotFile),
        ]);
        console.log(`[OK] Deleted local parquet segment after upload: ${meta.segmentId}`);
      } else if (OBJECT_STORAGE_ENABLED && DELETE_LOCAL_AFTER_UPLOAD && !allUploaded) {
        console.warn(`[WARN] Kept local parquet segment due to upload errors: ${meta.segmentId}`);
      }
    } else if (OBJECT_STORAGE_ENABLED && DELETE_LOCAL_AFTER_UPLOAD) {
      // Empty segment: nothing was uploaded; drop the scaffolding-only files
      // locally when local cleanup is requested.
      await Promise.all([
        deleteLocalFile(meta.vehicleFile),
        deleteLocalFile(meta.arrivalFile),
        deleteLocalFile(meta.snapshotFile),
      ]);
    }
  } finally {
    // Reopen even if uploads threw, so ingestion never stalls.
    if (reopen) {
      startNewSegment(segmentId());
      console.log(`[OK] Rotated parquet segment (${reason}) from ${meta.segmentId}`);
    }
  }
}
||||
function enqueueRotation(reason: string, reopen: boolean): Promise<void> {
|
||||
rotationQueue = rotationQueue.then(() => finalizeSegment(reason, reopen)).catch((error) => {
|
||||
console.error('Parquet rotation failed:', error);
|
||||
});
|
||||
return rotationQueue;
|
||||
}
|
||||
|
||||
function startRollTimer(): void {
|
||||
if (rollTimer) {
|
||||
clearInterval(rollTimer);
|
||||
}
|
||||
|
||||
rollTimer = setInterval(() => {
|
||||
void enqueueRotation('interval', true);
|
||||
}, ROLL_INTERVAL_MS);
|
||||
}
|
||||
|
||||
/**
 * Lazily set up storage on first use: create DATA_DIR, open the first
 * segment's writers, and start the roll timer. Idempotent once complete.
 *
 * NOTE(review): two concurrent first calls can both pass the `initialized`
 * check (the flag is only set after the awaited mkdir) and double-initialize;
 * confirm callers invoke this from a single logical flow or memoize the
 * init promise.
 */
async function ensureInitialized(): Promise<void> {
  if (initialized) {
    return;
  }

  await fs.mkdir(DATA_DIR, { recursive: true });
  startNewSegment(segmentId());
  startRollTimer();
  initialized = true;

  const storageInfo = OBJECT_STORAGE_ENABLED
    ? `, object storage enabled (bucket=${OBJECT_STORAGE_BUCKET}, deleteLocal=${DELETE_LOCAL_AFTER_UPLOAD})`
    : ', object storage disabled';

  console.log(`[OK] Hyparquet storage ready at ${DATA_DIR}, roll=${ROLL_INTERVAL_MS / 60000}m${storageInfo}`);
}
||||
export async function initStorage(): Promise<void> {
|
||||
await ensureInitialized();
|
||||
}
|
||||
|
||||
export async function initDatabase(): Promise<void> {
|
||||
await initStorage();
|
||||
}
|
||||
|
||||
export async function logVehiclePosition(position: VehiclePosition): Promise<void> {
|
||||
await logVehiclePositions([position]);
|
||||
}
|
||||
|
||||
/**
 * Append a batch of vehicle positions to the current segment as one row
 * group. Resolves once this batch has been written; write failures are
 * logged and swallowed by queuedWrite, not propagated.
 */
export async function logVehiclePositions(positions: VehiclePosition[]): Promise<void> {
  if (positions.length === 0) {
    return;
  }

  await ensureInitialized();
  // Wait out any in-progress rotation so rows land in the new segment.
  await rotationQueue;

  vehicleWriteQueue = queuedWrite(vehicleWriteQueue, async () => {
    // Re-read the writer inside the queued task: a rotation may have
    // replaced (or cleared) it since this call was enqueued.
    const writer = vehicleWriter;
    if (!writer) {
      return;
    }

    writer.write({
      columnData: toVehicleColumns(positions),
      rowGroupSize: positions.length,
    });

    currentSegmentRows.vehicle += positions.length;
    writes.vehiclePositions += positions.length;
  });

  await vehicleWriteQueue;
}
||||
/**
 * Append one arrival record to the current segment (row group of one).
 * Resolves once the record has been written; write failures are logged and
 * swallowed by queuedWrite, not propagated.
 */
export async function logArrival(arrival: ArrivalRecord): Promise<void> {
  await ensureInitialized();
  // Wait out any in-progress rotation so the row lands in the new segment.
  await rotationQueue;

  arrivalWriteQueue = queuedWrite(arrivalWriteQueue, async () => {
    // Re-read the writer inside the queued task: a rotation may have
    // replaced (or cleared) it since this call was enqueued.
    const writer = arrivalWriter;
    if (!writer) {
      return;
    }

    writer.write({
      columnData: toArrivalColumns([arrival]),
      rowGroupSize: 1,
    });

    currentSegmentRows.arrival += 1;
    writes.arrivalRecords += 1;
  });

  await arrivalWriteQueue;
}
||||
/**
 * Append a batch of raw feed snapshots to the current segment as one row
 * group. Resolves once the batch has been written; write failures are logged
 * and swallowed by queuedWrite, not propagated.
 */
export async function logVehicleFeedSnapshot(snapshots: VehicleFeedSnapshot[]): Promise<void> {
  if (snapshots.length === 0) {
    return;
  }

  await ensureInitialized();
  // Wait out any in-progress rotation so rows land in the new segment.
  await rotationQueue;

  snapshotWriteQueue = queuedWrite(snapshotWriteQueue, async () => {
    // Re-read the writer inside the queued task: a rotation may have
    // replaced (or cleared) it since this call was enqueued.
    const writer = snapshotWriter;
    if (!writer) {
      return;
    }

    writer.write({
      columnData: toSnapshotColumns(snapshots),
      rowGroupSize: snapshots.length,
    });

    currentSegmentRows.snapshot += snapshots.length;
    writes.vehicleSnapshots += snapshots.length;
  });

  await snapshotWriteQueue;
}
||||
export async function getVehicleHistory(_vehicleId: string, _startTime: number, _endTime: number) {
|
||||
return [];
|
||||
}
|
||||
|
||||
export async function getRouteVehiclePositions(_routeId: string, _startTime: number, _endTime: number) {
|
||||
return [];
|
||||
}
|
||||
|
||||
export async function getStopArrivalHistory(_stopId: string, _routeId: string, _startTime: number, _endTime: number) {
|
||||
return [];
|
||||
}
|
||||
|
||||
export async function getRouteDelayStats(_routeId: string, _hours: number = 24) {
|
||||
return {
|
||||
total_arrivals: 0,
|
||||
avg_delay: null,
|
||||
min_delay: null,
|
||||
max_delay: null,
|
||||
on_time_count: 0,
|
||||
late_count: 0,
|
||||
early_count: 0,
|
||||
};
|
||||
}
|
||||
|
||||
export async function getStopDelayStats(_stopId: string, _hours: number = 24) {
|
||||
return [];
|
||||
}
|
||||
|
||||
export async function getRouteHourlyPattern(_routeId: string, _days: number = 7) {
|
||||
return [];
|
||||
}
|
||||
|
||||
export async function cleanupOldData(_daysToKeep: number = 90): Promise<void> {
|
||||
console.log('cleanupOldData skipped: parquet segment mode');
|
||||
}
|
||||
|
||||
/**
 * Report current storage metrics: lifetime write counters, rolling-segment
 * info, object storage configuration, and on-disk size/mtime of the three
 * live segment files. Initializes storage if needed.
 */
export async function getStorageStats() {
  await ensureInitialized();

  // stat() the live files; a missing file (e.g. just rotated away with
  // deleteLocal enabled) reports 0 bytes instead of throwing.
  const [vehicleInfo, arrivalInfo, snapshotInfo] = await Promise.all([
    fs.stat(currentVehicleFile).catch(() => null),
    fs.stat(currentArrivalFile).catch(() => null),
    fs.stat(currentSnapshotFile).catch(() => null),
  ]);

  return {
    vehiclePositions: writes.vehiclePositions,
    arrivalRecords: writes.arrivalRecords,
    vehicleSnapshots: writes.vehicleSnapshots,
    // Record age is not tracked in parquet segment mode.
    oldestRecord: null,
    newestRecord: null,
    storageType: 'hyparquet(rolling-write)',
    host: OBJECT_STORAGE_ENABLED ? 'object-storage+local' : 'local-filesystem',
    storagePath: DATA_DIR,
    rolling: {
      minutes: ROLL_INTERVAL_MS / 60000,
      currentSegmentId,
    },
    objectStorage: {
      enabled: OBJECT_STORAGE_ENABLED,
      bucket: OBJECT_STORAGE_BUCKET ?? null,
      region: OBJECT_STORAGE_REGION,
      endpoint: OBJECT_STORAGE_ENDPOINT ?? null,
      prefix: OBJECT_STORAGE_PREFIX,
      deleteLocalAfterUpload: DELETE_LOCAL_AFTER_UPLOAD,
    },
    files: {
      vehicle: {
        path: currentVehicleFile,
        bytes: vehicleInfo?.size ?? 0,
        modifiedAt: vehicleInfo?.mtime ?? null,
      },
      arrivals: {
        path: currentArrivalFile,
        bytes: arrivalInfo?.size ?? 0,
        modifiedAt: arrivalInfo?.mtime ?? null,
      },
      snapshots: {
        path: currentSnapshotFile,
        bytes: snapshotInfo?.size ?? 0,
        modifiedAt: snapshotInfo?.mtime ?? null,
      },
    },
  };
}
||||
export async function getDatabaseStats() {
|
||||
return getStorageStats();
|
||||
}
|
||||
|
||||
/**
 * Graceful shutdown: stop the roll timer, drain in-flight writes, and
 * finalize the current segment without reopening. Safe to call when never
 * initialized; a later log* call re-initializes storage from scratch.
 */
export async function closeStorage(): Promise<void> {
  if (!initialized) {
    return;
  }

  // Stop scheduling new rotations before draining.
  if (rollTimer) {
    clearInterval(rollTimer);
    rollTimer = undefined;
  }

  // Drain queued writes, then run a final (non-reopening) rotation, which
  // flushes writers and uploads the last segment.
  await Promise.all([vehicleWriteQueue, arrivalWriteQueue, snapshotWriteQueue]);
  await enqueueRotation('shutdown', false);
  initialized = false;
}
||||
export async function closeDatabase(): Promise<void> {
|
||||
await closeStorage();
|
||||
}
|
||||
Reference in New Issue
Block a user