feat: move registry and runtime into separate packages
All checks were successful
Deploy to GitHub Pages / build_site (push) Successful in 2m32s

2024-05-05 15:11:53 +02:00
parent f4853821d4
commit a01a409b97
29 changed files with 205 additions and 156 deletions

View File: @nodes/registry package.json

@ -0,0 +1,17 @@
{
"name": "@nodes/registry",
"version": "0.0.0",
"description": "",
"main": "src/index.ts",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@nodes/types": "link:../types",
"@nodes/utils": "link:../utils",
"idb": "^8.0.0"
}
}

View File: @nodes/registry src/index.ts

@ -0,0 +1,2 @@
export * from "./node-registry-cache";
export * from "./node-registry-client";

View File: @nodes/registry src/node-registry-cache.ts

@ -0,0 +1,38 @@
import type { AsyncCache } from '@nodes/types';
import { openDB, type IDBPDatabase } from 'idb';
export class IndexDBCache implements AsyncCache<ArrayBuffer> {
size: number = 100;
db: Promise<IDBPDatabase<ArrayBuffer>>;
private _cache = new Map<string, ArrayBuffer>();
constructor(id: string) {
this.db = openDB<ArrayBuffer>('cache/' + id, 1, {
upgrade(db) {
db.createObjectStore('keyval');
},
});
}
async get(key: string) {
let res = this._cache.get(key);
if (!res) {
res = await (await this.db).get('keyval', key);
}
if (res) {
this._cache.set(key, res);
}
return res;
}
async set(key: string, value: ArrayBuffer) {
this._cache.set(key, value);
const db = await this.db;
await db.put('keyval', value, key);
}
clear() {
// also clear the in-memory copies so stale entries are not served from the Map
this._cache.clear();
this.db.then(db => db.clear('keyval'));
}
}
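
A brief usage sketch for this cache (not part of the commit); the database id, key, and payload are made up:

```ts
import { IndexDBCache } from "@nodes/registry";

// Hypothetical example: persist a buffer and read it back.
const cache = new IndexDBCache("example-cache");

const payload = new ArrayBuffer(8);
await cache.set("example-key", payload);

const restored = await cache.get("example-key"); // ArrayBuffer | undefined
console.log(restored?.byteLength); // 8
```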

View File: @nodes/registry src/node-registry-client.ts

@ -0,0 +1,125 @@
import { type NodeRegistry, type NodeDefinition, NodeDefinitionSchema, type AsyncCache } from "@nodes/types";
import { createWasmWrapper, createLogger } from "@nodes/utils";
const log = createLogger("node-registry");
log.mute();
export class RemoteNodeRegistry implements NodeRegistry {
status: "loading" | "ready" | "error" = "loading";
private nodes: Map<string, NodeDefinition> = new Map();
cache?: AsyncCache<ArrayBuffer>;
fetch: typeof fetch = globalThis.fetch.bind(globalThis);
constructor(private url: string) { }
async fetchUsers() {
const response = await this.fetch(`${this.url}/nodes/users.json`);
if (!response.ok) {
throw new Error(`Failed to load users`);
}
return response.json();
}
async fetchUser(userId: `${string}`) {
const response = await this.fetch(`${this.url}/nodes/${userId}.json`);
if (!response.ok) {
throw new Error(`Failed to load user ${userId}`);
}
return response.json();
}
async fetchCollection(userCollectionId: `${string}/${string}`) {
const response = await this.fetch(`${this.url}/nodes/${userCollectionId}.json`);
if (!response.ok) {
throw new Error(`Failed to load collection ${userCollectionId}`);
}
return response.json();
}
async fetchNodeDefinition(nodeId: `${string}/${string}/${string}`) {
const response = await this.fetch(`${this.url}/nodes/${nodeId}.json`);
if (!response.ok) {
throw new Error(`Failed to load node definition ${nodeId}`);
}
return response.json()
}
private async fetchNodeWasm(nodeId: `${string}/${string}/${string}`) {
const response = await this.fetch(`${this.url}/nodes/${nodeId}.wasm`);
if (!response.ok) {
if (this.cache) {
let value = await this.cache.get(nodeId);
if (value) {
return value;
}
}
throw new Error(`Failed to load node wasm ${nodeId}`);
}
return response.arrayBuffer();
}
async load(nodeIds: `${string}/${string}/${string}`[]) {
const a = performance.now();
const nodes = await Promise.all([...new Set(nodeIds).values()].map(async id => {
if (this.nodes.has(id)) {
return this.nodes.get(id)!;
}
const wasmBuffer = await this.fetchNodeWasm(id);
return this.register(wasmBuffer);
}));
const duration = performance.now() - a;
log.group("loaded nodes in", duration, "ms");
log.info(nodeIds);
log.info(nodes);
log.groupEnd();
this.status = "ready";
return nodes
}
async register(wasmBuffer: ArrayBuffer) {
const wrapper = createWasmWrapper(wasmBuffer);
const definition = NodeDefinitionSchema.safeParse(wrapper.get_definition());
if (!definition.success) {
console.error(definition.error);
throw definition.error;
}
if (this.cache) {
await this.cache.set(definition.data.id, wasmBuffer);
}
let node = {
...definition.data,
execute: wrapper.execute
}
this.nodes.set(definition.data.id, node);
return node;
}
getNode(id: string) {
return this.nodes.get(id);
}
getAllNodes() {
return [...this.nodes.values()];
}
}
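
A usage sketch mirroring the worker backend later in this commit; the node id is a placeholder:

```ts
import { RemoteNodeRegistry, IndexDBCache } from "@nodes/registry";

// "" means node definitions are fetched relative to the current origin,
// as in the worker backend below; "user/collection/node" is a placeholder id.
const registry = new RemoteNodeRegistry("");
registry.cache = new IndexDBCache("node-registry");

const nodes = await registry.load(["user/collection/node"]);
const definition = registry.getNode("user/collection/node");
console.log(nodes.length, definition?.id);
```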

View File: @nodes/runtime package.json

@ -0,0 +1,21 @@
{
"name": "@nodes/runtime",
"version": "0.0.0",
"description": "",
"main": "src/index.ts",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@nodes/registry": "link:../registry",
"@nodes/types": "link:../types",
"@nodes/utils": "link:../utils"
},
"devDependencies": {
"comlink": "^4.4.1",
"vite-plugin-comlink": "^4.0.3"
}
}

View File: @nodes/runtime src/index.ts

@ -0,0 +1,3 @@
export * from "./runtime-executor"
export * from "./runtime-executor-cache"
export * from "./worker-runtime-executor"

View File: @nodes/runtime (RemoteRuntimeExecutor)

@ -0,0 +1,18 @@
import type { Graph, RuntimeExecutor } from "@nodes/types";
export class RemoteRuntimeExecutor implements RuntimeExecutor {
constructor(private url: string) { }
async execute(graph: Graph, settings: Record<string, any>): Promise<Int32Array> {
const res = await fetch(this.url, { method: "POST", body: JSON.stringify({ graph, settings }) });
if (!res.ok) {
throw new Error(`Failed to execute graph`);
}
return new Int32Array(await res.arrayBuffer());
}
}
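
A hedged usage sketch: the endpoint URL is invented, the graph is assumed to come from elsewhere, and it assumes the class is re-exported from the @nodes/runtime entry point (not shown in this diff):

```ts
import type { Graph } from "@nodes/types";
import { RemoteRuntimeExecutor } from "@nodes/runtime";

declare const graph: Graph; // assembled elsewhere, e.g. by the graph editor

// Hypothetical endpoint that accepts { graph, settings } and returns raw Int32 data.
const executor = new RemoteRuntimeExecutor("https://example.com/api/execute");
const result = await executor.execute(graph, { randomSeed: false });
console.log(result.length);
```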

View File: @nodes/runtime src/runtime-executor-cache.ts

@ -0,0 +1,19 @@
import { type SyncCache } from "@nodes/types";
export class MemoryRuntimeCache implements SyncCache {
private cache: [string, unknown][] = [];
size = 50;
get<T>(key: string): T | undefined {
return this.cache.find(([k]) => k === key)?.[1] as T;
}
set<T>(key: string, value: T): void {
// drop any existing entry for this key so get() returns the latest value
this.cache = this.cache.filter(([k]) => k !== key);
this.cache.push([key, value]);
this.cache = this.cache.slice(-this.size);
}
clear(): void {
this.cache = [];
}
}
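
A small sketch of the bounded eviction behaviour (values are illustrative):

```ts
import { MemoryRuntimeCache } from "@nodes/runtime";

const cache = new MemoryRuntimeCache();
cache.size = 2; // keep only the two most recent entries

cache.set("a", 1);
cache.set("b", 2);
cache.set("c", 3); // "a" falls out via slice(-size)

console.log(cache.get<number>("a")); // undefined
console.log(cache.get<number>("c")); // 3
```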

View File: @nodes/runtime src/runtime-executor.ts

@ -0,0 +1,263 @@
import type { Graph, NodeRegistry, NodeDefinition, RuntimeExecutor, NodeInput } from "@nodes/types";
import { concatEncodedArrays, encodeFloat, fastHashArrayBuffer, createLogger, type PerformanceStore } from "@nodes/utils"
import type { SyncCache } from "@nodes/types";
const log = createLogger("runtime-executor");
log.mute()
function getValue(input: NodeInput, value?: unknown) {
if (value === undefined && "value" in input) {
value = input.value
}
if (input.type === "float") {
return encodeFloat(value as number);
}
if (Array.isArray(value)) {
if (input.type === "vec3") {
return [0, value.length + 1, ...value.map(v => encodeFloat(v)), 1, 1] as number[];
}
return [0, value.length + 1, ...value, 1, 1] as number[];
}
if (typeof value === "boolean") {
return value ? 1 : 0;
}
if (typeof value === "number") {
return value;
}
if (value instanceof Int32Array) {
return value;
}
throw new Error(`Unknown input type ${input.type}`);
}
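// Examples of the encodings produced by getValue above:
//   float 0.5        -> encodeFloat(0.5)                                        (a single encoded int)
//   vec3  [1, 2, 3]  -> [0, 4, encodeFloat(1), encodeFloat(2), encodeFloat(3), 1, 1]
//   array [7, 8]     -> [0, 3, 7, 8, 1, 1]
//   boolean true     -> 1
//   Int32Array       -> passed through unchanged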
export class MemoryRuntimeExecutor implements RuntimeExecutor {
private definitionMap: Map<string, NodeDefinition> = new Map();
private randomSeed = Math.floor(Math.random() * 100000000);
perf?: PerformanceStore;
constructor(private registry: NodeRegistry, private cache?: SyncCache<Int32Array>) { }
private async getNodeDefinitions(graph: Graph) {
if (this.registry.status !== "ready") {
throw new Error("Node registry is not ready");
}
await this.registry.load(graph.nodes.map(node => node.type));
const typeMap = new Map<string, NodeDefinition>();
for (const node of graph.nodes) {
if (!typeMap.has(node.type)) {
const type = this.registry.getNode(node.type);
if (type) {
typeMap.set(node.type, type);
}
}
}
return typeMap;
}
private async addMetaData(graph: Graph) {
// First, resolve the node definitions for every node type used in the graph
this.definitionMap = await this.getNodeDefinitions(graph);
const outputNode = graph.nodes.find(node => node.type.endsWith("/output"));
if (!outputNode) {
throw new Error("No output node found");
}
outputNode.tmp = outputNode.tmp || {};
outputNode.tmp.depth = 0;
const nodeMap = new Map(graph.nodes.map(node => [node.id, node]));
// loop through all edges and assign the parent and child nodes to each node
for (const edge of graph.edges) {
const [parentId, _parentOutput, childId, childInput] = edge;
const parent = nodeMap.get(parentId);
const child = nodeMap.get(childId);
if (parent && child) {
parent.tmp = parent.tmp || {};
parent.tmp.children = parent.tmp.children || [];
parent.tmp.children.push(child);
child.tmp = child.tmp || {};
child.tmp.parents = child.tmp.parents || [];
child.tmp.parents.push(parent);
child.tmp.inputNodes = child.tmp.inputNodes || {};
child.tmp.inputNodes[childInput] = parent;
}
}
const nodes = []
// loop through all the nodes and assign each nodes its depth
const stack = [outputNode];
while (stack.length) {
const node = stack.pop();
if (!node) continue;
node.tmp = node.tmp || {};
if (node?.tmp?.depth === undefined) {
node.tmp.depth = 0;
}
if (node?.tmp?.parents !== undefined) {
for (const parent of node.tmp.parents) {
parent.tmp = parent.tmp || {};
if (parent.tmp?.depth === undefined) {
parent.tmp.depth = node.tmp.depth + 1;
stack.push(parent);
} else {
parent.tmp.depth = Math.max(parent.tmp.depth, node.tmp.depth + 1);
}
}
}
nodes.push(node);
}
return [outputNode, nodes] as const;
}
async execute(graph: Graph, settings: Record<string, unknown>) {
this.perf?.addPoint("runtime");
let a = performance.now();
// Then we add some metadata to the graph
const [outputNode, nodes] = await this.addMetaData(graph);
let b = performance.now();
this.perf?.addPoint("collect-metadata", b - a);
/*
* Here we sort the nodes into buckets, which we then execute one by one
* +-b2-+-b1-+---b0---+
* | | | |
* | n3 | n2 | Output |
* | n6 | n4 | Level |
* | | n5 | |
* | | | |
* +----+----+--------+
*/
// we execute the nodes from the bottom up
const sortedNodes = nodes.sort((a, b) => (b.tmp?.depth || 0) - (a.tmp?.depth || 0));
// here we store the intermediate results of the nodes
const results: Record<string, Int32Array> = {};
for (const node of sortedNodes) {
const node_type = this.definitionMap.get(node.type);
if (!node_type || !node.tmp || !node_type.execute) {
log.warn(`Node ${node.id} has no definition`);
continue;
}
a = performance.now();
// Collect the inputs for the node
const inputs = Object.entries(node_type.inputs || {}).map(([key, input]) => {
if (input.type === "seed") {
if (settings["randomSeed"] === true) {
return Math.floor(Math.random() * 100000000)
} else {
return this.randomSeed
}
}
// If the input is linked to a setting, we use that value
if (input.setting) {
return getValue(input, settings[input.setting]);
}
// check if the input is connected to another node
const inputNode = node.tmp?.inputNodes?.[key];
if (inputNode) {
if (results[inputNode.id] === undefined) {
throw new Error("Input node has no result");
}
return results[inputNode.id];
}
// If the value is stored in the node itself, we use that value
if (node.props?.[key] !== undefined) {
return getValue(input, node.props[key]);
}
return getValue(input);
});
b = performance.now();
this.perf?.addPoint("collected-inputs", b - a);
try {
a = performance.now();
const encoded_inputs = concatEncodedArrays(inputs);
b = performance.now();
this.perf?.addPoint("encoded-inputs", b - a);
a = performance.now();
let inputHash = `node-${node.id}-${fastHashArrayBuffer(encoded_inputs)}`;
b = performance.now();
this.perf?.addPoint("hash-inputs", b - a);
let cachedValue = this.cache?.get(inputHash);
if (cachedValue !== undefined) {
log.log(`Using cached value for ${node_type.id || node.id}`);
this.perf?.addPoint("cache-hit", 1);
results[node.id] = cachedValue as Int32Array;
continue;
}
this.perf?.addPoint("cache-hit", 0);
log.group(`executing ${node_type.id || node.id}`);
log.log(`Inputs:`, inputs);
a = performance.now();
results[node.id] = node_type.execute(encoded_inputs);
b = performance.now();
if (this.cache) {
this.cache.set(inputHash, results[node.id]);
}
this.perf?.addPoint("node/" + node_type.id, b - a);
log.log("Result:", results[node.id]);
log.groupEnd();
} catch (e) {
log.groupEnd();
log.error(`Error executing node ${node_type.id || node.id}`, e);
}
}
// return the result of the output node
const res = results[outputNode.id];
if (this.cache) {
this.cache.size = sortedNodes.length * 2;
}
this.perf?.endPoint("runtime");
return res as unknown as Int32Array;
}
getPerformanceData() {
return this.perf?.get();
}
}

View File: @nodes/runtime src/worker-runtime-executor-backend.ts

@ -0,0 +1,26 @@
import { MemoryRuntimeExecutor } from "./runtime-executor";
import { RemoteNodeRegistry, IndexDBCache } from "@nodes/registry";
import type { Graph } from "@nodes/types";
import { createPerformanceStore } from "@nodes/utils";
import { MemoryRuntimeCache } from "./runtime-executor-cache";
const cache = new MemoryRuntimeCache();
const indexDbCache = new IndexDBCache("node-registry");
const nodeRegistry = new RemoteNodeRegistry("");
nodeRegistry.cache = indexDbCache;
const executor = new MemoryRuntimeExecutor(nodeRegistry, cache);
const performanceStore = createPerformanceStore("worker");
executor.perf = performanceStore;
export async function executeGraph(graph: Graph, settings: Record<string, unknown>): Promise<Int32Array> {
await nodeRegistry.load(graph.nodes.map((n) => n.type));
performanceStore.startRun();
let res = await executor.execute(graph, settings);
performanceStore.stopRun();
return res;
}
export function getPerformanceData() {
return performanceStore.get();
}

View File: @nodes/runtime src/worker-runtime-executor.ts

@ -0,0 +1,16 @@
/// <reference types="vite-plugin-comlink/client" />
import type { Graph, RuntimeExecutor } from "@nodes/types";
export class WorkerRuntimeExecutor implements RuntimeExecutor {
private worker = new ComlinkWorker<typeof import('./worker-runtime-executor-backend.ts')>(new URL("./worker-runtime-executor-backend.ts", import.meta.url));
constructor() {
}
async execute(graph: Graph, settings: Record<string, unknown>) {
return this.worker.executeGraph(graph, settings);
}
async getPerformanceData() {
return this.worker.getPerformanceData();
}
}
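
A usage sketch for the worker-backed executor; it assumes the vite-plugin-comlink setup listed in the package's devDependencies, and the graph is a placeholder:

```ts
import type { Graph } from "@nodes/types";
import { WorkerRuntimeExecutor } from "@nodes/runtime";

declare const graph: Graph; // produced by the editor or loaded from storage

const runtime = new WorkerRuntimeExecutor();
const result = await runtime.execute(graph, { randomSeed: true });
const perf = await runtime.getPerformanceData();
console.log(result.length, perf);
```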

View File: @nodes/types package.json

@ -1,6 +1,6 @@
{
"name": "@nodes/types",
"version": "1.0.0",
"version": "0.0.0",
"description": "",
"main": "src/index.ts",
"exports": {

View File: @nodes/types src/components.ts

@ -1,8 +1,6 @@
import { Graph, NodeDefinition, NodeId } from "./types";
export interface NodeRegistry {
/**
* The status of the node registry
* @remarks The status should be "loading" when the registry is loading, "ready" when the registry is ready, and "error" if an error occurred while loading the registry
@ -36,7 +34,7 @@ export interface NodeRegistry {
register: (wasmBuffer: ArrayBuffer) => Promise<NodeDefinition>;
- cache?: RuntimeCache<ArrayBuffer>;
+ cache?: AsyncCache<ArrayBuffer>;
}
export interface RuntimeExecutor {
@ -48,8 +46,7 @@ export interface RuntimeExecutor {
execute: (graph: Graph, settings: Record<string, unknown>) => Promise<Int32Array>;
}
- export interface RuntimeCache<T = unknown> {
+ export interface SyncCache<T = unknown> {
/**
* The maximum number of items that can be stored in the cache
* @remarks When the cache size exceeds this value, the oldest items should be removed
@ -61,16 +58,41 @@ export interface RuntimeCache<T = unknown> {
* @param key - The key to get the value for
* @returns The value for the given key, or undefined if no such value exists
*/
- get: (key: string) => T | Promise<T> | undefined;
+ get: (key: string) => T | undefined;
/**
* Set the value for the given key
* @param key - The key to set the value for
* @param value - The value to set
*/
- set: (key: string, value: T) => void | Promise<void>;
+ set: (key: string, value: T) => void;
/**
* Clear the cache
*/
clear: () => void;
}
export interface AsyncCache<T = unknown> {
/**
* The maximum number of items that can be stored in the cache
* @remarks When the cache size exceeds this value, the oldest items should be removed
*/
size: number;
/**
* Get the value for the given key
* @param key - The key to get the value for
* @returns The value for the given key, or undefined if no such value exists
*/
get: (key: string) => Promise<T | undefined>;
/**
* Set the value for the given key
* @param key - The key to set the value for
* @param value - The value to set
*/
set: (key: string, value: T) => Promise<void>;
/**
* Clear the cache
*/
clear: () => void;
}
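
To illustrate the new split, a minimal in-memory `AsyncCache` sketch (hypothetical, only to show the awaited contract next to the synchronous one):

```ts
import type { AsyncCache } from "@nodes/types";

// Hypothetical in-memory AsyncCache, only to demonstrate the interface shape.
class MemoryAsyncCache<T> implements AsyncCache<T> {
  size = 100;
  private store = new Map<string, T>();

  async get(key: string): Promise<T | undefined> {
    return this.store.get(key);
  }

  async set(key: string, value: T): Promise<void> {
    this.store.set(key, value);
  }

  clear(): void {
    this.store.clear();
  }
}
```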

View File: @nodes/types src/index.ts

@ -1,5 +1,5 @@
export type { NodeInput } from "./inputs";
- export type { NodeRegistry, RuntimeExecutor, RuntimeCache } from "./components";
+ export type { NodeRegistry, RuntimeExecutor, SyncCache, AsyncCache } from "./components";
export type { Node, NodeDefinition, Socket, NodeId, Edge, Graph } from "./types";
export { NodeSchema, GraphSchema } from "./types";
export { NodeDefinitionSchema } from "./types";

View File: @nodes/utils package.json

@ -1,6 +1,6 @@
{
"name": "@nodes/utils",
"version": "1.0.0",
"version": "0.0.1",
"description": "",
"main": "src/index.ts",
"scripts": {

View File: @nodes/utils src/index.ts

@ -2,3 +2,5 @@ export * from "./wasm-wrapper";
export * from "./flatTree"
export * from "./encoding"
export * from "./fastHash"
export * from "./logger"
export * from "./performance"

View File: @nodes/utils src/logger.ts

@ -0,0 +1,29 @@
export const createLogger = (() => {
let maxLength = 5;
return (scope: string) => {
maxLength = Math.max(maxLength, scope.length);
let muted = false;
let isGrouped = false;
function s(color: string, ...args: any) {
return isGrouped ? [...args] : [`[%c${scope.padEnd(maxLength, " ")}]:`, `color: ${color}`, ...args];
}
return {
log: (...args: any[]) => !muted && console.log(...s("#888", ...args)),
info: (...args: any[]) => !muted && console.info(...s("#888", ...args)),
warn: (...args: any[]) => !muted && console.warn(...s("#888", ...args)),
error: (...args: any[]) => console.error(...s("#f88", ...args)),
group: (...args: any[]) => { if (!muted) { console.groupCollapsed(...s("#888", ...args)); isGrouped = true; } },
groupEnd: () => { if (!muted) { console.groupEnd(); isGrouped = false } },
mute() {
muted = true;
},
unmute() {
muted = false;
}
}
}
})();
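
A short sketch of how the logger is used elsewhere in this commit:

```ts
import { createLogger } from "@nodes/utils";

const log = createLogger("example");

log.info("loaded", 3, "nodes");   // prints with a padded, color-styled "[example]:" prefix
log.group("executing node");      // opens a collapsed console group
log.log("inputs", [1, 2, 3]);
log.groupEnd();

log.mute();                       // suppresses log/info/warn/group; error still prints
log.error("this still shows");
```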

View File: @nodes/utils src/performance.ts

@ -0,0 +1,103 @@
export type PerformanceData = Record<string, number[]>[];
export interface PerformanceStore {
startRun(): void;
stopRun(): void;
addPoint(name: string, value?: number): void;
endPoint(name?: string): void;
mergeData(data: PerformanceData[number]): void;
get: () => PerformanceData;
subscribe: (cb: (v: PerformanceData) => void) => () => void;
}
// `scope` labels the store for call sites like createPerformanceStore("worker"); it is not used internally here.
export function createPerformanceStore(scope?: string): PerformanceStore {
let data: PerformanceData = [];
let currentRun: Record<string, number[]> | undefined;
let temp: Record<string, number> | undefined;
let lastPoint: string | undefined;
const listeners: ((v: PerformanceData) => void)[] = [];
function subscribe(cb: (v: PerformanceData) => void) {
listeners.push(cb);
return () => {
const i = listeners.indexOf(cb);
if (i > -1) listeners.splice(i, 1);
}
}
function set(v: PerformanceData) {
listeners.forEach((l) => l(v));
}
function startRun() {
if (currentRun) return;
currentRun = {};
lastPoint = undefined;
temp = {
start: performance.now()
}
}
function stopRun() {
if (currentRun && temp) {
currentRun["total"] = [performance.now() - temp.start];
data.push(currentRun);
data = data.slice(-100);
currentRun = undefined;
temp = undefined;
set(data);
}
}
function addPoint(name: string, value?: number) {
if (!currentRun) return;
if (value === undefined) {
if (temp) {
lastPoint = name;
temp[name] = performance.now();
}
} else {
currentRun[name] = currentRun[name] || [];
currentRun[name].push(value);
}
}
function get() {
return data;
}
function mergeData(newData: PerformanceData[number]) {
const r = currentRun;
if (!r) return;
Object.keys(newData).forEach((name) => {
if (name in r) {
r[name].push(...newData[name]);
} else {
r[name] = newData[name];
}
});
}
function endPoint(name = lastPoint) {
if (name === lastPoint) lastPoint = undefined;
if (name && currentRun && temp && name in temp) {
currentRun[name] = currentRun[name] || [];
currentRun[name].push(performance.now() - temp[name]);
delete temp[name];
}
}
return {
subscribe,
startRun,
stopRun,
addPoint,
endPoint,
mergeData,
get
}
}
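
A sketch of the performance store's run lifecycle, matching how the runtime executor uses it:

```ts
import { createPerformanceStore } from "@nodes/utils";

const perf = createPerformanceStore();

const unsubscribe = perf.subscribe((runs) => console.log("runs so far:", runs.length));

perf.startRun();
perf.addPoint("cache-hit", 0);   // explicit value: recorded immediately
perf.addPoint("node/example");   // no value: starts a timer under this label
// ... do some work ...
perf.endPoint("node/example");   // closes the timer and stores the elapsed ms
perf.stopRun();                  // records a "total" entry and notifies subscribers

console.log(perf.get());         // PerformanceData: one Record<string, number[]> per run
unsubscribe();
```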