feat: first working version of new allocator
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import { RemoteNodeRegistry } from '@nodarium/registry';
|
||||
import type {
|
||||
Graph,
|
||||
NodeDefinition,
|
||||
@@ -7,10 +8,9 @@ import type {
|
||||
SyncCache
|
||||
} from '@nodarium/types';
|
||||
import {
|
||||
concatEncodedArrays,
|
||||
createLogger,
|
||||
createWasmWrapper,
|
||||
encodeFloat,
|
||||
fastHashArrayBuffer,
|
||||
type PerformanceStore
|
||||
} from '@nodarium/utils';
|
||||
import type { RuntimeNode } from './types';
|
||||
@@ -18,6 +18,8 @@ import type { RuntimeNode } from './types';
|
||||
const log = createLogger('runtime-executor');
|
||||
log.mute();
|
||||
|
||||
const remoteRegistry = new RemoteNodeRegistry('');
|
||||
|
||||
function getValue(input: NodeInput, value?: unknown) {
|
||||
if (value === undefined && 'value' in input) {
|
||||
value = input.value;
|
||||
@@ -52,19 +54,30 @@ function getValue(input: NodeInput, value?: unknown) {
|
||||
return value;
|
||||
}
|
||||
|
||||
console.error({ input, value });
|
||||
throw new Error(`Unknown input type ${input.type}`);
|
||||
}
|
||||
|
||||
export class MemoryRuntimeExecutor implements RuntimeExecutor {
|
||||
private definitionMap: Map<string, NodeDefinition> = new Map();
|
||||
type Pointer = {
|
||||
start: number;
|
||||
end: number;
|
||||
};
|
||||
|
||||
private seed = Math.floor(Math.random() * 100000000);
|
||||
export class MemoryRuntimeExecutor implements RuntimeExecutor {
|
||||
private nodes: Map<
|
||||
string,
|
||||
{ definition: NodeDefinition; execute: (outputPos: number, args: number[]) => number }
|
||||
> = new Map();
|
||||
|
||||
private offset = 0;
|
||||
private memory = new WebAssembly.Memory({
|
||||
initial: 1024,
|
||||
maximum: 8192
|
||||
});
|
||||
|
||||
seed = 123123;
|
||||
|
||||
perf?: PerformanceStore;
|
||||
|
||||
private results: Record<string, Int32Array> = {};
|
||||
|
||||
constructor(
|
||||
private registry: NodeRegistry,
|
||||
public cache?: SyncCache<Int32Array>
|
||||
@@ -79,12 +92,20 @@ export class MemoryRuntimeExecutor implements RuntimeExecutor {
|
||||
|
||||
await this.registry.load(graph.nodes.map((node) => node.type));
|
||||
|
||||
const typeMap = new Map<string, NodeDefinition>();
|
||||
const typeMap = new Map<string, {
|
||||
definition: NodeDefinition;
|
||||
execute: (outputPos: number, args: number[]) => number;
|
||||
}>();
|
||||
for (const node of graph.nodes) {
|
||||
if (!typeMap.has(node.type)) {
|
||||
const type = this.registry.getNode(node.type);
|
||||
const buffer = await remoteRegistry.fetchArrayBuffer('nodes/' + node.type + '.wasm');
|
||||
const wrapper = createWasmWrapper(buffer, this.memory);
|
||||
if (type) {
|
||||
typeMap.set(node.type, type);
|
||||
typeMap.set(node.type, {
|
||||
definition: type,
|
||||
execute: wrapper.execute
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -93,7 +114,7 @@ export class MemoryRuntimeExecutor implements RuntimeExecutor {
|
||||
|
||||
private async addMetaData(graph: Graph) {
|
||||
// First, lets check if all nodes have a definition
|
||||
this.definitionMap = await this.getNodeDefinitions(graph);
|
||||
this.nodes = await this.getNodeDefinitions(graph);
|
||||
|
||||
const graphNodes = graph.nodes.map(node => {
|
||||
const n = node as RuntimeNode;
|
||||
@@ -145,16 +166,33 @@ export class MemoryRuntimeExecutor implements RuntimeExecutor {
|
||||
return [outputNode, nodes] as const;
|
||||
}
|
||||
|
||||
async execute(graph: Graph, settings: Record<string, unknown>) {
|
||||
this.perf?.addPoint('runtime');
|
||||
private writeToMemory(v: number | number[] | Int32Array) {
|
||||
let length = 1;
|
||||
const view = new Int32Array(this.memory.buffer);
|
||||
if (typeof v === 'number') {
|
||||
view[this.offset] = v;
|
||||
length = 1;
|
||||
} else {
|
||||
view.set(v, this.offset);
|
||||
length = v.length;
|
||||
}
|
||||
|
||||
let a = performance.now();
|
||||
const start = this.offset;
|
||||
const end = this.offset + length;
|
||||
|
||||
this.offset += length;
|
||||
|
||||
return {
|
||||
start,
|
||||
end
|
||||
};
|
||||
}
|
||||
|
||||
async execute(graph: Graph, _settings: Record<string, unknown>) {
|
||||
this.offset = 0;
|
||||
|
||||
// Then we add some metadata to the graph
|
||||
const [outputNode, nodes] = await this.addMetaData(graph);
|
||||
let b = performance.now();
|
||||
|
||||
this.perf?.addPoint('collect-metadata', b - a);
|
||||
|
||||
/*
|
||||
* Here we sort the nodes into buckets, which we then execute one by one
|
||||
@@ -173,113 +211,81 @@ export class MemoryRuntimeExecutor implements RuntimeExecutor {
|
||||
);
|
||||
|
||||
// here we store the intermediate results of the nodes
|
||||
this.results = {};
|
||||
|
||||
if (settings['randomSeed']) {
|
||||
this.seed = Math.floor(Math.random() * 100000000);
|
||||
}
|
||||
const results: Record<number, Pointer> = {};
|
||||
|
||||
for (const node of sortedNodes) {
|
||||
const node_type = this.definitionMap.get(node.type)!;
|
||||
const node_type = this.nodes.get(node.type)!;
|
||||
|
||||
console.log('EXECUTING NODE', node_type.definition.id);
|
||||
console.log(node_type.definition.inputs);
|
||||
const inputs = Object.entries(node_type.definition.inputs || {}).map(
|
||||
([key, input]) => {
|
||||
// We should probably initially write this to memory
|
||||
if (input.type === 'seed') {
|
||||
return this.writeToMemory(this.seed);
|
||||
}
|
||||
|
||||
// We should probably initially write this to memory
|
||||
// If the input is linked to a setting, we use that value
|
||||
// if (input.setting) {
|
||||
// return getValue(input, settings[input.setting]);
|
||||
// }
|
||||
|
||||
// check if the input is connected to another node
|
||||
const inputNode = node.state.inputNodes[key];
|
||||
if (inputNode) {
|
||||
if (results[inputNode.id] === undefined) {
|
||||
throw new Error(
|
||||
`Node ${node.type} is missing input from node ${inputNode.type}`
|
||||
);
|
||||
}
|
||||
return results[inputNode.id];
|
||||
}
|
||||
|
||||
// If the value is stored in the node itself, we use that value
|
||||
if (node.props?.[key] !== undefined) {
|
||||
return this.writeToMemory(getValue(input, node.props[key]));
|
||||
}
|
||||
|
||||
return this.writeToMemory(getValue(input));
|
||||
}
|
||||
);
|
||||
|
||||
if (!node_type || !node.state || !node_type.execute) {
|
||||
log.warn(`Node ${node.id} has no definition`);
|
||||
continue;
|
||||
}
|
||||
|
||||
a = performance.now();
|
||||
|
||||
// Collect the inputs for the node
|
||||
const inputs = Object.entries(node_type.inputs || {}).map(
|
||||
([key, input]) => {
|
||||
if (input.type === 'seed') {
|
||||
return this.seed;
|
||||
}
|
||||
|
||||
// If the input is linked to a setting, we use that value
|
||||
if (input.setting) {
|
||||
return getValue(input, settings[input.setting]);
|
||||
}
|
||||
|
||||
// check if the input is connected to another node
|
||||
const inputNode = node.state.inputNodes[key];
|
||||
if (inputNode) {
|
||||
if (this.results[inputNode.id] === undefined) {
|
||||
throw new Error(
|
||||
`Node ${node.type} is missing input from node ${inputNode.type}`
|
||||
);
|
||||
}
|
||||
return this.results[inputNode.id];
|
||||
}
|
||||
|
||||
// If the value is stored in the node itself, we use that value
|
||||
if (node.props?.[key] !== undefined) {
|
||||
return getValue(input, node.props[key]);
|
||||
}
|
||||
|
||||
return getValue(input);
|
||||
}
|
||||
);
|
||||
b = performance.now();
|
||||
|
||||
this.perf?.addPoint('collected-inputs', b - a);
|
||||
const args = inputs.map(s => [s.start, s.end]).flat();
|
||||
console.log('ARGS', args);
|
||||
|
||||
try {
|
||||
a = performance.now();
|
||||
const encoded_inputs = concatEncodedArrays(inputs);
|
||||
b = performance.now();
|
||||
this.perf?.addPoint('encoded-inputs', b - a);
|
||||
|
||||
a = performance.now();
|
||||
let inputHash = `node-${node.id}-${fastHashArrayBuffer(encoded_inputs)}`;
|
||||
b = performance.now();
|
||||
this.perf?.addPoint('hash-inputs', b - a);
|
||||
|
||||
let cachedValue = this.cache?.get(inputHash);
|
||||
if (cachedValue !== undefined) {
|
||||
log.log(`Using cached value for ${node_type.id || node.id}`);
|
||||
this.perf?.addPoint('cache-hit', 1);
|
||||
this.results[node.id] = cachedValue as Int32Array;
|
||||
continue;
|
||||
}
|
||||
this.perf?.addPoint('cache-hit', 0);
|
||||
|
||||
log.group(`executing ${node_type.id}-${node.id}`);
|
||||
log.log(`Inputs:`, inputs);
|
||||
a = performance.now();
|
||||
this.results[node.id] = node_type.execute(encoded_inputs);
|
||||
log.log('Executed', node.type, node.id);
|
||||
b = performance.now();
|
||||
|
||||
if (this.cache && node.id !== outputNode.id) {
|
||||
this.cache.set(inputHash, this.results[node.id]);
|
||||
}
|
||||
|
||||
this.perf?.addPoint('node/' + node_type.id, b - a);
|
||||
log.log('Result:', this.results[node.id]);
|
||||
log.groupEnd();
|
||||
const bytesWritten = node_type.execute(this.offset, args);
|
||||
results[node.id] = {
|
||||
start: this.offset,
|
||||
end: this.offset + bytesWritten
|
||||
};
|
||||
this.offset += bytesWritten;
|
||||
console.log('FINISHED EXECUTION', {
|
||||
bytesWritten,
|
||||
offset: this.offset
|
||||
});
|
||||
} catch (e) {
|
||||
log.groupEnd();
|
||||
log.error(`Error executing node ${node_type.id || node.id}`, e);
|
||||
console.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
// return the result of the parent of the output node
|
||||
const res = this.results[outputNode.id];
|
||||
const mem = new Int32Array(this.memory.buffer);
|
||||
console.log('OUT', mem.slice(0, 10));
|
||||
|
||||
if (this.cache) {
|
||||
this.cache.size = sortedNodes.length * 2;
|
||||
}
|
||||
// return the result of the parent of the output node
|
||||
const res = results[outputNode.id];
|
||||
|
||||
this.perf?.endPoint('runtime');
|
||||
|
||||
return res as unknown as Int32Array;
|
||||
}
|
||||
|
||||
getIntermediateResults() {
|
||||
return this.results;
|
||||
}
|
||||
|
||||
getPerformanceData() {
|
||||
return this.perf?.get();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user