Identity Gateway 7.1 Highway to Async Programming

By Patrick Diligent and Laurent Vaills

Introduction

In the previous paper, Async Programming 101, we examined the shift from synchronous to asynchronous programming and highlighted an approach for converting legacy code to this paradigm. Adopting an asynchronous programming idiom feels unnatural and brings its own challenges, but the reward is dramatically improved performance. In this post, we explore more advanced constructs that the previous post did not cover. They are commonplace, but not obvious to implement with the asynchronous design pattern.
This post also examines a new feature in 7.1, asynchronous JSON access, which improves Identity Gateway performance even further.

Prerequisites: reading Async Programming 101 is essential before proceeding further.

The use case

Identity Gateway is a powerful and versatile tool that allows ForgeRock platform functionality to integrate smoothly with an organization's ecosystem. One example is the Open Banking domain. It requires interfacing with different third parties, with Identity Management (IDM), which holds knowledge about the agreements between users and third parties within the financial institution, and with Access Management (AM), whose role is to authenticate users and to authorize third-party financial services to act on users' behalf with their consent. In the middle, Identity Gateway enriches user and application interactions with the identity platform, validating requests and consolidating information by interacting with both IDM and AM. Since most of the implementation is done with scripting, the best performance is achieved with Identity Gateway 7.1 in a standalone deployment, and therefore with asynchronous programming.

To implement this use case, this paper discusses the following basic idioms:

  • Asynchronous responses (entity#jsonAsync)
  • Loops (Promises#join, Promises#when, or recursion)
  • Promise synchronisation, e.g., thread-safe access to globals

Asynchronous Responses

ScriptableFilter and ScriptableHandler scripts have the default HTTP ClientHandler in scope as the http object. You can also provide your own ClientHandler instance with the clientHandler setting of the scriptable component. The sample below demonstrates bare-bones code that performs an asynchronous call:

def myRequest = new Request()
...
return http.send(myRequest)
      .then { response ->
         // Process the response and then return the response
         return response
      }
      .thenAsync { response ->
         // Process the response
         def obj = response.entity.json
         ...
         // Pass control along the chain
         return next.handle(context, request)
      }

At the point of return http.send, the request is sent immediately and control returns to the caller (e.g., IG's event loop). The closures are executed once the entire response has been received. While the closures wait to be executed, IG can process other requests in parallel, taking advantage of the I/O waits.
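If you know the JDK better than the ForgeRock Promise API, the flow above can be approximated in plain Java. This is only an analogy, not IG code. java.util.concurrent.CompletableFuture stands in for Promise. thenApply plays the role of then, a plain value transformation, and thenCompose plays the role of thenAsync, a closure that itself returns a promise. The send and nextHandle methods are hypothetical stand-ins for http.send and next.handle.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class ThenVersusThenAsync {
    // Hypothetical stand-in for an IG response.
    record FakeResponse(String body) {}

    // Hypothetical stand-in for http.send(): returns at once with a future
    // that completes about 50 ms later on another thread.
    static CompletableFuture<FakeResponse> send(String uri) {
        return CompletableFuture.supplyAsync(
                () -> new FakeResponse("{\"uri\":\"" + uri + "\"}"),
                CompletableFuture.delayedExecutor(50, TimeUnit.MILLISECONDS));
    }

    // Hypothetical stand-in for next.handle(context, request), also asynchronous.
    static CompletableFuture<String> nextHandle(String body) {
        return CompletableFuture.supplyAsync(() -> "handled " + body);
    }

    static CompletableFuture<String> filter(String uri) {
        return send(uri)
                // like Promise#then: a plain transformation of the value
                .thenApply(FakeResponse::body)
                // like Promise#thenAsync: the function returns another future,
                // which is flattened into the chain
                .thenCompose(ThenVersusThenAsync::nextHandle);
    }
}
```

Calling filter("/test") returns before the response exists. The sketch calls join() only in tests to observe the final value. An IG script should never block its event loop that way.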

The IG streaming feature improves performance further. To enable it, add "streamingEnabled": true to admin.json. The closure is then activated as soon as the response headers have been received, and you must access the body asynchronously with the entity.jsonAsync promise:

def myRequest = new Request()
...
return http.send(myRequest)
      .thenAsync { response ->
         // Headers have been received: wait for the body without blocking
         return response.entity.jsonAsync
      }
      .thenAsync { json ->
         // Do something with json
         ...
         // Pass control along the chain
         return next.handle(context, request)
      }

When this feature is enabled, entity.json can no longer be used. entity.jsonAsync must always be called to get the response body asynchronously. This also affects runtime expressions: #{expression} must be used instead of ${expression} to access properties in the entity. See the documentation for more details: Runtime Expressions and Streaming Configuration.
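With streaming, the response becomes available when the headers arrive and the body arrives later. This two-stage behaviour can be sketched the same way. Again, this is a plain-Java analogy built on assumptions. StreamedResponse and its bodyAsync future are hypothetical and loosely mirror Response and entity.jsonAsync. thenCompose stands in for thenAsync, and thenApply stands in for then.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class StreamingSketch {
    // Hypothetical response: headers are available at once, the body later.
    record StreamedResponse(Map<String, String> headers, CompletableFuture<String> bodyAsync) {}

    // Completes as soon as the "headers" are in, before the body has arrived.
    static CompletableFuture<StreamedResponse> send(CompletableFuture<String> body) {
        return CompletableFuture.completedFuture(
                new StreamedResponse(Map.of("Content-Type", "application/json"), body));
    }

    static CompletableFuture<Integer> bodyLength(CompletableFuture<String> body) {
        return send(body)
                // role of thenAsync: wait for the body without blocking
                .thenCompose(StreamedResponse::bodyAsync)
                // role of then: process the body once it is there
                .thenApply(String::length);
    }
}
```

The chained result stays incomplete until the body future completes, and no thread is blocked while it waits.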

This design pattern is highly recommended with IG 7.1 in a standalone deployment. To convince yourself, try the following:

  • Run performance tests with Response#entity#json. Then enable the streaming feature, update the script to use Response#entity#jsonAsync, and run the tests again. You will be surprised by the results.

Asynchronous loops

Here is a common use case with IDM: an IDM resource is linked to many other resources via relationships. You could perform a query with the appropriate _fields parameter to expand the referenced objects. However, if the response is potentially very large, requesting everything at once may drain resources. The strategy is therefore to fetch only the references, and then obtain each full related object with a separate request. Let's say that the managed resource MO1 is related to MO2 in a 1-n relationship. To obtain all relationships from one MO1 resource to MO2 resources, one can invoke:

GET managed/MO1/<mo1_id>/MO2/?_queryFilter=true&_fields=_ref

Which returns something like this:

{
    "result": [
        {
            "_id": "d0dbb21e-a0fa-4cac-9db3-0a19b0657342",
            "_rev": "0000000005cdb54c",
            "_ref": "managed/mo2/1e9a6758-897e-559f-a19e-56627d2b09cc"
        },
        {
            "_id": "494fecea-fded-4355-a997-65f8204a239a",
            "_rev": "000000006ab0b52c",
            "_ref": "managed/mo2/b84d0a6d-efbd-5241-9496-68058c3e79ac"
        }
    ],
    "resultCount": 2,
    "pagedResultsCookie": null,
    "totalPagedResultsPolicy": "NONE",
    "totalPagedResults": -1,
    "remainingPagedResults": -1
}

With a synchronous idiom, you would simply write this code:

response.entity.json.result.each { obj ->
   def idmRequest = new Request()
   idmRequest.method = "GET"
   idmRequest.uri = idmUrl + "/openidm/" + obj._ref
   // Block until the full related object has been received
   def moResponse = http.send(idmRequest).get()
   // Process the response body
}

Now pause for a moment and think about how you would transform this code to use an asynchronous idiom. That's right, it's not obvious. The code needs to relinquish control while it waits for the response, and the iteration then needs to resume somehow. But how is this possible when the response is processed in another closure? If you knew in advance that there are always three references, you could chain three promises in a row, but here that number is unpredictable.

The answer is: it depends. The building blocks we are going to discuss are the Promises#join and Promises#when methods, and recursion through promises.

Using join and when

The join and when methods take a list of promises as an argument. They return a promise that completes when all the promises complete with a result, or as soon as the first one fails (e.g., throws an exception). Strictly speaking, this does not implement a loop, i.e., executing one call after the other. Instead, it initiates all the calls at once. This method is appropriate for a moderate number of dependencies to retrieve. Otherwise, the alternative is to perform one (or a few) requests at a time, one after the other, which is achieved with recursion, discussed later.

Let's start with a join example. The javadoc says:

Returns a Promise which will be completed once all of the provided promises have succeeded, or as soon as one of them fails.

On successful completion, an immutable Promises.Results instance containing all respective results of the joined promise (in the same order) is provided.

The code is as follows:

import static org.forgerock.util.promise.Promises.*

def ids = [ "test1", "test2", "test3", "test4" ] //-1

def getMOWithID(id) { //-3
   def idmRequest = new Request()
   idmRequest.method = "GET"
   idmRequest.uri = "${idmBaseUrl}/openidm/managed/mo/${id}"
   return http.send(idmRequest).thenAsync { response ->
       return response.entity.jsonAsync //-4
   }
}

def getMOs(ids) {
    def size = ids.size()
    return join(ids.collect(this.&getMOWithID)).then { results -> //-2
        return (0..<size).collect { index -> results.get(index) } //-5
    }
}

return getMOs(ids).thenAsync { mos -> //-0
   attributes.mos = mos
   return next.handle(context, request)
}


The first step is to turn the list of ids (line marked with the comment //-1) into a list of promises, which is given to the Promises#join call (//-2). The list#collect method does this conversion. Each promise in the list is built by the getMOWithID function (//-3), which returns a promise that completes when the response body has been received (//-4). Notice the use of jsonAsync here. Once all promises have completed, the responses are collected and aggregated using the list#collect method and the Promises.Results#get method (//-5). The entry point of the script is the line marked //-0. It returns the promise computed by getMOs, which on completion sets the result in the attributes context and then hands over to the next filter in the chain. thenAsync is used there because next.handle itself returns a promise. Note also that ids is passed to getMOs as an argument. A variable declared with def at the top level of a Groovy script is local to the script body and is not visible inside the methods the script defines.

The Promises#when method differs slightly from Promises#join. On successful completion, the promise it returns holds the list of results of the completed promises, rather than a single Promises.Results value. This greatly simplifies processing the final result:

return when(ids.collect(this.&getMOWithID)).then { mos -> .... }
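To make the join/when semantics concrete outside IG, here is a rough JDK analogue that uses CompletableFuture in place of Promise. Each call to the fetch function starts a request, so all requests are in flight once the list of futures has been built. CompletableFuture.allOf then waits for them, and the results are read back in input order, much like the list that Promises#when yields. One difference is that allOf waits for every future to complete before reporting a failure, whereas the Promises#join javadoc quoted above fails as soon as one promise fails. fetchAll is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class JoinSketch {
    // Start every fetch at once, then gather the results in input order.
    static <T, R> CompletableFuture<List<R>> fetchAll(List<T> ids,
            Function<T, CompletableFuture<R>> fetch) {
        List<CompletableFuture<R>> futures = ids.stream().map(fetch).toList();
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                // every future is complete at this point, so join() does not block
                .thenApply(ignored -> futures.stream().map(CompletableFuture::join).toList());
    }
}
```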


A good reason for accessing the related resources might be to answer a question such as: does the resource link to any 'inactive' object? A natural way to implement this is to chain yet another promise that analyzes the result, for instance by looking for any object with an INACTIVE status. The sample calls a function, isInactive, which returns a boolean. Its code is left out here, as you can easily infer that it tests the MO content:

return when(ids.collect(this.&getMOWithID)).then { mos ->
    mos.find(this.&isInactive)
}

Notice that in this case we don't really need to preserve the result of each promise. We just want to detect the condition as soon as possible and, ideally, stop processing once the decision is made. An alternative is to take advantage of the failure behavior: Promises#when and Promises#join complete as soon as one of the promises fails, regardless of whether the other promises have completed. Here, we can make a promise fail when the condition is met:

return when(ids.collect { id ->
   return getMOWithID(id).then { mo ->
       if (isInactive(mo)) {
           throw new Exception("MO is INACTIVE")
       }
   }
}).then {
   attributes.status = "No Inactive MO found"
}.thenCatch { e ->
   attributes.status = "One Inactive MO found"
}
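CompletableFuture.allOf waits for all of its inputs, so reproducing this fail-fast behavior in plain Java takes a little extra work. The sketch below uses the hypothetical name allOrFirstFailure. It completes exceptionally as soon as any input fails, and completes normally once all inputs have succeeded. It is a JDK analogue under those assumptions, not the ForgeRock implementation.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class FailFastSketch {
    // Fails as soon as the first input fails; succeeds once every input succeeds.
    static CompletableFuture<Void> allOrFirstFailure(List<? extends CompletableFuture<?>> futures) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        for (CompletableFuture<?> f : futures) {
            // Forward the first failure immediately, without waiting for the others
            f.whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                }
            });
        }
        // Only reached when every input succeeded (allOf fails otherwise)
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenRun(() -> result.complete(null));
        return result;
    }
}
```

As in the Groovy version, the requests still in flight are not cancelled. The sketch simply stops waiting for them.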

Using recursion

When invoking several requests at once would strain the server, we must iterate over the list one item at a time, fetch the resource for each item, and stop once the condition is met. To solve this problem with an asynchronous idiom, we need recursion.

def searchForInactiveMo(ids) {
   if (ids.isEmpty()) {
      return newResultPromise([:]) //-4
   }
   def id = ids.pop() //-1
   return getMOWithID(id).thenAsync { mo -> //-2
      // Stop on the first inactive MO, otherwise recurse on the remaining ids
      return isInactive(mo) ? newResultPromise(mo) : searchForInactiveMo(ids) //-3
   }
}

return searchForInactiveMo(ids).thenAsync { mo ->
   attributes.status = (mo ? "Inactive MO found" : "Inactive MO not found")
   return next.handle(context, request)
}

The idea here is to chain the promises recursively. The first promise is submitted (//-1, //-2). If the MO is active, the next promise is submitted by calling searchForInactiveMo recursively. Otherwise, the (non-null) mo is returned via a result promise (//-3), which ends the chain. When the list is exhausted without the condition being met, the chain ends with a result promise holding an empty value (//-4).
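The same recursive chaining can be written with CompletableFuture. The sketch assumes a fetch function that returns one future per id, and a predicate that replaces isInactive. findFirst is a hypothetical name. As in the Groovy code, each fetch is issued only when the previous one completes, so at most one request is in flight at a time.

```java
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

final class RecursiveSearchSketch {
    static <T, R> CompletableFuture<Optional<R>> findFirst(Deque<T> ids,
            Function<T, CompletableFuture<R>> fetch, Predicate<R> matches) {
        if (ids.isEmpty()) {
            // List exhausted: end the chain with an empty result
            return CompletableFuture.completedFuture(Optional.empty());
        }
        T id = ids.pop();
        // Issue the next fetch only from the completion of this one
        return fetch.apply(id).thenCompose(item -> matches.test(item)
                ? CompletableFuture.<Optional<R>>completedFuture(Optional.of(item))
                : findFirst(ids, fetch, matches));
    }
}
```

When the futures complete synchronously, each step runs on the caller's stack, so a very long list would call for an iterative variant.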

Promise synchronisation

A 'simple' use case is accessing the AM admin API, which requires authentication. The first idea is to obtain the session token within the script before calling the AM configuration endpoint. This is a bad idea, because each route invocation then incurs an authentication transaction with AM. Ideally, the session token should be shared among all route invocations. So let's save it in the globals map. But again, a bad idea! Race conditions, especially under high traffic, mean that several concurrent invocations may each find no token and authenticate against AM before any of them has stored the token for the others.

In addition, the token is going to expire. This condition needs to be detected, and the token refreshed, again in a coordinated fashion.

Note that we can't implement this use case with the out-of-the-box SingleSignOnFilter, as it only works for browser requests.

A good habit with IG configuration is to abstract connection requirements into the client handler. Here, the requirement is to inject the session token into the requests performed by the script (e.g., http.send). ScriptableFilter and ScriptableHandler scripts have the default ClientHandler (ForgeRockClientHandler) in scope as the variable http. You can configure another one with the "clientHandler" property, like this:

{
    "name" : "PatchAMClientResponseFilter",
    "type" : "ScriptableFilter",
    "config" : {
        "type" : "application/x-groovy",
        "file" : "patchOAuth2Client.groovy",
        "clientHandler" : "AMClientHandler",
        "args" : {
            "amBaseURL" : "${amURL}"
        }
    }
}

So letā€™s define AMClientHandler in config.json:

{
     "name" : "AMClientHandler",
     "type" : "Chain",
     "config" : {
       "filters" : [ "AMSSOFilter" ],
       "handler" : "ForgeRockClientHandler"
     }
}

And the associated Scriptable Filter:

{
    "name" : "AMSSOFilter",
    "type" : "ScriptableFilter",
    "config" : {
        "type" : "application/x-groovy",
        "file" : "AMAuth.groovy",
        "args" : {
            "amBaseURL" : "${amURL}",
            "amPassword" : "&{am.passwords.amadmin.clear}"
        },
        "clientHandler" : "ForgeRockClientHandler"
    }
}

All that remains now is to write the script:

import java.util.concurrent.atomic.AtomicReference
import org.forgerock.util.promise.PromiseImpl
import static org.forgerock.http.handler.Handlers.chainOf
import static org.forgerock.http.filter.Filters.requestCopyFilter
import static org.forgerock.util.promise.Promises.newResultPromise

def authenticate() {
   def amRequest = new Request()
   amRequest.uri = "${amBaseURL}/am/json/realms/root/authenticate"
   amRequest.method = "POST"
   amRequest.entity.json = [:]
   def pwdHeader = new GenericHeader("X-OpenAM-Password", amPassword)
   def userHeader = new GenericHeader("X-OpenAM-Username", "amadmin")
   amRequest.putHeaders(pwdHeader, userHeader)
   return http.send(amRequest).thenAsync { response ->
      return response.entity.jsonAsync.then { json -> json.tokenId }
   }
}

def refreshToken(previousPromise) {
   def newPromise = PromiseImpl.create()
   if (globals.amadminSsoToken.compareAndSet(previousPromise, newPromise)) { //-3
      // We installed a new promise: complete it once the new token has been fetched
      authenticate() //-4
         .thenOnResult { tokenId -> newPromise.handleResult(tokenId) }
         .thenOnException { e -> newPromise.handleException(e) }
   }
   // Return the current promise, whether this invocation or another one installed it
   return globals.amadminSsoToken.get() //-5
}

// globals is shared by concurrent invocations: create the reference only once
globals.putIfAbsent("amadminSsoToken", new AtomicReference()) //-2

// First call: passing null triggers a single authenticate() until the token expires
def currentPromise = refreshToken(null) //-6
return currentPromise.thenAsync { tokenId ->
   request.putHeaders(new GenericHeader("iPlanetDirectoryPro", tokenId))
   // Send a copy of the request, so that the original can be replayed
   def salvagedFilter = requestCopyFilter()
   return chainOf(next, salvagedFilter).handle(context, request).thenAsync { response ->
      if (response.status.code >= 300) { //-7
         // Retry with a new token
         return refreshToken(currentPromise).thenAsync { newTokenId -> //-1
            request.putHeaders(new GenericHeader("iPlanetDirectoryPro", newTokenId)) //-8
            return next.handle(context, request)
         }
      } else {
         return newResultPromise(response)
      }
   }
}

Caution: using the 'amadmin' user is not recommended in production. Instead, configure a user that has the minimum permissions required to fulfill its task. ForgeRock also recommends changing the iPlanetDirectoryPro cookie name as a security hardening measure. The name can be supplied as a configuration property, or retrieved from the AM /serverinfo endpoint. In the latter case, it has to be fetched and cached at the first invocation of the script, using the same synchronisation method.

There is a lot to take in that script, so let us explain it step by step.

The main principle in this code is to coordinate the threads that update the token value through an AtomicReference, using the compare-and-swap operation implemented by AtomicReference#compareAndSet. The value held by the atomic reference is the promise that completes when the session token has been fetched. The first script invocation to detect an invalid or absent session token is the one that sets the promise. Invocations that come later resume on the completion of that promise.

The session token is acquired at (//-6) and refreshed at (//-1). Both tasks are performed by the refreshToken function.

Notice that the atomic reference is created on the first script invocation (//-2). The refreshToken function creates a new promise and attempts to set it in the atomic reference. The AtomicReference#compareAndSet method (//-3) takes two arguments: the value expected to be current, and the new value to set. If another concurrent execution changed the reference in the meantime, the expected value does not match, the update is not performed, and the method returns false.
If setting the new promise succeeds, authentication is initiated with the authenticate method (//-4), and its completion completes the promise held by the atomic reference. Otherwise, authentication is skipped, and the function returns the promise installed by the other invocation, which completes when that authentication succeeds (//-5). Once the session token is obtained, it is injected as a header into the request (//-8).
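The compare-and-set coordination can be exercised outside IG with a small JDK sketch. It makes two substitutions: an AtomicReference holding a CompletableFuture<String> replaces the promise stored in globals, and a supplier replaces authenticate(). TokenCache and refresh are hypothetical names. As in refreshToken, only the caller whose compareAndSet succeeds starts an authentication. Every other caller receives the promise that the winner installed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class TokenCache {
    // Promise of the current token; null until the first refresh.
    private final AtomicReference<CompletableFuture<String>> current = new AtomicReference<>();
    // Hypothetical stand-in for authenticate().
    private final Supplier<CompletableFuture<String>> authenticate;

    TokenCache(Supplier<CompletableFuture<String>> authenticate) {
        this.authenticate = authenticate;
    }

    // Mirrors refreshToken(previousPromise): install a fresh promise only if
    // the reference still holds the promise the caller last saw.
    CompletableFuture<String> refresh(CompletableFuture<String> previous) {
        CompletableFuture<String> fresh = new CompletableFuture<>();
        if (current.compareAndSet(previous, fresh)) {
            // This caller won the race: it alone authenticates and completes
            // the promise that every other caller waits on.
            authenticate.get().whenComplete((token, error) -> {
                if (error != null) {
                    fresh.completeExceptionally(error);
                } else {
                    fresh.complete(token);
                }
            });
        }
        // Either our own promise or the one another caller installed.
        return current.get();
    }
}
```

A caller passes the promise it last used as previous. This makes the refresh after expiry happen only once: late callers still holding the old promise find that it has already been replaced, and they wait on the new one.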

There are three cases in this logic:

  • On the first call after IG has initialized, the atomic reference holds null. That is why the script attempts to refresh the token on the assumption that it is absent (//-6). Since this is the very first call, the token really is absent, and authentication takes place.
  • On subsequent script invocations, authentication has already been performed. The refreshToken function therefore returns the promise that is already completed (or about to complete) and holds the session token as its value.
  • The session token eventually expires, so the request fails (//-7). The code then attempts to refresh the token on the assumption that currentPromise is the promise currently stored in the atomic reference. This is true if this invocation is the first to get an authentication error. If another concurrent invocation has already initiated authentication in the meantime, this invocation gets the new promise set by the other one, and execution resumes when that promise completes.

The last point is how the request is sent after injecting the header:

def salvagedFilter = requestCopyFilter()
return chainOf(next, salvagedFilter).handle(context, request).thenAsync { response ->


This creates a filter that passes a copy of the current request downstream. It then builds a new chain in which that filter runs before next, and submits the request with the handle call. This is necessary because the request body can be consumed only once. It is not needed for a GET call, but it definitely is for POST or PUT. If the request then fails because the token has expired, the original request can be replayed once a new token has been obtained.
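The copy is needed because a body stream can be read only once. The sketch below shows the same idea in plain Java, assuming the body is an InputStream and send is a hypothetical function that returns a status code. It buffers the body before the first attempt so that a retry can replay it. It is synchronous for brevity and does not describe how requestCopyFilter is implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.function.Function;

final class ReplaySketch {
    // Hypothetical helper: sends the body once, and replays it once more if the
    // first attempt is rejected (status >= 300, e.g. after refreshing a token).
    static int sendWithRetry(InputStream body, Function<InputStream, Integer> send) {
        byte[] copy;
        try {
            // An InputStream can only be consumed once, so buffer it up front
            copy = body.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Each attempt reads from its own fresh stream over the same bytes
        int status = send.apply(new ByteArrayInputStream(copy));
        if (status >= 300) {
            status = send.apply(new ByteArrayInputStream(copy));
        }
        return status;
    }
}
```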

Finally, here is the script that performs requests to the AM REST API, patchOAuth2Client.groovy, as defined in the filter:

{
    "name" : "PatchAMClientResponseFilter",
    "type" : "ScriptableFilter",
    "config" : {
        "type" : "application/x-groovy",
        "file" : "patchOAuth2Client.groovy",
        "clientHandler" : "AMClientHandler",
        "args" : {
            "amBaseURL" : "${amURL}"
        }
    }
}

You can directly invoke the http object (which is the AMClientHandler) without worrying about obtaining the session token, it is done transparently:

def getOAuth2Client(clientId) {
   def amRequest = new Request()
   amRequest.uri = "${amBaseURL}/am/json/realms/root/realm-config/agents/OAuth2Client/${clientId}"
   amRequest.method = "GET"
   // http is AMClientHandler: the session token is injected transparently
   return http.send(amRequest).thenAsync { response ->
       return response.entity.jsonAsync
   }
}

To verify that the logic performs as expected, add a debug log statement in the authenticate method and reduce the AM session expiry to, say, 2 minutes. Then exercise the IG route with multiple concurrent requests. You should see one, and only one, debug trace in the log every 2 minutes.


A last subtle point: AMClientHandler and its associated scriptable filter AMSSOFilter are defined in config.json. As a result, the same session token is shared by all routes that define a scriptable filter using AMClientHandler. If you want the token to be shared by a single route only, place the definitions in that route's heap.

Conclusion

This completes the subject of asynchronous programming with IG, and should give you the basic building blocks for your solution. Most of the time, with straightforward use cases, you should be able to use the readily available filters without resorting to scripting. Where writing a scriptable filter is the only way, it has to be done properly to obtain optimal performance. This paper has pushed the use of asynchronous programming quite far, especially in the last use case. Going beyond that requires writing a Java extension to gain full control, but that is another topic entirely.
