
Ways to use promises - Part 2

May 19, 2023 · 6 min read

In the first part, I covered some JS utilities that integrate naturally with promises. In this part, I will focus on more advanced patterns and use cases that can involve promises:

  1. Promises pool
  2. Worker threads
  3. Observer pattern

Promises pool

Promises pool illustration

Sometimes you have to perform a very large or infinite number of tasks.

  1. The naive solution is to handle them one by one, sequentially. But that's a poor use of resources if the tasks can be processed asynchronously (e.g., I/O operations).
  2. On the other hand, queuing thousands of events in the event loop and flooding the I/O layer is not a good idea either.

A promises pool is a good middle ground: it runs a fixed number of tasks concurrently, which balances resource utilization against over-concurrency.

Possible use-cases:

  • Iterate files recursively.
  • Handle messages from a queue or stream.
  • Parallel network calls.

Let's see how it works with a dummy example:

import { setTimeout } from "timers/promises";

async function runPromisesPool(tasksIterator, handler, poolSize) {
  return Promise.all(
    Array(poolSize)
      .fill(undefined)
      .map(async (_, index) => {
        for await (const task of tasksIterator) {
          await handler(task, index);
        }
      }),
  );
}

const timeoutsIterator = (function* () {
  const startTime = Date.now();
  while (true) yield setTimeout(1000, Date.now() - startTime);
})();

runPromisesPool(timeoutsIterator, console.log, 3);

The output is:

0 0
0 1
0 2
1007 0
1008 1
1008 2
2009 0
2010 1
2011 2

At first sight, it looks like some fancy hard-core JS code, but it is a template that is easy to adapt to any use case. We start poolSize promises that all iterate over the same iterator and handle its tasks.

The fixed part is runPromisesPool. You can play with its parameters to support your use case.
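To make the concurrency cap visible, here is a minimal sketch that runs the same pool over a finite list and records how many tasks were in flight at once. The fakeRequest helper, the urls list, and demoFinitePool are made-up names for illustration; only runPromisesPool comes from the example above.

```javascript
// Same pool as above, repeated so this sketch is self-contained.
async function runPromisesPool(tasksIterator, handler, poolSize) {
  return Promise.all(
    Array(poolSize)
      .fill(undefined)
      .map(async (_, index) => {
        for await (const task of tasksIterator) {
          await handler(task, index);
        }
      }),
  );
}

// Hypothetical stand-in for an I/O call such as an HTTP request.
const fakeRequest = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function demoFinitePool() {
  const urls = ["/a", "/b", "/c", "/d", "/e", "/f"]; // made-up task list
  const handled = [];
  let inFlight = 0;
  let maxInFlight = 0;

  async function handler(url, workerIndex) {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    await fakeRequest(20);
    handled.push({ url, workerIndex });
    inFlight--;
  }

  // Pass the array's iterator (not the array) so both workers share one cursor.
  await runPromisesPool(urls[Symbol.iterator](), handler, 2);
  return { handled, maxInFlight };
}

demoFinitePool().then(({ handled, maxInFlight }) =>
  console.log(handled.length, "tasks handled, max in flight:", maxInFlight),
); // 6 tasks handled, max in flight: 2
```

With a pool size of 2, all six tasks are handled, but never more than two at the same time.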

Line by line explanation

You can skip this part if you don't want a more detailed explanation.

runPromisesPool receives three parameters:

  • tasksIterator - Iterator with tasks to handle.
  • handler - Function to handle the tasks.
  • poolSize - Number of promises that will handle the tasks.

With the Promise.all utility we create a promise that waits for every promise in the pool to complete.

We initialize the pool with Array(poolSize).fill(undefined), and then we map each undefined to a promise that iterates over the iterator and handles the tasks.

We do that with the following async callback, which receives the worker's index in the pool and passes it to the handler:

async (_, index) => {
  for await (const task of tasksIterator) {
    await handler(task, index);
  }
};

The other parts can change according to your needs.

Note on Iterator and Iterable

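Make sure you pass an Iterator and not an Iterable. All workers must share a single iterator: when one worker moves to the next iteration (next() under the hood), the position advances for all workers, so each task is handled exactly once. If you pass an Iterable instead, every worker gets its own fresh iterator and handles every task again.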

For example, Array and Map are Iterables, not Iterators. To create an iterator out of an Iterable, call its Symbol.iterator method. Here is an example:

const a = [1, 2, 3];
const iterator = a[Symbol.iterator]();
console.log(iterator); // Object [Array Iterator] {}
console.log(iterator.next()); // { value: 1, done: false }
for (const x of iterator) console.log(x); // 2 3
console.log(iterator.next()); // { value: undefined, done: true }
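To see why the distinction matters for the pool, here is a small sketch that counts how many times the handler runs when the pool gets an array versus that array's iterator. The countHandledTasks and compare helpers are hypothetical names used only for this illustration.

```javascript
// Same pool as above, repeated so this sketch is self-contained.
async function runPromisesPool(tasksIterator, handler, poolSize) {
  return Promise.all(
    Array(poolSize)
      .fill(undefined)
      .map(async (_, index) => {
        for await (const task of tasksIterator) {
          await handler(task, index);
        }
      }),
  );
}

// Runs a pool of 2 workers and returns how many times the handler was called.
async function countHandledTasks(tasks) {
  let count = 0;
  await runPromisesPool(tasks, async () => { count++; }, 2);
  return count;
}

async function compare() {
  const tasks = ["a", "b", "c"];
  // Iterable: each worker's loop asks the array for a fresh iterator,
  // so both workers walk the whole array.
  const withIterable = await countHandledTasks(tasks);
  // Iterator: both workers pull from the same cursor, so each task runs once.
  const withIterator = await countHandledTasks(tasks[Symbol.iterator]());
  return { withIterable, withIterator };
}

compare().then(console.log); // { withIterable: 6, withIterator: 3 }
```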

Worker threads

Quick recap on how JavaScript works: your JavaScript code runs on a single thread. But what happens with non-blocking operations like I/O? The runtime hands the operation off to the operating system or to a background thread pool and proceeds to the next event in the loop. The event loop triggers the callback once the non-blocking operation finishes its work.

But what happens if we have blocking operations or CPU-intensive work? The subsequent events starve because the event loop thread is occupied with some long-running event.

"Blocking is when the execution of additional JavaScript in the Node.js process must wait until a non-JavaScript operation completes. This happens because the event loop is unable to continue running JavaScript while a blocking operation is occurring."

Overview of Blocking vs Non-Blocking | Node.js

This can be an issue when responsiveness is required. An HTTP API is a simple example: every request must be answered before a timeout occurs, and a blocking event will not give the others a chance to complete in time.
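The starvation is easy to measure. The following is a minimal sketch, with hypothetical blockFor and measureTimerDelay helpers, that schedules a 0 ms timer and then keeps the thread busy; the timer callback can only run after the busy loop returns.

```javascript
// Busy-wait on the main thread for roughly `ms` milliseconds
// (a stand-in for CPU-intensive work).
function blockFor(ms) {
  const end = Date.now() + ms;
  while (Date.now() < end) {
    // spin
  }
}

// Resolves with how late a 0 ms timer fired while the thread was blocked.
function measureTimerDelay(blockMs) {
  return new Promise((resolve) => {
    const scheduledAt = Date.now();
    setTimeout(() => resolve(Date.now() - scheduledAt), 0);
    blockFor(blockMs); // the timer callback cannot run until this returns
  });
}

measureTimerDelay(200).then((delay) =>
  console.log(`0 ms timer fired after ~${delay} ms`),
);
```

The reported delay is at least as long as the blocking work, even though the timer was due immediately.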

Dummy example for blocking the CPU:

new Promise((resolve) => {
  for (let i = 0; i < 20_000_000_000; i++);
  resolve();
});
new Promise((resolve) => {
  console.log("I am waiting here for too long...");
  resolve();
});

In that case, you can explicitly run the CPU-intensive task in a new thread.

"Workers (threads) are useful for performing CPU-intensive JavaScript operations. They do not help much with I/O-intensive work. The Node.js built-in asynchronous I/O operations are more efficient than Workers can be."

Worker threads | Node.js

Let's see how we do it and how it works with promises!

This is the script that will run in a new thread:

// worker-thread-child.mjs

import { workerData, parentPort } from "worker_threads";

console.log("🐯", "posting a message to parent...");
parentPort.postMessage(`Hi ${workerData}`);

This is the script that runs the new thread:

import { Worker } from "worker_threads";

console.log("🐅", "Running a new worker thread...");

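const result = await new Promise((resolve, reject) => {
  new Worker("./worker-thread-child.mjs", { workerData: "John" })
    .on("message", resolve) // resolve with the message the child posts
    .on("error", reject); // reject if the child throws
});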

console.log("🐅", `Worker thread answered "${result}"`);

The result:

🐅 Running a new worker thread...
🐯 posting a message to parent...
🐅 Worker thread answered "Hi John"

  • A new thread spawns when calling new Worker.
  • You can pass data to the new thread through the workerData option.
  • The on method registers listeners in the parent for events emitted by the thread.
  • The message listener is invoked when the child posts a message.
  • You can post a message with parentPort.postMessage.
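Putting these pieces together for actual CPU-bound work, here is a hedged sketch of a promise wrapper around a worker. The sumInWorker helper and the inline workerSource are hypothetical; the worker code is passed as a string with the eval option only so the sketch fits in one file, and in a real project it would live in its own module like worker-thread-child.mjs above.

```javascript
import { Worker } from "worker_threads";

// Worker source kept inline (run with `eval: true`) to keep the sketch in one file.
const workerSource = `
  const { parentPort, workerData } = require("worker_threads");
  let sum = 0;
  for (let i = 1; i <= workerData; i++) sum += i; // CPU-bound loop
  parentPort.postMessage(sum);
`;

// Hypothetical helper: runs the worker and settles a promise with its outcome.
function sumInWorker(upTo) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: upTo });
    worker.once("message", resolve); // result posted by the worker
    worker.once("error", reject); // uncaught exception inside the worker
    worker.once("exit", (code) => {
      // No effect if the promise already settled; covers exits without a message.
      if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`));
    });
  });
}

sumInWorker(1_000_000).then((sum) => console.log("sum:", sum));
```

Listening for error and exit as well as message means the promise does not hang forever if the worker crashes before it answers.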

Observer pattern

Observer pattern illustration

This pattern is common in frontend applications where you want some part of the code to get notified when a state changes. It decouples the state management from other logic that depends on the state updates.

It is also common to see a similar pattern in event-driven architectures with pub/sub messaging (e.g., AWS SNS, GCP Pub/Sub, Kafka). But that's another topic.

Let's see a dummy example that uses promises:

Note: this is a single-use observer. There are better ways to do that. However, I think it is interesting to see how promises are used here.

import { setTimeout } from "timers/promises";

const startTime = Date.now();

class LazyHelloSubject {
  #observers = [];
  #result;

  async #setTimer() {
    this.#result = null;
    console.log(Date.now() - startTime, "Starting the timer...");
    await setTimeout(3000);
    this.#result = "Hello";

    while (this.#observers.length > 0) {
      this.#observers.pop().resolve(this.#result);
    }
  }

  async getHello() {
    if (this.#result === undefined) await this.#setTimer();
    if (this.#result) return this.#result;
    return new Promise((resolve) => this.#observers.push({ resolve }));
  }
}

const subject = new LazyHelloSubject();

subject
  .getHello()
  .then((result) => console.log(Date.now() - startTime, result, "event 1"));
subject
  .getHello()
  .then((result) => console.log(Date.now() - startTime, result, "event 2"));
const result = await subject.getHello();
console.log(Date.now() - startTime, result, "event 3");

The result of the above:

0 Starting the timer...
3009 Hello event 3
3009 Hello event 2
3009 Hello event 1
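As the note before the example says, there are other ways to build a single-use observer. One common alternative, shown here as a sketch and not as the approach used above, is to cache the promise itself so every caller shares it. The CachedHelloSubject class, the sleep helper, the public timersStarted counter, and the shortened 50 ms delay are all assumptions made for illustration.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

class CachedHelloSubject {
  #helloPromise; // stays undefined until the first getHello() call
  timersStarted = 0; // exposed only to show that the work runs once

  async #loadHello() {
    this.timersStarted++;
    await sleep(50); // stand-in for the 3-second timer
    return "Hello";
  }

  getHello() {
    // Start the work on the first call; later callers get the same promise.
    this.#helloPromise ??= this.#loadHello();
    return this.#helloPromise;
  }
}

const subject = new CachedHelloSubject();
Promise.all([subject.getHello(), subject.getHello(), subject.getHello()]).then(
  (results) => console.log(results, "timers started:", subject.timersStarted),
);
```

Because a promise can have any number of then callbacks or awaits attached, it already acts as a list of observers, so no explicit observers array is needed in this variant.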

Conclusion

In this part, we saw more advanced examples of how to use promises in various ways. Promises are a tool for solving concurrency problems, both in performance and in code design.

I hope you feel more comfortable with promises after reading this!