Applying Golang concepts in Node.js

Posted by Dan Octavian on Monday, March 25, 2019

Yeah, I think it’s… for a particular class of application, which is like, if you’re building a server, I can’t imagine using anything other than Go. That said, I think Node’s non-blocking paradigm worked out well for JavaScript, where you don’t have threads. And I think that a lot of the problems with kind of the call-back soup problem, where you have to jump into many anonymous functions to complete what you’re doing has been alleviated these days, with the async keyword, the async feature that’s in Javascript now. - Ryan Dahl interview

As recognized by Node's creator himself, the Golang paradigm of so-called goroutines that communicate through channels is deemed superior for high-performance servers that handle a very large number of concurrent connections.

This article proposes thinking in terms of coroutines when building Node.js applications, and potentially using inter-coroutine communication with constructs inspired by those found in Golang, Kotlin and Haskell (e.g. channels), to build code that is easier to understand and debug. Full supporting code for this article is provided on github.

We don’t need any special library or framework. Assuming we structure our code using async/await syntax, all we need is to conceptualize each async function call as launching a new coroutine, as such:

full code here

const axios = require('axios')
;(async () => {
  const r = await axios.get('https://www.etherchain.org/api/gasPriceOracle')
  console.log(`root coroutine Oracle answer: ${r.data.standard}`)
  
  ;(async () => {
    try {
      console.log('Running coroutine 0..')
      const r = await axios.get('https://www.etherchain.org/api/gasPriceOracle')
      console.log(`coroutine 0 Oracle answer: ${r.data.standard}`)
    } catch (e) {
      console.log(`coroutine 0 failed with ${e.stack}`)
    }
  })()
  
  ;(async () => {
    try {
      console.log('Running coroutine 1..')
      const r = await axios.get('https://www.etherchain.org/api/gasPriceOracle')
      console.log(`coroutine 1 Oracle answer: ${r.data.standard}`)
    } catch (e) {
      console.log(`coroutine 1 failed with ${e.stack}`)
    }
  })()
})().catch(e => console.log(`root coroutine failed with ${e.stack}`))

We recognize that calling an async function simply returns a promise. The async/await syntax, however, lets us easily represent in code a set of sequential instructions, be they synchronous or asynchronous, and we can conceptualize that sequence as one "thread" of execution, which for the purpose of this article we will refer to as a coroutine.

The case for structuring your code using coroutines was made by TJ Holowaychuk before the creation of async/await, in an excellent article accompanied by the co library for coroutines, built on generators.

I find that reasoning about and structuring Node.js code in terms of coroutines at a high level, instead of thinking in terms of promises and callbacks, yields better quality code. Therefore, we're going to take a look at how we can build on async/await and "borrow" ideas from Golang to achieve better readability, better code structure, and easier debugging in production environments with continuation logging, which we will analyze in the second part of this article.

Example use case: max N concurrent actions

Suppose you have a queue of crypto assets to be scanned for the latest USD prices, and that the API you are using allows at most N concurrent requests (we haven't checked the actual limits of the API used here; this supposition is for argument's sake).

Assume the queue is filled at an arbitrary rate by the caller of addToQueue, which is expected to be non-blocking (e.g. it is called as part of an HTTP request handler that wants to return its response as quickly as possible).

full code here

const MAX_CONCURRENT_REQUESTS = 3

async function fetchPrices(from, to) {
  const response = await axios.get(`https://min-api.cryptocompare.com/data/price?fsym=${from}&tsyms=${to}`)
  return response.data
}

const queue = []

function addToQueue(items) {
  queue.push(...items)
}

;(async () => {

  while (true) {
    const toBeScanned = []
    // take up to MAX_CONCURRENT_REQUESTS items off the front of the queue;
    // shift() keeps FIFO order and never drops an item once the batch is full
    while (queue.length > 0 && toBeScanned.length < MAX_CONCURRENT_REQUESTS) {
      toBeScanned.push(queue.shift())
    }
    if (toBeScanned.length > 0) {
      await Promise.all(toBeScanned.map(async (target) => {
        const prices = await fetchPrices(target.from, target.to)
        console.log(`1 ${target.from} = ${JSON.stringify(prices)}`)
      }))
    } else {
      // yield to allow the queue to be filled
      await new Promise(resolve => setImmediate(resolve))
    }
  }
})().catch(e => {
  console.log(`FATAL: ${e.stack}`)
  process.exit(1)
})

This solution ensures that no more than MAX_CONCURRENT_REQUESTS requests are in flight at any given time, which satisfies the API restriction.

However, there are a few caveats to this solution. One is that it busy-loops, checking the queue continuously. A sleep can be added to mitigate that somewhat, but it slows down processing by introducing artificial delay.
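Such a sleep can be written as a one-line promise helper (a common idiom; the `sleep` name is our own, not part of the article's repo):

```javascript
// Promise-based sleep: resolves after roughly `ms` milliseconds.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

;(async () => {
  const start = Date.now()
  await sleep(50)
  console.log(`slept for ~${Date.now() - start}ms`)
})()
```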

The other issue is that, because of Promise.all, the loop waits for every request in a batch to finish before picking up the next batch, so each batch takes as long as its slowest request. This can be mitigated somewhat with a timeout, but the queue processing remains sub-optimal.
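To see the batching cost concretely, here is a self-contained sketch (using timers instead of HTTP calls) where a batch of three takes as long as its slowest member:

```javascript
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

;(async () => {
  const start = Date.now()
  // Three "requests": two fast, one slow. Promise.all resolves only when
  // the slowest (300ms) one does, so the whole batch takes ~300ms even
  // though two of its members finished long before.
  await Promise.all([sleep(50), sleep(100), sleep(300)])
  console.log(`batch finished after ~${Date.now() - start}ms`)
})()
```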

To address these issues we can conceptualize the problem, at a high level, as a producer-consumer problem. One or more producers submit items to be scanned to the queue. Since the API allows MAX_CONCURRENT_REQUESTS concurrent requests, we can define N = MAX_CONCURRENT_REQUESTS consumers that consume items from the queue, each processing one item at a time.

This addresses the max-concurrent-requests restriction. Now, to address the busy waiting, let's assume that the queue has an async read() method that blocks until items become available, and a write() method that writes items into the queue. Let's call this type a BufferedAsyncChan. Here is a TypeScript interface for it, for clarity's sake only.

interface IBufferedAsyncChan<T> {
	write(v: T): void
	read(): Promise<T>
}

This is how the code would look if we replace the queue array with an instance of BufferedAsyncChan.

full code here


const bufferedAsyncChan = new BufferedAsyncChan()

function addToQueue(items) {
  for (let i = 0; i < items.length; i++) {
    bufferedAsyncChan.write(items[i])
  }
}

;(async () => {
  for (let i = 0; i < MAX_CONCURRENT_REQUESTS; i++) {

    // launch coroutine
    ;(async () => {
      while (true) {
        try {
          const target = await bufferedAsyncChan.read()
          const prices = await fetchPrices(target.from, target.to)
          console.log(`1 ${target.from} = ${JSON.stringify(prices)}`)
        } catch (e) {
          console.log(`Failed to process value with ${e.stack}`)
        }
      }
    })()
  }
})().catch(e => {
  console.log(`FATAL: ${e.stack}`)
  process.exit(1)
})

The other added bonus of this strategy is that each coroutine can implement its own error-handling and retry strategy around fetchPrices without impacting the other consumer coroutines.

This channel is similar to a buffered Golang channel, where the consumer blocks on read until items are available. (Note that the write here is non-blocking: for simplicity we did not put an upper bound on the buffer, whereas a Golang buffered channel's write blocks once the buffer reaches capacity.)

How would you go about implementing BufferedAsyncChan? Here is one possible implementation:

full code here

class BufferedAsyncChan {
  // private queue: Promise<T>[]
  // private resolverQueue: ((v: T) => void)[]
  // private accumulatedValues: T[]
  constructor() {
    this.queue = [
      new Promise(resolve => {
        this.resolverQueue = [resolve]
      })
    ];
    this.accumulatedValues = []
  }

  async read() {
    if (this.accumulatedValues.length > 0) {
      const item = this.accumulatedValues.shift()
      return item
    } else {
      const queueHead = this.queue.shift()

      this.queue.push(
        new Promise(resolve => {
          this.resolverQueue.push(resolve)
        })
      )
      const v = await queueHead
      return v
    }
  }

  write(v) {
    if (this.resolverQueue.length > 1) {
      const resolveHead = this.resolverQueue.shift()
      resolveHead(v)
    } else {
      this.accumulatedValues.push(v)
    }
  }
}

It builds up an internal queue of Promises that are resolved as items are fed to the channel through write(). When a Promise resolves, it unblocks the coroutine that was blocked on the corresponding read() call. If no consumers are blocked waiting for items, the values written simply accumulate in an array.
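To get closer to Go's buffered channel semantics, where write also blocks once the buffer is full, write() can be made async and await a free slot. Here is a minimal sketch of that idea; the class, its capacity field and the writeWaiters queue are our own illustrative names, not part of the article's implementation:

```javascript
class BoundedAsyncChan {
  constructor(capacity) {
    this.capacity = capacity
    this.buffer = []       // values written but not yet consumed
    this.readWaiters = []  // resolvers of read() calls parked on an empty channel
    this.writeWaiters = [] // {v, resolve} of write() calls parked on a full buffer
  }

  // Resolves immediately if a reader is waiting or the buffer has room;
  // otherwise blocks (asynchronously) until a reader frees a slot.
  async write(v) {
    if (this.readWaiters.length > 0) {
      this.readWaiters.shift()(v) // hand the value straight to a waiting reader
      return
    }
    if (this.buffer.length < this.capacity) {
      this.buffer.push(v)
      return
    }
    await new Promise(resolve => this.writeWaiters.push({ v, resolve }))
  }

  async read() {
    if (this.buffer.length > 0) {
      const v = this.buffer.shift()
      if (this.writeWaiters.length > 0) {
        // a slot opened up: admit one parked writer
        const w = this.writeWaiters.shift()
        this.buffer.push(w.v)
        w.resolve()
      }
      return v
    }
    // nothing buffered: park this reader until a write arrives
    return new Promise(resolve => this.readWaiters.push(resolve))
  }
}

;(async () => {
  const ch = new BoundedAsyncChan(1)
  await ch.write('a')          // fits in the buffer
  const w = ch.write('b')      // buffer full: this write parks
  console.log(await ch.read()) // 'a'; frees the slot, admitting 'b'
  await w
  console.log(await ch.read()) // 'b'
})()
```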

On error handling

Note how the try/catch block is used here to keep the loop going in case of failures inside the coroutine (failed values are not retried, for simplicity). If we let exceptions bubble up to the top, they cause an UnhandledPromiseRejectionWarning, and that behavior is deprecated: in future Node.js versions, any exception left uncaught inside a coroutine will crash the Node.js process!

Therefore, we must make sure that all errors are caught, either with a try/catch block or with a final .catch() on the promise returned by the async function call.
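One way to make this rule hard to forget is a tiny helper that starts a coroutine and always attaches a .catch(). The launch() name is hypothetical, our own convention rather than an established API:

```javascript
// Hypothetical helper: runs an async function as a "coroutine" and
// guarantees a .catch() is attached, so no rejection goes unhandled.
function launch(name, fn) {
  return fn().catch(e => console.log(`coroutine ${name} failed with ${e.stack}`))
}

launch('worker-0', async () => {
  throw new Error('boom') // logged by the catch, does not crash the process
})
```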

Important differences between Golang and Node.js

We took Go as an inspiration, but let's examine some critical differences between the Node.js coroutines we just described and Golang's goroutines.

It must be noted that the Golang runtime differs from Node.js's in that it has a user-space M:N scheduler that makes efficient use of multiple CPU cores: it distributes M goroutines across N OS threads. Goroutines are preemptively scheduled, meaning the scheduler automatically ensures each gets a fair share of the CPU (go here for a good summary of that).

A Node.js process uses one OS thread to run all events in its event loop, so by itself it cannot take advantage of multiple processor cores. Furthermore, our Node.js coroutines are cooperative, not preemptive: there is no scheduler to interrupt code execution, and a routine effectively yields to other routines only when it calls await or when it terminates.

Note how in the first example we used a forced "yield" to prevent the infinite loop from blocking the Node.js process, implemented as:

      await new Promise(resolve => setImmediate(resolve))

Thus the problem of overly long CPU-bound tasks starving other events of handling is, of course, still present.
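For illustration, a long CPU-bound loop can be broken into slices that yield back to the event loop via the same setImmediate trick, so timers and I/O callbacks are not starved for the whole run. This is a standard mitigation rather than a cure, and the helper names here are our own:

```javascript
const yieldToLoop = () => new Promise(resolve => setImmediate(resolve))

// Sums 0..n-1 in slices of `chunk` iterations, yielding to the event
// loop between slices so other events can be handled mid-computation.
async function chunkedSum(n, chunk) {
  let total = 0
  for (let i = 0; i < n; i++) {
    total += i
    if (i % chunk === chunk - 1) await yieldToLoop()
  }
  return total
}

chunkedSum(1000000, 10000).then(total => console.log(`total: ${total}`))
```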

Although at first glance the underlying single-threaded implementation of Node.js looks inferior, because it does not automatically utilize all cores and lacks preemption (see the funny piece by Ted Dziuba), one can argue that in terms of debugging and code simplicity a Node.js process is easier to deal with.

For one, because of the lack of preemption you can safely assume that a code section that performs no await will execute without being interrupted. So when you have to mutate a set of shared mutable variables, you do not need to concern yourself with locking around a critical code section to prevent corrupting the shared mutable structures (an array or linked list, for example).
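The flip side is that any await inside such a section reopens the door to interleaving. In this sketch (with hypothetical names of our own), a read-modify-write on a shared counter loses an update as soon as an await sits between the read and the write:

```javascript
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

let counter = 0

// Unsafe: the await between reading and writing `counter` lets the
// other coroutine interleave, so one of the two increments is lost.
async function unsafeIncrement() {
  const current = counter // both coroutines read 0 here
  await sleep(10)         // yields: the other coroutine runs its read
  counter = current + 1   // both write 1
}

;(async () => {
  await Promise.all([unsafeIncrement(), unsafeIncrement()])
  console.log(`counter = ${counter}`) // 1, not 2: an update was lost
})()
```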