gx
chenyc
2025-02-12 ea42ff3ebee1eeb3fb29423aa848a249441db81c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
 * FaceAPI Demo for NodeJS
 * - Starts multiple worker processes and uses them as worker pool to process all input images
 * - Images are enumerated in main process and sent for processing to worker processes via ipc
 */
 
const fs = require('fs');
const path = require('path');
const log = require('@vladmandic/pilogger'); // this is my simple logger with few extra features
const child_process = require('child_process');
// note that main process does not need to import faceapi or tfjs at all as processing is done in a worker process
 
const imgPathRoot = './demo'; // modify to include your sample images
const numWorkers = 4; // how many workers will be started
const workers = []; // this holds worker processes
const images = []; // this holds queue of enumerated images
const t = []; // timers
let numImages;
 
// trigered by main when worker sends ready message
// if image pool is empty, signal worker to exit otherwise dispatch image to worker and remove image from queue
async function detect(worker) {
  if (!t[2]) t[2] = process.hrtime.bigint(); // first time do a timestamp so we can measure initial latency
  if (images.length === numImages) worker.send({ test: true }); // for first image in queue just measure latency
  if (images.length === 0) worker.send({ exit: true }); // nothing left in queue
  else {
    log.state('Main: dispatching to worker:', worker.pid);
    worker.send({ image: images[0] });
    images.shift();
  }
}
 
// loop that waits for all workers to complete
function waitCompletion() {
  const activeWorkers = workers.reduce((any, worker) => (any += worker.connected ? 1 : 0), 0);
  if (activeWorkers > 0) setImmediate(() => waitCompletion());
  else {
    t[1] = process.hrtime.bigint();
    log.info('Processed:', numImages, 'images in', 'total:', Math.trunc(Number(t[1] - t[0]) / 1000000), 'ms', 'working:', Math.trunc(Number(t[1] - t[2]) / 1000000), 'ms', 'average:', Math.trunc(Number(t[1] - t[2]) / numImages / 1000000), 'ms');
  }
}
 
function measureLatency() {
  t[3] = process.hrtime.bigint();
  const latencyInitialization = Math.trunc(Number(t[2] - t[0]) / 1000 / 1000);
  const latencyRoundTrip = Math.trunc(Number(t[3] - t[2]) / 1000 / 1000);
  log.info('Latency: worker initializtion: ', latencyInitialization, 'message round trip:', latencyRoundTrip);
}
 
async function main() {
  log.header();
  log.info('FaceAPI multi-process test');
 
  // enumerate all images into queue
  const dir = fs.readdirSync(imgPathRoot);
  for (const imgFile of dir) {
    if (imgFile.toLocaleLowerCase().endsWith('.jpg')) images.push(path.join(imgPathRoot, imgFile));
  }
  numImages = images.length;
 
  t[0] = process.hrtime.bigint();
  // manage worker processes
  for (let i = 0; i < numWorkers; i++) {
    // create worker process
    workers[i] = await child_process.fork('demo/node-multiprocess-worker.js', ['special']);
    // parse message that worker process sends back to main
    // if message is ready, dispatch next image in queue
    // if message is processing result, just print how many faces were detected
    // otherwise it's an unknown message
    workers[i].on('message', (msg) => {
      if (msg.ready) detect(workers[i]);
      else if (msg.image) log.data('Main: worker finished:', workers[i].pid, 'detected faces:', msg.detected.length);
      else if (msg.test) measureLatency();
      else log.data('Main: worker message:', workers[i].pid, msg);
    });
    // just log when worker exits
    workers[i].on('exit', (msg) => log.state('Main: worker exit:', workers[i].pid, msg));
    // just log which worker was started
    log.state('Main: started worker:', workers[i].pid);
  }
 
  // wait for all workers to complete
  waitCompletion();
}
 
main();