Initial commit
This commit is contained in:
@@ -0,0 +1,22 @@
|
||||
{
|
||||
"name": "@goodgrief/worker",
|
||||
"version": "0.1.0",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"dev": "tsx src/index.ts",
|
||||
"dev:watch": "tsx watch src/index.ts",
|
||||
"build": "tsc --noEmit",
|
||||
"check": "tsc --noEmit"
|
||||
},
|
||||
"dependencies": {
|
||||
"@goodgrief/shared-types": "file:../../packages/shared-types",
|
||||
"fastify": "^5.2.1",
|
||||
"sharp": "^0.33.5"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/node": "^24.0.0",
|
||||
"tsx": "^4.19.4",
|
||||
"typescript": "^5.8.3"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
import { existsSync } from "node:fs";
|
||||
import path from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
const sourceDir = path.dirname(fileURLToPath(import.meta.url));
|
||||
|
||||
const isRepoRoot = (dirPath: string) =>
|
||||
existsSync(path.join(dirPath, "package.json")) &&
|
||||
existsSync(path.join(dirPath, "apps")) &&
|
||||
existsSync(path.join(dirPath, "packages")) &&
|
||||
existsSync(path.join(dirPath, "services"));
|
||||
|
||||
const findRepoRoot = (...startDirs: string[]) => {
|
||||
for (const startDir of startDirs) {
|
||||
let current = path.resolve(startDir);
|
||||
while (true) {
|
||||
if (isRepoRoot(current)) {
|
||||
return current;
|
||||
}
|
||||
const parent = path.dirname(current);
|
||||
if (parent === current) {
|
||||
break;
|
||||
}
|
||||
current = parent;
|
||||
}
|
||||
}
|
||||
|
||||
return process.cwd();
|
||||
};
|
||||
|
||||
const rootDir = findRepoRoot(process.cwd(), sourceDir);
|
||||
|
||||
export const config = {
|
||||
port: Number(process.env.PORT ?? 4301),
|
||||
host: process.env.HOST ?? "0.0.0.0",
|
||||
apiBaseUrl: process.env.API_BASE_URL ?? "http://localhost:4300",
|
||||
storageDir: path.join(rootDir, "storage"),
|
||||
pollIntervalMs: Number(process.env.POLL_INTERVAL_MS ?? 2500)
|
||||
};
|
||||
@@ -0,0 +1,51 @@
|
||||
import Fastify from "fastify";
|
||||
import { config } from "./config.ts";
|
||||
import { runWorkerOnce } from "./processor.ts";
|
||||
|
||||
const app = Fastify({
|
||||
logger: true
|
||||
});
|
||||
|
||||
let lastRun: { processed: boolean; assetId?: string; error?: string } | null = null;
|
||||
|
||||
app.get("/health", async () => ({
|
||||
status: "ok",
|
||||
service: "worker",
|
||||
lastRun
|
||||
}));
|
||||
|
||||
app.post("/run-once", async () => {
|
||||
lastRun = await runWorkerOnce();
|
||||
return lastRun;
|
||||
});
|
||||
|
||||
const interval = setInterval(() => {
|
||||
void runWorkerOnce()
|
||||
.then((result) => {
|
||||
lastRun = result;
|
||||
if (result.processed) {
|
||||
app.log.info({ assetId: result.assetId }, "Processed queued asset.");
|
||||
}
|
||||
})
|
||||
.catch((error) => {
|
||||
app.log.error(error);
|
||||
lastRun = {
|
||||
processed: false,
|
||||
error: error instanceof Error ? error.message : "Unknown worker error."
|
||||
};
|
||||
});
|
||||
}, config.pollIntervalMs);
|
||||
|
||||
process.on("SIGINT", () => clearInterval(interval));
|
||||
process.on("SIGTERM", () => clearInterval(interval));
|
||||
|
||||
try {
|
||||
await app.listen({
|
||||
port: config.port,
|
||||
host: config.host
|
||||
});
|
||||
} catch (error) {
|
||||
app.log.error(error);
|
||||
clearInterval(interval);
|
||||
process.exit(1);
|
||||
}
|
||||
@@ -0,0 +1,113 @@
|
||||
import { mkdir, readFile } from "node:fs/promises";
|
||||
import { createHash } from "node:crypto";
|
||||
import path from "node:path";
|
||||
import sharp from "sharp";
|
||||
import type { PhotoAsset, RepositoryState } from "@goodgrief/shared-types";
|
||||
import { config } from "./config.ts";
|
||||
|
||||
const toStoragePath = (publicUrl: string) => path.join(config.storageDir, publicUrl.replace(/^\/uploads\//, ""));
|
||||
|
||||
const computeOrientation = (width: number, height: number) => {
|
||||
if (width === height) {
|
||||
return "square";
|
||||
}
|
||||
return width > height ? "landscape" : "portrait";
|
||||
};
|
||||
|
||||
const createDerivativePath = (assetId: string, kind: "thumbs" | "previews" | "renders") =>
|
||||
path.join(config.storageDir, "runtime", kind, `${assetId}.jpg`);
|
||||
|
||||
const publicKeyFor = (assetId: string, kind: "thumbs" | "previews" | "renders") =>
|
||||
`/uploads/runtime/${kind}/${assetId}.jpg`;
|
||||
|
||||
const fetchState = async (): Promise<RepositoryState> => {
|
||||
const response = await fetch(`${config.apiBaseUrl}/api/state`);
|
||||
if (!response.ok) {
|
||||
throw new Error("Could not fetch API state.");
|
||||
}
|
||||
return (await response.json()) as RepositoryState;
|
||||
};
|
||||
|
||||
const notifyProcessed = async (assetId: string, payload: Record<string, unknown>) => {
|
||||
await fetch(`${config.apiBaseUrl}/api/assets/${assetId}/processed`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json"
|
||||
},
|
||||
body: JSON.stringify(payload)
|
||||
});
|
||||
};
|
||||
|
||||
const notifyFailure = async (assetId: string, message: string) => {
|
||||
await fetch(`${config.apiBaseUrl}/api/assets/${assetId}/failed`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json"
|
||||
},
|
||||
body: JSON.stringify({ message })
|
||||
});
|
||||
};
|
||||
|
||||
export const processAsset = async (asset: PhotoAsset) => {
|
||||
const sourcePath = toStoragePath(asset.originalKey);
|
||||
const inputBuffer = await readFile(sourcePath);
|
||||
const sha256 = createHash("sha256").update(inputBuffer).digest("hex");
|
||||
|
||||
await mkdir(path.join(config.storageDir, "runtime", "thumbs"), { recursive: true });
|
||||
await mkdir(path.join(config.storageDir, "runtime", "previews"), { recursive: true });
|
||||
await mkdir(path.join(config.storageDir, "runtime", "renders"), { recursive: true });
|
||||
|
||||
const image = sharp(inputBuffer, { failOn: "none" }).rotate();
|
||||
const metadata = await image.metadata();
|
||||
const stats = await image.stats();
|
||||
const width = metadata.width ?? 0;
|
||||
const height = metadata.height ?? 0;
|
||||
|
||||
await image.clone().resize({ width: 320, height: 320, fit: "inside" }).jpeg({ quality: 78 }).toFile(
|
||||
createDerivativePath(asset.id, "thumbs")
|
||||
);
|
||||
await image.clone().resize({ width: 960, height: 960, fit: "inside" }).jpeg({ quality: 84 }).toFile(
|
||||
createDerivativePath(asset.id, "previews")
|
||||
);
|
||||
await image.clone().resize({ width: 1920, height: 1920, fit: "inside" }).jpeg({ quality: 88 }).toFile(
|
||||
createDerivativePath(asset.id, "renders")
|
||||
);
|
||||
|
||||
const dominant = stats.dominant;
|
||||
const dominantColor = `#${[dominant.r, dominant.g, dominant.b]
|
||||
.map((value) => value.toString(16).padStart(2, "0"))
|
||||
.join("")}`;
|
||||
|
||||
return {
|
||||
thumbKey: publicKeyFor(asset.id, "thumbs"),
|
||||
previewKey: publicKeyFor(asset.id, "previews"),
|
||||
renderKey: publicKeyFor(asset.id, "renders"),
|
||||
width,
|
||||
height,
|
||||
orientation: computeOrientation(width, height),
|
||||
sha256,
|
||||
dominantColor,
|
||||
qualityFlags: {
|
||||
tooSmall: width < 800 || height < 800,
|
||||
lowContrast: stats.channels[0]?.stdev ? stats.channels[0].stdev < 12 : false
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
export const runWorkerOnce = async () => {
|
||||
const state = await fetchState();
|
||||
const queued = state.photoAssets.find((asset) => asset.processingStatus === "queued");
|
||||
if (!queued) {
|
||||
return { processed: false };
|
||||
}
|
||||
|
||||
try {
|
||||
const payload = await processAsset(queued);
|
||||
await notifyProcessed(queued.id, payload);
|
||||
return { processed: true, assetId: queued.id };
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : "Unknown processing error.";
|
||||
await notifyFailure(queued.id, message);
|
||||
return { processed: false, assetId: queued.id, error: message };
|
||||
}
|
||||
};
|
||||
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"extends": "../../tsconfig.base.json",
|
||||
"include": [
|
||||
"src"
|
||||
],
|
||||
"compilerOptions": {
|
||||
"lib": [
|
||||
"ES2022"
|
||||
],
|
||||
"allowImportingTsExtensions": true
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user