Identity Gateway - Handling blocking operations

With recent Identity Gateway releases, scriptable filters must adopt the asynchronous programming pattern. Not doing so will hurt server performance. This has already been discussed at length in Identity Gateway 7.0: Async Programming 101 and Identity Gateway 7.1 Highway to Async Programming.

There are, though, some services that do not offer an asynchronous API, and sometimes lengthy processing is required. First of all, if the processing is really long, it likely should not be handled by Identity Gateway at all. On the other hand, the processing may require access to an external relational database, and JDBC does not natively offer an asynchronous interface. This article shows how a lengthy or blocking operation can be handled asynchronously in a scriptable filter.

The executor service

Identity Gateway exposes the ScheduledExecutorService, which can run tasks on a schedule or immediately. A task runs on one of the threads in the thread pool, where the number of threads is specified by the corePoolSize parameter. Let’s look at the configuration, and at how it can be used in a ScriptableFilter to handle blocking operations.

Configuring the ScheduledExecutorService

The service is described in the ScheduledExecutorService reference documentation. Here is an example configuration, in the heap defined in config.json:

"heap": [
        {
            "name": "ScheduledExecutorService-1",
            "type": "ScheduledExecutorService",
            "config": {
              "corePoolSize": 12
            }
        },
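To get a feel for what corePoolSize means in practice, here is a minimal standalone Groovy sketch. It does not use Identity Gateway at all: a plain JDK scheduled thread pool stands in for the heap object, and the pool size of 2, the four tasks, and the 200 ms sleep are arbitrary values chosen for illustration. It suggests that no more than corePoolSize blocking tasks run at the same time, and that the others wait in the queue.

```groovy
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Plain JDK stand-in for the heap object, with a deliberately small pool.
def executor = Executors.newScheduledThreadPool(2)

def running = new AtomicInteger()        // tasks executing right now
def maxConcurrent = new AtomicInteger()  // highest value observed in 'running'
def done = new CountDownLatch(4)

4.times {
    executor.execute {
        int now = running.incrementAndGet()
        maxConcurrent.updateAndGet { current -> Math.max(current, now) }
        Thread.sleep(200)                // simulated blocking call
        running.decrementAndGet()
        done.countDown()
    }
}

done.await()                             // wait for all four tasks
executor.shutdown()
println "max concurrent blocking tasks: ${maxConcurrent.get()}"
```

When sizing the pool, the number of blocking calls expected in flight at once is probably a more useful guide than the number of processor cores, since the pool threads spend most of their time waiting.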

Using the ScheduledExecutorService in a scriptable filter

Let’s configure a ScriptableFilter, and pass the executor service into the script bindings:

{
  "name": "101-test",
  "condition": "${find(request.uri.path, '^/api/.+/?')}",
  "handler": {
      "type" : "Chain",
      "config" : {
        "filters" : [
          {
            "type": "ScriptableFilter",
            "config": {
              "type": "application/x-groovy",
              "file": "blockingTest.groovy",
              "args": {
                "executor" : "${heap['ScheduledExecutorService-1']}"
              }
            }
          }
        ],
        "handler" : "ReverseProxyHandler"
      }
  }
}

A reference to a heap object is obtained with the construct ${heap['name']}. Here the service is configured with the name ScheduledExecutorService-1, so it can be accessed with ${heap['ScheduledExecutorService-1']}. The configuration above makes the object available to the script through the executor variable binding.

In the script, let’s then wrap the blocking call in a runnable passed to the executor, and use a PromiseImpl to manage the asynchronous call, like this:

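import org.forgerock.util.promise.PromiseImpl

def runBlockingCall() {
    def promise = PromiseImpl.create()
    // Runs on an executor pool thread, not on the calling thread
    executor.execute {
        // Blocking operation
        ...
        // Complete the promise with a result
        promise.handleResult(someValue)
    }
    // Returned immediately, before the blocking operation finishes
    return promise
}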

First, this instantiates a PromiseImpl, then invokes the executor with a runnable (here a Groovy closure). The task is scheduled on another thread and the call returns immediately. The promise is then returned by the Groovy method. When the blocking call completes, the result of the operation is handed to the promise. So a typical usage of the promise is:

return runBlockingCall().then { value ->
    // do something with value
}.thenAsync {
    return next.handle(context, request)
}
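One thing the pattern above leaves open is what happens when the blocking operation throws: if nothing completes the promise, the request is left waiting. The following standalone Groovy sketch illustrates one way to handle that. It is not Identity Gateway code: java.util.concurrent.CompletableFuture stands in for PromiseImpl, a plain JDK pool stands in for the executor binding, and the runBlockingCall helper and the error message are hypothetical. In a real script, the equivalent of completeExceptionally would presumably be one of the exception-handling methods of PromiseImpl, such as handleException.

```groovy
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors

// Plain JDK stand-in for the 'executor' binding.
def executor = Executors.newScheduledThreadPool(2)

// Hypothetical helper: runs 'blockingOperation' on the executor and always
// completes the future, with either the result or the failure.
def runBlockingCall = { Closure blockingOperation ->
    def future = new CompletableFuture()
    executor.execute {
        try {
            future.complete(blockingOperation())
        } catch (Exception e) {
            future.completeExceptionally(e)
        }
    }
    return future
}

// A blocking operation that succeeds, and one that fails.
def ok = runBlockingCall({ Thread.sleep(50); 'some value' })
def failed = runBlockingCall({ throw new IllegalStateException('database unavailable') })

def okValue = ok.get()
def failureMessage = null
try {
    failed.get()
} catch (ExecutionException e) {
    // The original exception is carried as the cause.
    failureMessage = e.cause.message
}
executor.shutdown()
println "result: ${okValue}, failure: ${failureMessage}"
```

Completing the future in both branches means the caller always gets an outcome, so a failing lookup can be turned into an error response instead of a request that never returns.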

So now let’s see what happens behind the scenes. Identity Gateway handles incoming requests on event loop threads, which are provided by Vert.x. By default, Identity Gateway creates twice as many event loop threads as there are processor cores. So on a 4-core host, 8 event loop threads are created.

With the route configuration shown above, the script runs on one of the event loop threads. Blocking that thread stalls every request assigned to it, and if enough requests block at once (8 simultaneous calls to that route on the 4-core host above, for example), the entire Identity Gateway server stops responding.

The interesting part is that the blocking call runs on an executor thread. Because the promise is completed on that thread, all subsequent processing stays on it, including the next filters in the chain, until the next I/O is performed, in this case by the ReverseProxyHandler. Execution then switches to yet another event loop thread.
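The thread hand-off described above can be reproduced outside Identity Gateway with a small standalone Groovy sketch. This is an analogy only: CompletableFuture stands in for the ForgeRock promise, and the thread name blocking-pool-1 is made up for the example. A latch makes sure the callback is attached before the future is completed, so the callback runs on the thread that completes the future and not on the thread that registered it.

```groovy
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors

// Single pool thread with a recognizable (hypothetical) name.
def executor = Executors.newScheduledThreadPool(1) { r -> new Thread(r, 'blocking-pool-1') }

def callerThread = Thread.currentThread().name
def callbackRegistered = new CountDownLatch(1)
def future = new CompletableFuture()

executor.execute {
    callbackRegistered.await()   // wait until the callback below is attached
    future.complete('payload')   // completion happens on the pool thread
}

// The callback is attached from the calling thread...
def continuation = future.thenApply { value -> Thread.currentThread().name }
callbackRegistered.countDown()

// ...but it runs on the thread that completed the future.
def continuationThread = continuation.get()
executor.shutdown()
println "registered on ${callerThread}, ran on ${continuationThread}"
```

This is the same effect the route logs show later in the article: once the result is handed over on a pool thread, the code chained after it keeps running there.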

Demonstration

To demonstrate this use case, we’re going to use response.entity#json as the blocking call. By default, streaming is not enabled, so the response is made available only once the entire payload has been received. When streaming is enabled, the response is made available as soon as the response headers have been received, before the payload has been completely received. Calling #json then blocks the thread until the payload arrives, so the script needs to call #jsonAsync instead:

.thenAsync { response ->
    return response.entity.jsonAsync.then { json ->
        // Process the payload
    }
}

To enable streaming, set streamingEnabled in admin.json:

{
    "mode": "DEVELOPMENT",
    "streamingEnabled": true
}

Also, the route for this test is (notice the second scriptable filter in the chain):

{
  "name": "105-test",
  "condition": "${find(request.uri.path, '^/api/.+/?')}",
  "baseURI": "https://dog.ceo",
  "handler": {
      "type" : "Chain",
      "config" : {
        "filters" : [
          {
            "type": "ScriptableFilter",
            "config": {
              "type": "application/x-groovy",
              "file": "getMemes.groovy",
              "args": {
                "executor" : "${heap['ScheduledExecutorService-1']}"
              }
            }
          },
          {
            "type": "ScriptableFilter",
            "config": {
              "type": "application/x-groovy",
              "file": "logThread.groovy"
            }
          }
        ],
        "handler" : "ReverseProxyHandler"
      }
  }
}

Now let’s try this code in getMemes.groovy:

import org.forgerock.util.promise.PromiseImpl

logger.info("THREAD LOG | Global Scope")

def sendRequest() {
    logger.info("THREAD LOG - getMemes  | sendRequest")
    def myRequest = new Request()
    myRequest.method = "GET"
    myRequest.uri = "https://api.imgflip.com/get_memes"
    return http.send(myRequest)
    .thenAsync { response ->
        logger.info("THREAD LOG - getMemes  | got response")
        return response.entity.json
    }
}

def handleNext(json) {
    logger.info("THREAD LOG - getMemes | handleNext")
    next.handle(context, request).thenOnResult {
        logger.info("THREAD LOG - getMemes  | thenOnResult")
    }
}

return sendRequest().thenAsync(this::handleNext)

And issue a request with cURL:

curl http://localhost:8080/api/breeds/list/all

The log shows this warning:

[vertx-blocked-thread-checker] WARN  i.v.core.impl.BlockedThreadChecker @system - Thread Thread[vert.x-eventloop-thread-15,5,main] has been blocked for 21563 ms, time limit is 2000 ms
[CONTINUED]io.vertx.core.VertxException: Thread blocked
[CONTINUED]	at java.base@11.0.9.1/java.lang.Object.wait(Native Method)

This is not good, but it was expected: response.entity#json is blocking! The route log shows that processing stalls right after the response is received (nothing is logged after that):

2023-07-24T00:22:09,981Z | INFO  | vert.x-eventloop-thread-15 | o.f.o.f.S.{ScriptableFilter}/handler/config/filters/0 | @105-test | THREAD LOG | Global Scope
2023-07-24T00:22:09,988Z | INFO  | vert.x-eventloop-thread-15 | o.f.o.f.S.{ScriptableFilter}/handler/config/filters/0 | @105-test | THREAD LOG - getMemes  | sendRequest
2023-07-24T00:22:10,087Z | INFO  | vert.x-eventloop-thread-15 | o.f.o.f.S.{ScriptableFilter}/handler/config/filters/0 | @105-test | THREAD LOG - getMemes  | got response

So the best practice in this case is to use response.entity#jsonAsync. However, since the topic of this blog post is the ScheduledExecutorService, let’s wrap the #json call in an executor Runnable (this is for demonstration only, don’t do this!):

def sendRequest() {
    logger.info("THREAD LOG - getMemes  | sendRequest")
    def myRequest = new Request()
    myRequest.method = "GET"
    myRequest.uri = "https://api.imgflip.com/get_memes"
    return http.send(myRequest)
    .thenAsync { response ->
        logger.info("THREAD LOG - getMemes  | got response")
        def promise = PromiseImpl.create()
        executor.execute {
            logger.info("THREAD LOG - getMemes | In Executor Thread")
            def json = response.entity.json
            promise.handleResult(json)
        }
        logger.info("THREAD LOG - getMemes | After Executor#execute")
        return promise
    }
}

Let’s see the route traces now:

2023-07-24T00:42:13,563Z | INFO  | vert.x-eventloop-thread-17 | o.f.o.f.S.{ScriptableFilter}/handler/config/filters/0 | @105-test | THREAD LOG | Global Scope
2023-07-24T00:42:13,564Z | INFO  | vert.x-eventloop-thread-17 | o.f.o.f.S.{ScriptableFilter}/handler/config/filters/0 | @105-test | THREAD LOG - getMemes  | sendRequestExecutor
2023-07-24T00:42:13,595Z | INFO  | vert.x-eventloop-thread-17 | o.f.o.f.S.{ScriptableFilter}/handler/config/filters/0 | @105-test | THREAD LOG - getMemes  | got response
2023-07-24T00:42:13,596Z | INFO  | pool-4-thread-1 | o.f.o.f.S.{ScriptableFilter}/handler/config/filters/0 | @105-test | THREAD LOG - getMemes | In Executor Thread
2023-07-24T00:42:13,596Z | INFO  | vert.x-eventloop-thread-17 | o.f.o.f.S.{ScriptableFilter}/handler/config/filters/0 | @105-test | THREAD LOG - getMemes | After Executor#execute
2023-07-24T00:42:13,602Z | INFO  | pool-4-thread-1 | o.f.o.f.S.{ScriptableFilter}/handler/config/filters/0 | @105-test | THREAD LOG - getMemes | handleNext
2023-07-24T00:42:13,603Z | INFO  | pool-4-thread-1 | o.f.o.f.S.{ScriptableFilter}/handler/config/filters/1 | @105-test | THREAD LOG - logThread | Global Scope
2023-07-24T00:42:14,029Z | INFO  | vert.x-eventloop-thread-18 | o.f.o.f.S.{ScriptableFilter}/handler/config/filters/1 | @105-test | THREAD LOG - logThread | thenOnResult
2023-07-24T00:42:14,030Z | INFO  | vert.x-eventloop-thread-18 | o.f.o.f.S.{ScriptableFilter}/handler/config/filters/0 | @105-test | THREAD LOG - getMemes  | thenOnResult

As you can see, the response handling happens on the event loop thread. The JSON processing is then handled by a pool thread (from the executor service thread pool). From then on, all processing takes place on the pool thread, even for the next filter:

2023-07-24T00:42:13,603Z | INFO  | pool-4-thread-1 | o.f.o.f.S.{ScriptableFilter}/handler/config/filters/1 | @105-test | THREAD LOG - logThread | Global Scope

The code for the second scriptable filter is:

logger.info("THREAD LOG - logThread | Global Scope")

return next.handle(context, request).thenOnResult { response ->
    logger.info("THREAD LOG - logThread | thenOnResult")
}

At the next I/O (so once the handler has been executed), the processing switches back to an event loop thread (here a different one, thread 18 rather than thread 17):

2023-07-24T00:42:14,029Z | INFO  | vert.x-eventloop-thread-18 | o.f.o.f.S.{ScriptableFilter}/handler/config/filters/1 | @105-test | THREAD LOG - logThread | thenOnResult
2023-07-24T00:42:14,030Z | INFO  | vert.x-eventloop-thread-18 | o.f.o.f.S.{ScriptableFilter}/handler/config/filters/0 | @105-test | THREAD LOG - getMemes  | thenOnResult

Conclusion

Identity Gateway already offloads some intensive or blocking tasks to background threads. We have shown that the same approach is available to scriptable filters, where it is most likely to be useful for interacting with a relational database via JDBC, for example.
