Aggregation

Last modified by Danniar Firdausy on 2024/09/19 10:20

In this microlearning, we will introduce the relevant stateful components to configure the Aggregation operation in the context of State Generation. If you want to learn the basics of the State Generation functionality, please check out eMagiz State Generation.

Should you have any questions, please get in touch with academy@emagiz.com.

1. Prerequisites

  • Basic knowledge of the eMagiz platform
  • Basic knowledge of eMagiz State Generation

2. Key concepts

Before configuring the operation, it helps to be clear about what Aggregation means and which design decisions it involves.

  • By Aggregation, we mean an operation in which multiple messages are received, stored, and grouped into a single message, which is then released either after a certain number of messages have been collected or after a specified wait time has elapsed. 

Before setting up the Aggregation operation, consider the following checkpoints:

  • Which incoming messages will be stored and aggregated into a single message?
  • How many messages need to be stored before they are grouped into a single message and released?
  • If a group does not reach the required number of messages, how long should it wait before it is released anyway?
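Writing the answers to these checkpoints down explicitly can make the later configuration steps easier to check. The sketch below is a hypothetical Python record of the three decisions; the class and field names are illustrative assumptions and do not correspond to eMagiz settings or APIs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AggregationPlan:
    """Hypothetical record of the three checkpoints; not an eMagiz object."""

    correlation_header: str  # which incoming messages belong together
    release_size: int        # how many messages complete one group
    group_timeout_ms: int    # how long an incomplete group may wait

    def __post_init__(self) -> None:
        # Basic sanity checks on the answers.
        if self.release_size < 1:
            raise ValueError("release_size must be at least 1")
        if self.group_timeout_ms <= 0:
            raise ValueError("group_timeout_ms must be positive")


# Example answers: group by message type, release per 10 messages or after 1 hour.
plan = AggregationPlan("testbus_messageType", release_size=10, group_timeout_ms=3_600_000)
print(plan)
```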

3. Setting up Aggregation operation

To configure the Aggregation operation, you first need to set up the aggregator component that groups the incoming messages. In addition, if the state must survive runtime shutdowns or restarts, you can also set up storage that persists it.

3.1 Aggregate Messages

First of all, set up the component that is responsible for aggregating incoming messages and releasing them once a certain number of messages has been collected or a specified wait time has elapsed. eMagiz provides a flow component for this called the Standard Aggregator, which is listed under the "Aggregator" category.

Here, groups of messages are stored under their correlation key until they reach the specified number of messages or time out. To manage this storage, you can select a Message Store support object if you have created one before. If you leave this field empty, the component defaults to an in-memory store, which may result in data loss when the runtime shuts down or restarts; this is covered in section 3.2. In addition, the Standard Aggregator lets you specify whether the released output message will be in JSON, XML, or Raw format (see the examples below).

intermediate-state-generation-aggregation-standard-aggregator-basic.png

Output types:

  • JSON
    • This requires you to specify the "Item object name". This means that if you set it to "item" and you have these JSON objects {"id":1,"name":"John"} and {"id":2,"name":"Doe"}, then you will get a batch of {"item":[{"id":1,"name":"John"},{"id":2,"name":"Doe"}]}
  • XML
    • This requires you to specify the "Item object name" and the "Root object name". This means that if you set the "Item object name" to "element" and the "Root object name" to "root" and you have these XML elements <item><id>1</id><name>John</name></item> and <item><id>2</id><name>Doe</name></item>, then you will get a batch of <root><element><id>1</id><name>John</name></element><element><id>2</id><name>Doe</name></element></root>
  • Raw
    • This does not require you to specify anything. For example, if you have the raw payloads Lorem ipsum and dolor sit amet, then you will get a batch of ["Lorem ipsum","dolor sit amet"]
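The three output shapes can be reproduced with a minimal Python sketch. It only mimics the resulting batch format from the examples above, assuming compact JSON without whitespace; it is not how eMagiz builds the batch internally, and the function names are hypothetical.

```python
import json
import xml.etree.ElementTree as ET


def aggregate_json(items: list, item_name: str) -> str:
    # Wrap all JSON objects in one array under the "Item object name".
    return json.dumps({item_name: items}, separators=(",", ":"))


def aggregate_xml(items: list, item_name: str, root_name: str) -> str:
    # Rename each item's root element to the "Item object name" and nest
    # all items under a new element named after the "Root object name".
    root = ET.Element(root_name)
    for xml_text in items:
        element = ET.fromstring(xml_text)
        element.tag = item_name
        root.append(element)
    return ET.tostring(root, encoding="unicode")


def aggregate_raw(payloads: list) -> str:
    # Collect the raw payloads into a JSON array of strings.
    return json.dumps(payloads, separators=(",", ":"))


print(aggregate_json([{"id": 1, "name": "John"}, {"id": 2, "name": "Doe"}], "item"))
print(aggregate_xml(
    ["<item><id>1</id><name>John</name></item>",
     "<item><id>2</id><name>Doe</name></item>"],
    item_name="element", root_name="root"))
print(aggregate_raw(["Lorem ipsum", "dolor sit amet"]))
```

Each print statement produces the corresponding batch shown in the list above.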

3.1.1 Correlation Strategy

Additionally, there are other settings that you can configure in the Advanced tab. First, the Correlation Strategy Expression is a SpEL expression that determines the correlation key under which incoming messages are grouped. In the example below, the Aggregator component groups together messages that have the same value for the testbus_messageType header.
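Conceptually, the correlation strategy is a function from a message to a key, and messages with equal keys end up in the same group. The Python sketch below assumes messages are plain dicts with "headers" and "payload" entries; the key function stands in for a SpEL expression such as headers['testbus_messageType'], which is an assumption, since the exact expression in the screenshot may differ.

```python
from collections import defaultdict


def correlation_key(message: dict) -> str:
    # Stand-in for a SpEL correlation expression that reads one header.
    return message["headers"]["testbus_messageType"]


def group_by_correlation(messages: list) -> dict:
    # Collect payloads per correlation key, preserving arrival order.
    groups = defaultdict(list)
    for message in messages:
        groups[correlation_key(message)].append(message["payload"])
    return dict(groups)


messages = [
    {"headers": {"testbus_messageType": "order"}, "payload": "o1"},
    {"headers": {"testbus_messageType": "invoice"}, "payload": "i1"},
    {"headers": {"testbus_messageType": "order"}, "payload": "o2"},
]
groups = group_by_correlation(messages)
print(groups)  # {'order': ['o1', 'o2'], 'invoice': ['i1']}
```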

After a batch of messages with a given correlation key has been released, the Expire Groups Upon Completion setting determines what happens to subsequent messages with the same correlation key. If set to Yes, as shown in the example below, they form a new message group that follows the standard completion and expiry rules; if set to No, they are redirected to the discard channel. Similarly, the Expire Groups Upon Timeout setting determines whether subsequent messages with the same correlation key are redirected to the discard channel or form a new group that follows the defined completion and expiry rules, once the initial group has been released because the defined Group Timeout elapsed.

intermediate-state-generation-aggregation-standard-aggregator-advanced.png
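The effect of Expire Groups Upon Completion can be illustrated with a minimal Python model. It assumes a fixed release size and ignores timeouts; Expire Groups Upon Timeout applies the same idea to groups released by the Group Timeout. The class and attribute names are hypothetical and this is not eMagiz code.

```python
class CompletionAwareAggregator:
    """Hypothetical model of 'Expire Groups Upon Completion'."""

    def __init__(self, release_size: int, expire_upon_completion: bool):
        self.release_size = release_size
        self.expire_upon_completion = expire_upon_completion
        self.groups = {}              # open groups per correlation key
        self.completed_keys = set()   # keys of released, non-expired groups
        self.released = []            # (key, payloads) per released group
        self.discarded = []           # payloads sent to the discard channel

    def receive(self, key: str, payload: str) -> None:
        if key in self.completed_keys:
            # The completed group was kept, so late messages are discarded.
            self.discarded.append(payload)
            return
        group = self.groups.setdefault(key, [])
        group.append(payload)
        if len(group) >= self.release_size:
            self.released.append((key, group))
            del self.groups[key]
            if not self.expire_upon_completion:
                # Remember the key so that later messages are discarded.
                self.completed_keys.add(key)


expiring = CompletionAwareAggregator(release_size=2, expire_upon_completion=True)
keeping = CompletionAwareAggregator(release_size=2, expire_upon_completion=False)
for payload in ["a", "b", "c", "d"]:
    expiring.receive("order", payload)
    keeping.receive("order", payload)
print(expiring.released, expiring.discarded)
print(keeping.released, keeping.discarded)
```

With expiry enabled, "c" and "d" form a second group; without it, they are discarded after the first group is released.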

3.1.2 Release Strategy

Furthermore, the Release Strategy Expression determines how many messages need to be collected before the grouped message is released. In addition, you can use either the Group Timeout setting for a static value in milliseconds or the Group Timeout Setting Expression for a dynamic value based on the environment. This timeout determines when the message group is forcibly released if the incoming messages have not yet satisfied the release strategy.

As an example scenario, suppose the Release Strategy Expression is set to size() >= 10 and the Group Timeout is set to 3600000 ms (1 hour). Then:

  • When an incoming message brings the size of the message group to 8, the Aggregator component does not release the group yet and waits for one hour.
  • If no more messages arrive within that hour, the component releases the group of 8 messages.
  • If another message arrives within that hour, the timeout counter resets and the component waits for another hour. As soon as a message brings the group size to 10, the group is released immediately.
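The scenario above can be stepped through with a small Python simulation of a size-based release strategy combined with a group timeout that restarts on every new message. Time is passed in explicitly in milliseconds so nothing actually waits; the class and method names are hypothetical, it models a single correlation key, and it is a sketch of the described behavior rather than eMagiz code.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TimedAggregator:
    release_size: int                   # mirrors a release strategy like size() >= 10
    group_timeout_ms: int               # mirrors the Group Timeout setting
    group: list = field(default_factory=list)
    deadline_ms: Optional[int] = None   # when the open group is forcibly released
    released: list = field(default_factory=list)

    def receive(self, payload, now_ms: int) -> None:
        self.tick(now_ms)  # a pending timeout may have elapsed before this message
        self.group.append(payload)
        if len(self.group) >= self.release_size:
            self._release()
        else:
            # Each new message restarts the timeout for the incomplete group.
            self.deadline_ms = now_ms + self.group_timeout_ms

    def tick(self, now_ms: int) -> None:
        # Forcibly release an incomplete group whose timeout has elapsed.
        if self.group and self.deadline_ms is not None and now_ms >= self.deadline_ms:
            self._release()

    def _release(self) -> None:
        self.released.append(list(self.group))
        self.group.clear()
        self.deadline_ms = None


agg = TimedAggregator(release_size=10, group_timeout_ms=3_600_000)
for i in range(8):
    agg.receive(f"msg-{i}", now_ms=i)   # the 8th message arrives at t = 7 ms
agg.tick(now_ms=3_600_006)              # 1 ms before the hour is up: still waiting
agg.tick(now_ms=3_600_007)              # one hour after the 8th message: released
print(len(agg.released[0]))             # 8
```

Sending a 9th message before the deadline would instead move the deadline one hour past that message, and a 10th message would release the group at once.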

Once you are satisfied with your setup, you can save and close this configuration.

3.2 Storage Mechanism for State Persistence

As discussed above, if your state must persist (stored on disk instead of in memory), you need to link a Message Store to your Standard Aggregator component. Therefore, you will need to set up the following support objects if you have not done so already:

  • Infinispan cache manager
  • Infinispan message store

Once you have done so, you can set the "Simple cache" option in your message store to "no" and then set the "Persistent" option to "yes". For more information on configuring these support objects and understanding their settings, please refer to this State Persistence microlearning.
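The difference between the default in-memory store and a persistent store can be illustrated outside eMagiz. The Python sketch below swaps in the standard library shelve module as a stand-in for a disk-backed message store; it does not model Infinispan, and the helper names are hypothetical. It simulates a restart by recreating the in-memory store and reopening the shelf from its file.

```python
import os
import shelve
import tempfile


def add_to_group(store, key: str, payload: str) -> None:
    # Works for both a plain dict and a shelve.Shelf (both are mappings).
    group = store.get(key, [])
    group.append(payload)
    store[key] = group  # reassign so a Shelf actually writes the change


def demo(directory: str) -> tuple:
    path = os.path.join(directory, "aggregator-groups")

    # Before the "restart": one in-memory store and one disk-backed store.
    in_memory = {}
    add_to_group(in_memory, "order", "msg-1")
    with shelve.open(path) as on_disk:
        add_to_group(on_disk, "order", "msg-1")

    # Simulated restart: the in-memory store is recreated empty,
    # while the disk-backed store is reopened from its file.
    in_memory = {}
    with shelve.open(path) as on_disk:
        recovered = dict(on_disk)
    return in_memory, recovered


with tempfile.TemporaryDirectory() as tmp:
    memory_after, disk_after = demo(tmp)
    print(memory_after)  # {}
    print(disk_after)    # {'order': ['msg-1']}
```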

4. Key takeaways

  • With the State Generation - Aggregation operation, you can receive, store, and group incoming messages into a single message that is released after a certain number of messages have been collected or after a specified wait time has elapsed.
  • To set up a State Generation - Aggregation operation, you need a Standard Aggregator component to group the incoming messages and release the group based on the defined rules.
  • If message groups must be persisted to disk, you need to set up an Infinispan Message Store and its Infinispan Cache Manager and then link the store to your Standard Aggregator.
  • To minimize the hurdles and speed up the process of setting up a State Generation - Aggregation operation, we recommend using a store item that is available here and here.

5. Suggested additional readings

If you are interested in this topic and want more information on it, please read the help text provided by eMagiz and the following microlearning on the related topic: