Integrating GemFire systems over WAN using REST

GemFire 8 added development and management REST APIs, making it easy to access data and to run queries and functions over it. But what about using a RESTful API to make two different GemFire systems exchange information? With the help of event listeners and Asynchronous Event Queues, you can build a very scalable solution to keep different systems in sync.

The overall use case represented here is a system with some points of sale (GemFire nodes) that process transactions on a partitioned GemFire region with redundant copies, but that periodically needs to synchronize information with the back office system, where extra processing and analytics may need to happen.

The back office has a partitioned region that’s going to be exposed through REST and to which the PoS (point of sale) nodes are going to send the transaction data.

A load balancer (Apache httpd) in front of the GemFire back office servers will handle failover and load balancing from any PoS client to the back office nodes, thanks to native Apache modules that provide different options and SLAs if back-end systems become unavailable or unresponsive. This also allows the back-end system to scale dynamically.

[Diagram: overall architecture]

Customers can perform transactions at any point of sale in a store and still recover or undo them on a different node within the store. Since the region is partitioned and has a redundant copy, it’s still possible to operate on the remaining nodes even after a node failure. The diagram illustrates two nodes on each system, but that could of course be increased to N as required, if resources are available.

Note: GemFire provides native WAN integration through Gateways, which is the recommended solution for most use cases. This particular scenario explores an alternative that may suit specific customer needs such as intermittent connections (3G, mobile…), multicast distributed systems and/or the need for HTTP/REST endpoints.

The next sections walk through a sample implementation of such a scenario running on a single machine, using Apache HTTP Server, JDK 7 and GemFire 8 on top of OSX. The complete source code is available on GitHub.

Back Office

Let’s start by creating the back office application, which consists of a couple of shell scripts and some gfsh instructions. The folder structure is the following:

BackOffice/
└── servers
    ├── backOfficeA
    ├── backOfficeB
    ├── conf
    │   └── gemfire-bo.properties
    ├── locator1
    ├── setEnv.sh
    ├── startBackOffice.sh
    └── stopBackOffice.sh

The gemfire-bo.properties file has common settings like statistics and conserve-sockets set to false, but it also has the new start-dev-rest-api property set to true, which enables REST capabilities on GemFire servers.

# gemfire-bo.properties
...
# Multi-cast port number ('0' disables multi-cast) #
mcast-port=0
# Conserve Sockets #
conserve-sockets=false
# enable rest service
start-dev-rest-api=true

You do need to change the setEnv.sh script to match your GemFire installation directory and point to the correct JAVA_HOME.

#!/bin/bash
export GEMFIRE_HOME=/GemFire/installers/gemfire-latest
export CONF_DIR=./conf
export JAVA_HOME=`/usr/libexec/java_home -v1.7`
export PATH=$GEMFIRE_HOME/bin:$PATH
export SERVER_HEAP=4G

Then in the startBackOffice.sh script, start a GemFire locator and two GemFire data nodes.

#!/bin/bash
################################
# Start back office servers
################################
. ./setEnv.sh

# code omitted for brevity ...
echo "Starting locator...";
gfsh start locator --name=locator1 --dir=locator1 --J=-Dgemfire.jmx-manager-port=2099 --port=10333 --bind-address=$SERVER_BIND

echo "Starting back office servers...";

gfsh start server --server-bind-address=$SERVER_BIND --name=backOfficeA --locators=$SERVER_BIND[10333] --properties-file=conf/gemfire-bo.properties --dir=backOfficeA --initial-heap=$SERVER_HEAP --max-heap=$SERVER_HEAP \
--J="-XX:+UseConcMarkSweepGC" --J="-XX:+UseParNewGC" --server-port=50404 --J=-Dgemfire.http-service-port=8081 

gfsh start server --server-bind-address=$SERVER_BIND --name=backOfficeB --locators=$SERVER_BIND[10333] --properties-file=conf/gemfire-bo.properties --dir=backOfficeB --initial-heap=$SERVER_HEAP --max-heap=$SERVER_HEAP \
--J="-XX:+UseConcMarkSweepGC" --J="-XX:+UseParNewGC" --server-port=50405 --J=-Dgemfire.http-service-port=8082

echo "Done."

The important parts of the startup script are the different values for http-service-port, so you can run multiple nodes with the REST service enabled on a single machine, and of course --server-port, which sets a different communication port for each GemFire server.

One of the few requirements of the GemFire REST services is that PDX be enabled on the cluster, since PDX is the base of the REST implementation. For more details about PDX serialization, please check out this article, which covers it in depth.

In order to enable PDX on a running system you can use gfsh. For example:

(wmarkito@anakin)$ gfsh
    _________________________     __
   / _____/ ______/ ______/ /____/ /
  / /  __/ /___  /_____  / _____  /
 / /__/ / ____/  _____/ / /    / /
/______/_/      /______/_/    /_/    v8.0.0.1
Monitor and Manage GemFire
gfsh>connect --locator=localhost[10333]
...
gfsh>configure pdx --read-serialized=true --disk-store
persistent = true
disk-store = DEFAULT
read-serialized = true
ignore-unread-fields = false

Alternatively that can also be set on cache.xml, but that’s not going to be used in the back office.

The last step is to create the region that’s going to be exposed through REST, which doesn’t require anything beyond the usual region creation. Still connected in gfsh:

gfsh>create region --name=transaction --type=PARTITION_REDUNDANT_PERSISTENT_OVERFLOW --redundant-copies=1
Member    | Status
----------- | ----------------------------------------------
backOfficeB | Region "/transaction" created on "backOfficeB"
backOfficeA | Region "/transaction" created on "backOfficeA"

All these gfsh commands have been bundled in a gfsh script (boSetup.gfsh), available in the example code bundle.

The create region command leverages the region type shortcuts available in gfsh, such as PARTITION_REDUNDANT_PERSISTENT_OVERFLOW. This shortcut means that the region will be partitioned across multiple nodes, with an additional copy of each entry in memory (redundant) and on disk (persistent). In addition, the “OVERFLOW” part means that if the JVM starts running low on memory, GemFire will start removing region entries from memory, keeping them only on disk. All keys are kept in memory for faster access.

The GemFire REST implementation includes Swagger (http://swagger.io), a powerful RESTful specification for service discovery, production and consumption that also works, in some sense, as an API console. It can be accessed on GemFire through the following URL: http://<host>:<port>/gemfire-api/docs/index.html. The GemFire documentation covers Swagger in detail.

This concludes the initial back office implementation and now there are two GemFire data nodes with REST enabled.

[Diagram: back office]

Apache as load balancer

The configuration for Apache HTTP Server is simple, but the location of the configuration files is OS dependent. This example runs on OSX, so the standard OSX file locations are used here; change them accordingly on Linux or Windows.

Open httpd.conf under /etc/apache2/ and make sure you have the following modules loaded and if not, uncomment these lines to enable them.

LoadModule proxy_module libexec/apache2/mod_proxy.so
LoadModule proxy_connect_module libexec/apache2/mod_proxy_connect.so
LoadModule deflate_module libexec/apache2/mod_deflate.so
LoadModule proxy_balancer_module libexec/apache2/mod_proxy_balancer.so
LoadModule lbmethod_byrequests_module libexec/apache2/mod_lbmethod_byrequests.so
LoadModule lbmethod_bytraffic_module libexec/apache2/mod_lbmethod_bytraffic.so
LoadModule lbmethod_bybusyness_module libexec/apache2/mod_lbmethod_bybusyness.so

Still in httpd.conf, add entries so that requests to the /gemfire URL are proxied and load balanced across the two GemFire servers in the back office.

<Location /gemfire>
    SetInputFilter DEFLATE
    Order allow,deny
    Allow from all
</Location>

<Proxy balancer://gemfire>
    BalancerMember http://localhost:8081
    BalancerMember http://localhost:8082

    ProxySet lbmethod=byrequests
    SetOutputFilter DEFLATE
    Header set Vary *
    Order deny,allow
    Allow from all
</Proxy>

An avid httpd user may have noticed that these settings also enable mod_deflate for input and output, which means that content is compressed with gzip in both directions, so fewer bytes travel over the network between the PoS and the back office.

There is no need to implement compression logic on GemFire itself, since Apache HTTP Server (“httpd”) already handles it and inflates the content before delivering it to the GemFire nodes. In some lab tests this reduced the amount of traffic by approximately 7x, although the ratio depends on the business object itself. Since JSON is a very simple data structure, the CPU cost of compressing such objects is very low at the standard compression level.
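To get a feel for the kind of savings involved, here is a minimal sketch that compresses a JSON batch with the JDK's java.util.zip classes and compares sizes. The class name GzipRatioSketch and the payload shape are made up for illustration and are not part of the sample project; the ratio you get depends entirely on your data.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper for illustration; not part of the sample project.
class GzipRatioSketch {

    // Compress bytes with gzip at the JDK's default compression level.
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new IllegalStateException("in-memory gzip failed", e);
        }
        return out.toByteArray();
    }

    // Inflate gzip bytes back to the original content.
    static byte[] gunzip(byte[] compressed) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = gz.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        } catch (IOException e) {
            throw new IllegalStateException("in-memory gunzip failed", e);
        }
        return out.toByteArray();
    }

    // Build a made-up JSON array that loosely resembles a batch of transactions.
    static String sampleBatch(int count) {
        StringBuilder json = new StringBuilder("[");
        for (int i = 0; i < count; i++) {
            if (i > 0) {
                json.append(',');
            }
            json.append("{\"id\":\"tx-").append(i)
                .append("\",\"sequenceNumber\":").append(i)
                .append(",\"operation\":\"SALE\",\"pointOfSaleId\":1}");
        }
        return json.append(']').toString();
    }

    public static void main(String[] args) {
        byte[] raw = sampleBatch(30).getBytes(StandardCharsets.UTF_8);
        byte[] compressed = gzip(raw);
        System.out.printf("raw=%d bytes, gzip=%d bytes%n", raw.length, compressed.length);
    }
}
```

Repetitive JSON like this compresses well because field names repeat in every element, which is also why batching several entries per request tends to help.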


In order to test the configuration use the sudo apachectl configtest command and if everything is OK restart the server with sudo apachectl restart.

Open a browser and try to access http://localhost/gemfire/. That should return a list of the current regions in the system. With the help of Google Chrome Developer Tools it’s simple to inspect the HTTP headers and double-check that content coming from the GemFire RESTful services is already being compressed.


Front Office – Point of Sales

The point of sale (PoS) in this implementation is a simple GemFire node set up in a multicast cluster, so there are no locators and nodes communicate over the “store” LAN. If a new PoS is added to the store, nothing in the cluster needs to change, and if a node fails or even the machine goes bad, it can be replaced without system interruption.

The regions in this system are partitioned and redundant, so each node holds its own data plus a backup copy of another node’s data. This is a very straightforward scenario, and more details can be found in the GemFire documentation.

The data needs to be delivered to the back office for processing, and the solution uses GemFire Asynchronous Event Queues, which, among other features, offer asynchronous processing, high availability (failover plus persistence), ordering, conflation and parallel processing.

This pattern is called write-behind: regions receive the data and, later in time, it is processed in a back-end system or stored in a database or another type of storage system, such as HDFS.
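As a rough illustration of write-behind, the sketch below accepts writes immediately and only hands back a batch to ship once a size or age threshold is reached. It is a simplified stand-in, not how GemFire implements its queues: the class WriteBehindBuffer is hypothetical, the clock is passed in by the caller, and the age check only runs when a new event arrives, whereas a real queue would typically use a background thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of write-behind batching; not GemFire code.
class WriteBehindBuffer<T> {
    private final int batchSize;
    private final long batchIntervalMillis;
    private final List<T> pending = new ArrayList<>();
    private long lastFlushMillis;

    WriteBehindBuffer(int batchSize, long batchIntervalMillis, long nowMillis) {
        this.batchSize = batchSize;
        this.batchIntervalMillis = batchIntervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    // Accept the write right away. Return the batch to deliver when either
    // threshold is hit, or an empty list when delivery can wait.
    List<T> offer(T event, long nowMillis) {
        pending.add(event);
        boolean full = pending.size() >= batchSize;
        boolean stale = nowMillis - lastFlushMillis >= batchIntervalMillis;
        if (!full && !stale) {
            return new ArrayList<>();
        }
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        lastFlushMillis = nowMillis;
        return batch;
    }

    // Number of events accepted but not yet handed out for delivery.
    int pendingCount() {
        return pending.size();
    }
}
```

The caller that receives a non-empty batch is the one responsible for delivering it, which keeps the write path itself free of any network calls.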

PoS – Project structure

The project structure is shown below. It’s a Gradle project with a set of shell scripts to start two instances of the PoS GemFire server running locally and using multicast to form a distributed system.

PoS/
├── build
│   ├── dependency-cache
│   └── docs
├── build.gradle
├── gradle
│   └── wrapper
├── gradle.properties
├── gradlew
├── gradlew.bat
├── servers
│   ├── conf
│   │   ├── gemfire-pos.properties
│   │   └── cache.xml
│   ├── pointOfSaleA
│   ├── pointOfSaleB
│   ├── restart.sh
│   ├── setEnv.sh
│   ├── startPoS.sh
│   └── stopPoS.sh
├── settings.gradle
└── src
    └── main

The startPoS.sh script is very similar to the back office version and is omitted here for brevity. The build.gradle file already contains a task for bundling the jar and its required libraries into a single file, so it can be deployed to a running GemFire cluster through the gfsh deploy command.

Dispatcher

The HttpDispatcher that is part of the solution uses an Apache HttpClient to send the requests, with error handling, timeout settings and retry capabilities. It sends data compressed with gzip and could leverage SSL as well.

//...
try {
            HttpEntity entity = EntityBuilder.create()
                    .setText(payload)
                    .setContentType(ContentType.APPLICATION_JSON)
                    .setContentEncoding("gzip")
                    .gzipCompress()
                    .build();

            httpPut.setEntity(entity);

            int retryCount = 0;
            do {
                try (CloseableHttpResponse putResponse = httpClient.execute(httpPut)) {
                    statusCode = putResponse.getStatusLine().getStatusCode();

                    if (statusCode == HttpStatus.SC_OK) {
                        isPutSuccessful = true;
                    } else {
                        retryCount++;
                        LOGGER.warning(String.format("HTTP request failed (status %s)- Retry in %d ms %d/%d", statusCode, RETRY_INTERVAL, retryCount, MAX_RETRY));

                        if (retryCount < MAX_RETRY) {
                            waitBeforeRetry(RETRY_INTERVAL);
                        }
                    }
                }
            } while ((!isPutSuccessful) && (retryCount < MAX_RETRY));
//...

The dispatcher retries sending the batch, up to MAX_RETRY times, when a request returns a non-OK status or fails with one of the specified known exceptions, and it waits between retries.
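The retry logic can be looked at in isolation. The following sketch reproduces the bounded retry with a fixed wait, with the HTTP call replaced by a generic Callable so it runs without any HTTP library. RetrySketch and runWithRetry are hypothetical names, and treating every exception as a failed attempt is an assumption of this sketch rather than the behaviour of the sample dispatcher.

```java
import java.util.concurrent.Callable;

// Hypothetical helper that mirrors the shape of the retry loop, without HTTP.
class RetrySketch {

    // Runs the attempt until it reports success or maxRetry attempts were made.
    // Returns the number of attempts used on success, or -1 if all attempts failed.
    static int runWithRetry(Callable<Boolean> attempt, int maxRetry, long retryIntervalMillis) {
        int attempts = 0;
        while (attempts < maxRetry) {
            attempts++;
            boolean succeeded;
            try {
                succeeded = attempt.call();
            } catch (Exception e) {
                // Assumption: any exception counts as a failed attempt.
                succeeded = false;
            }
            if (succeeded) {
                return attempts;
            }
            if (attempts < maxRetry) {
                try {
                    Thread.sleep(retryIntervalMillis);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt status and give up retrying.
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }
}
```

A fixed interval keeps the example short; with intermittent links, a growing interval between attempts may be a better fit.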

TransactionEventListener

This is the actual implementation of AsyncEventListener. It processes the events and sends them to the dispatcher, extracting the key from each event so it can be used in the HTTP REST service call.

Let’s define the Transaction POJO with some fields to represent the business object:

import com.gemstone.gemfire.pdx.PdxReader;
import com.gemstone.gemfire.pdx.PdxSerializable;
import com.gemstone.gemfire.pdx.PdxWriter;

import java.util.List;

public class Transaction implements PdxSerializable {

    private String id;
    private long sequenceNumber;
    private String operation;
    private int pointOfSaleId;
    private List<TransactionItem> items;

    @Override
    public void toData(PdxWriter pdxWriter) {
        pdxWriter.writeString("id", id)
                .writeLong("sequenceNumber", sequenceNumber)
                .writeInt("pointOfSaleId", pointOfSaleId)
                .writeString("operation", operation)
                .writeObject("items", items)
                // identity fields can only be marked after they have been written
                .markIdentityField("id")
                .markIdentityField("pointOfSaleId");
    }

    @Override
    @SuppressWarnings("unchecked")
    public void fromData(PdxReader pdxReader) {
        id = pdxReader.readString("id");
        sequenceNumber = pdxReader.readLong("sequenceNumber");
        pointOfSaleId = pdxReader.readInt("pointOfSaleId");
        operation = pdxReader.readString("operation");
        items = (List<TransactionItem>) pdxReader.readObject("items");
    }

    //... getters/setters and the nested TransactionItem class
}

It could have used PDX auto-serialization (ReflectionBasedAutoSerializer) as well, but for the sake of giving an example, the serialization methods of PdxSerializable are implemented by hand.

In order to process the information in the queue you need to implement an AsyncEventListener, which receives events in batches, either when batch-size is reached or after a given period of time (batch-time-interval). This event listener can be attached to multiple regions, but since it has been implemented to work specifically with Transaction objects, the listener validates each event.

//....
 public boolean processEvents(List<AsyncEvent> events) {
        List<PdxInstance> pendingBatch = new ArrayList<>();
        List<String> pendingKeys = new ArrayList<>();

        LOGGER.fine(String.format("Batch size: %d", events.size()));

        try {
            for (AsyncEvent event : events) {
                PdxInstance txEvent = (PdxInstance) event.getDeserializedValue();

                if (isEventValid(txEvent)) {
                    pendingBatch.add(txEvent);
                    // keys need to be converted to string
                    pendingKeys.add(String.valueOf(txEvent.getField(ID_FIELD)));
                } else {
                    // invalid events will be 'just' ignored/dropped
                    LOGGER.severe(String.format("Dropping unrecognized event: %s", txEvent.getClassName()));
                }
            }

            if (pendingBatch.size() > 0) {
                processTransactionBatch(pendingBatch, pendingKeys);
            } else {
                LOGGER.info("No items to process.");
            }

        } catch (Exception ex) {
            // if all fails, send to a temporary error/dead letter region to process later
            processErrorBatch(pendingBatch);
            LOGGER.log(Level.SEVERE, ex.getMessage(), ex);
        }
//        } finally {
//            // clear batch
//            pendingBatch.clear();
//        }

        // always return true to avoid queue loop
        return true;
    }
...
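The pendingKeys list collected above is what ends up in the request URL. As a sketch only, assuming the batch goes out as a single request whose path lists the keys separated by commas after the endpoint (check the GemFire REST API reference for the exact URL form), the URL could be assembled like this. BatchUrlSketch and its methods are hypothetical names, not classes from the sample project.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.List;

// Hypothetical helper; the comma-separated key format is an assumption.
class BatchUrlSketch {

    // Append the encoded keys, separated by commas, to the endpoint URL.
    static String buildUrl(String endpoint, List<String> keys) {
        StringBuilder url = new StringBuilder(endpoint);
        if (!endpoint.endsWith("/")) {
            url.append('/');
        }
        for (int i = 0; i < keys.size(); i++) {
            if (i > 0) {
                url.append(',');
            }
            url.append(encode(keys.get(i)));
        }
        return url.toString();
    }

    // Encode one key for use as a path segment.
    static String encode(String key) {
        try {
            // URLEncoder produces form encoding, where a space becomes '+';
            // inside a URL path a space should be %20 instead.
            return URLEncoder.encode(key, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }
}
```

Encoding each key separately matters because a key that itself contains a comma or a slash would otherwise change the meaning of the path.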

If the message batch still fails after processing the events and retrying the specified number of times, the listener follows an error-handling path that stores the batch in an error region. Thanks to the gfsh --listener-param option, the name of that region (ERROR_REGION) can be specified when the queue is created and associated with its listener.
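To make the error path more concrete, here is a minimal sketch of how such parameters might be read and used. It assumes the listener receives the --listener-param values as a java.util.Properties object, and it uses a plain Map as a stand-in for the error region and for each event. ErrorRoutingSketch and its methods are hypothetical; the parameter names come from the queue definition used later in this article.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in: a Map plays the role of the GemFire error region.
class ErrorRoutingSketch {
    private String endpoint;
    private String errorRegionName;
    private String idField;
    private final Map<String, Map<String, Object>> errorRegion = new HashMap<>();

    // Mirrors the shape of an init(Properties) callback fed by --listener-param.
    void init(Properties props) {
        endpoint = required(props, "ENDPOINT");
        errorRegionName = required(props, "ERROR_REGION");
        idField = required(props, "ID_FIELD");
    }

    // Fail fast when a parameter is missing instead of failing on the first batch.
    private static String required(Properties props, String name) {
        String value = props.getProperty(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing listener parameter: " + name);
        }
        return value;
    }

    // Park every event of a failed batch under its id so it can be reprocessed later.
    void processErrorBatch(List<Map<String, Object>> failedBatch) {
        for (Map<String, Object> event : failedBatch) {
            errorRegion.put(String.valueOf(event.get(idField)), event);
        }
    }

    String endpoint() { return endpoint; }
    String errorRegionName() { return errorRegionName; }
    int errorCount() { return errorRegion.size(); }
}
```

Keying the parked events by id means that a batch which fails twice overwrites its earlier copy instead of piling up duplicates.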

GemFire function for testing

In order to simulate transactions on each PoS for testing, there is a GemFire function named RandomTxGenerator that, when called, generates a specified number of random transactions on each PoS; these are then dispatched to the Asynchronous Event Queue.

//...
    public void execute(FunctionContext functionContext) {
        int batchSize = Integer.parseInt( ((String[]) functionContext.getArguments())[0] );
        if (batchSize <= 0)
            throw new IllegalArgumentException("Function argument is batch size and should be > 0");

        Region<String, Transaction> region = gemFireCache.getRegion(REGION);
        DistributedMember member = gemFireCache.getDistributedSystem().getDistributedMember();
        LOGGER.info(String.format("##### Running at %s ...", member.getId()));
        Map<String, Transaction> transactionBatch = generateTransactions(batchSize);
        LOGGER.info(String.format("##### Generating %d transaction entries.", transactionBatch.size()));
        region.putAll(transactionBatch);
    }
//...

That’s also going to be part of the application jar.
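The generateTransactions helper is not shown in the excerpt above. A minimal sketch of what such a generator could look like follows, using a simplified Tx class in place of the article's Transaction and made-up operation names; none of this is taken from the sample project.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

// Hypothetical generator; Tx is a simplified stand-in for Transaction.
class RandomTxSketch {

    static class Tx {
        final String id;
        final long sequenceNumber;
        final String operation;
        final int pointOfSaleId;

        Tx(String id, long sequenceNumber, String operation, int pointOfSaleId) {
            this.id = id;
            this.sequenceNumber = sequenceNumber;
            this.operation = operation;
            this.pointOfSaleId = pointOfSaleId;
        }
    }

    // Made-up operation names, used only to vary the generated data.
    private static final String[] OPERATIONS = {"SALE", "REFUND", "VOID"};

    // Build batchSize transactions keyed by id, ready for a single putAll call.
    static Map<String, Tx> generateTransactions(int batchSize, int pointOfSaleId, Random random) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize should be > 0");
        }
        Map<String, Tx> batch = new HashMap<>();
        for (long sequence = 0; sequence < batchSize; sequence++) {
            // 128 random bits formatted as a UUID string serve as the key.
            String id = new UUID(random.nextLong(), random.nextLong()).toString();
            String operation = OPERATIONS[random.nextInt(OPERATIONS.length)];
            batch.put(id, new Tx(id, sequence, operation, pointOfSaleId));
        }
        return batch;
    }
}
```

Passing the Random in makes a test run repeatable with a fixed seed, while the function itself could simply use a fresh Random on each call.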

PoS – Assemble!

Assuming that the PoS Gradle/Java project is already built and you have started the servers, here is the gfsh script that connects to a running node and assembles everything, performing the following actions:

  • Deploy the PoS-1.0.jar containing the listener, the dispatcher and the test function classes;
  • Create an asynchronous event queue using the provided listener and settings;
  • Create a transaction region associated with the event queue;
  • Create the error region used when requests fail.

#### POS GFSH script
# connect
connect --jmx-manager=localhost[1099]

# JAR location (change location properly)
deploy --jar=/tmp/samples/PoS/build/libs/PoS-1.0.jar

# Create Async
create async-event-queue --id="transactionQueue" --listener=io.pivotal.example.listener.TransactionEventListener --persistent --parallel --enable-batch-conflation --batch-size=30 --batch-time-interval=20000 --listener-param=ENDPOINT#http://localhost/gemfire/transaction/,ERROR_REGION#transaction_error,ID_FIELD#id

# Create region
create region --name=transaction --type=PARTITION_PERSISTENT_OVERFLOW --async-event-queue-id=transactionQueue --redundant-copies=1 

## error region
create region --name=transaction_error --type=PARTITION_PERSISTENT

The script is provided in the posSetup.gfsh file. Make sure you have started the PoS servers, then execute it with the gfsh run command. If needed, you can tweak the --dispatcher-threads option to add more threads for processing. Note that this affects ordering and, of course, generates more HTTP calls.

The expected output is the following:

gfsh run --file=posSetup.gfsh
1. Executing - connect --jmx-manager=localhost[1099]

Connecting to Manager at [host=localhost, port=1099] ..
Successfully connected to: [host=localhost, port=1099]

2. Executing - deploy --jar=/tmp/samples/PoS/build/libs/PoS-1.0.jar

   Member    | Deployed JAR | Deployed JAR Location
------------ | ------------ | -------------------------------------------------------------------------------------------
pointOfSaleA | PoS-1.0.jar  | /tmp/samples/PoS/servers/pointOfSaleA/vf.gf#PoS-1.0.jar#1
pointOfSaleB | PoS-1.0.jar  | /tmp/samples/PoS/servers/pointOfSaleB/vf.gf#PoS-1.0.jar#1

3. Executing - create async-event-queue --id="transactionQueue" --listener=io.pivotal.example.listener.TransactionEventListener --persistent --parallel --enable-batch-conflation --batch-size=30 --batch-time-interval=20000 --listener-param=ENDPOINT#http://localhost/gemfire/transaction/,ERROR_REGION#transaction_error,ID_FIELD#id

   Member    | Result
------------ | -------
pointOfSaleA | Success
pointOfSaleB | Success

4. Executing - create region --name=transaction --type=PARTITION_PERSISTENT --async-event-queue-id=transactionQueue --redundant-copies=1

   Member    | Status
------------ | -----------------------------------------------
pointOfSaleB | Region "/transaction" created on "pointOfSaleB"
pointOfSaleA | Region "/transaction" created on "pointOfSaleA"

5. Executing - create region --name=transaction_error --type=PARTITION_PERSISTENT_OVERFLOW

   Member    | Status
------------ | -----------------------------------------------------
pointOfSaleA | Region "/transaction_error" created on "pointOfSaleA"
pointOfSaleB | Region "/transaction_error" created on "pointOfSaleB"

Observe that each PoS has its own queue and regions, containing its own data and a redundant copy. You may increase the number of copies in order to increase availability.

[Diagram: PoS]

Note that all data, including the queue data, is held in memory and persisted to disk to prevent data loss in case of failure, and it’s also replicated to the other node(s) in the store.

Testing

Now you can test the system by calling the GemFire function that generates random transactions. Another gfsh script provided with the PoS, named testFunction.gfsh, can be used for testing; execute it through the gfsh run command. It basically calls the function, passing the number of transactions to generate:

execute function --id=RandomTxGenerator --arguments=100

Now connect through gfsh on both systems and check the number of entries:

On the Point of Sale system:

gfsh>connect --jmx-manager=localhost[1099]
gfsh>show metrics --region=/transaction --categories=cluster

Cluster-wide Region Metrics

Category |       Metric       | Value
-------- | ------------------ | -------
cluster  | member count       | 2
         | region entry count | 1009800

On the Back Office system:

gfsh>connect --locator=localhost[10333]
gfsh>show metrics --region=/transaction --categories=cluster

Cluster-wide Region Metrics

Category |       Metric       | Value
-------- | ------------------ | -------
cluster  | member count       | 2
         | region entry count | 1009800

Depending on the number of threads, entries and batch size, it may take a few seconds for data to become available in the back office. Check the logs on the PoS nodes for more details, and feel free to tweak the system and play with the failover capabilities.

Note: The number of entries here reflects the result of a bigger test that I’ve been running.

Possible enhancements

There are some obvious enhancements to the proposed solution, such as:

  • Enable SSL on the HttpClient so requests are secure, and do the SSL termination on Apache HTTP Server;
  • Create a GemFire function to reprocess messages pending in the error region, leveraging the same Asynchronous Event Queue;
  • Implement a CacheWriter in the back office region, so it can validate the events and check for possible duplicates;
  • Leverage the Tomcat plugin for Apache HTTP Server for better load balancing, monitoring and failover.

Conclusion

The GemFire REST API offers a powerful and flexible alternative for interacting with data on a GemFire system. Combined with GemFire’s already proven internal queueing system, it makes it possible to build scalable and reliable solutions that use HTTP as a stateless, horizontally scalable option for systems integration.

This article illustrated a real-world scenario using many GemFire features, such as system administration through gfsh, queues, listeners, functions, PDX serialization, persistence and overflow, all part of the same solution.
