GemFire functions with Java 8, Nashorn and Groovy

GemFire functions offer a powerful and flexible way to send distributed work to multiple servers. This work can be data-dependent, acting as smart units of work that run in parallel on a given region, or it can run in parallel on all available members of the system. The work can even be filtered to act only on a set of keys or on a sub-set of specified members, which can be really convenient depending on the use case being implemented. For example:

✓ If you have some kind of external resource provisioning or initialization of a third-party service (a Linux service, for example) you can wrap it in a GemFire function and distribute the command to a sub-set of members.
✓ If you have a partitioned data set on which you need to perform an aggregation or any other kind of data processing, you can implement a data-dependent GemFire function.

These functions can leverage high availability (HA) features: when a failure occurs, GemFire will automatically retry (as set on com.gemstone.gemfire.cache.client.Pool.retryAttempts), can fail over to a different node, can collect partial execution results on the clients, and many other options.

Function execution scheme

Another important feature of GemFire functions is the remote deployment capability, performed through the gfsh deploy command by specifying the JAR file containing the function code and the target: a single server, multiple servers (groups) or an entire cluster. This is a powerful solution that lets developers dynamically add new functionality to an already running system that may have hundreds of nodes, in a convenient way. In this post I’m going to create some functions that will be deployed to multiple servers (JVMs) and executed remotely from a client application. Getting a ride on the JDK 8 train, I’m going to leverage Nashorn and implement a GemFire function using JavaScript.

JDK 8 + Nashorn

JDK 8 is finally out (March 18, 2014); after a late train, a secure train and holding the train, the train arrived. Lambdas are of course one of the hottest things in town, but another great feature of JDK 8 is Nashorn. Nashorn is a JavaScript engine created by Oracle, just like V8 (https://code.google.com/p/v8/) from Google, but running directly on the JVM. The project was publicly announced in 2011 and became open source in 2012 as part of OpenJDK. Since JDK 6 there has been a built-in JavaScript engine in the Oracle JVM, based on Rhino (Mozilla) and developed as part of JSR 223. All these efforts are related to the Da Vinci Machine (JSR 292), which aims to make the JVM support dynamic languages (invokedynamic) such as Groovy, Jython and JRuby. The greatest advantages of Nashorn are its focus on speed and its use of newer technologies and specs that were not available on the JVM when Rhino was created.

Requirements:

  • JDK 8 (for Nashorn function)
  • Pivotal GemFire 7.+
  • Any IDE with JDK 8 support (IntelliJ, NetBeans or Eclipse)
  • Gradle (optional)

DISCLAIMER: JDK 8 is not officially supported on GemFire 7 yet; this article is only an experiment in what’s possible to implement.

If you do not want to follow the step-by-step instructions or are already familiar with GemFire functions, just clone the git repository, build it and play with the code.

git clone https://github.com/markito/gemfire-functions-sample/

Once you have cloned the project you should have the following structure:

├── build
│   ├── classes
│   ├── dependency-cache
│   ├── libs
│   ├── resources
│   └── tmp
├── build.gradle
├── gradle
│   └── wrapper
├── gradle.properties
├── gradlew
├── gradlew.bat
├── out
│   └── production
├── servers
│   ├── cache.xml
│   ├── locator1
│   ├── server1
│   ├── server2
│   ├── setEnv.sh
│   ├── startServers.sh
│   └── stopServers.sh
├── settings.gradle
└── src
└── main

Edit gradle.properties and modify the gemfireHome variable to point to your GemFire installation directory.

gemfireHome=/opt/gemfire/install/Pivotal_GemFire_702_b45797

Run the build command. This may take some time on the first execution since it’s going to download the dependencies.

./gradlew build

Now let’s move into the servers folder and update the setEnv.sh script to set the GEMFIRE_HOME variable to your current GemFire installation. On my machine this script looks like the following:

export GEMFIRE_HOME=/opt/gemfire/install/Pivotal_GemFire_702_b45797/

Now you can manage the servers using the shell scripts provided in this folder (startServers.sh/stopServers.sh), which will start one locator and two GemFire servers (data nodes). Start the servers.

$./startServers.sh

The cache.xml file has a single partitioned region named exampleRegion, based on the sample cache.xml bundled with the GemFire installation.

<?xml version="1.0"?>
<!DOCTYPE cache PUBLIC
    "-//GemStone Systems, Inc.//GemFire Declarative Caching 7.0//EN"
    "http://www.gemstone.com/dtd/cache7_0.dtd">

<cache>
        <region name="exampleRegion">
                <region-attributes refid="PARTITION">
                </region-attributes>
        </region>
</cache>

The environment is ready to go. I’m going to continue the next steps using the IntelliJ IDE, but they can easily be adapted for any other common Java IDE.

Creating Functions

Assuming you’re new to GemFire functions, we’re going to start with a very simple function that runs on every node and prints a message, really just an example. Create a new Java class named HelloFunction.java that extends com.gemstone.gemfire.cache.execute.FunctionAdapter. Implement the required methods execute() and getId(), and let’s just print a message in the server logs. It’s important to give the function a meaningful ID since that’s how you’re going to call it on the system. By default GemFire functions are expected to return results, and since we’re not dealing with data and not returning anything, we change that by implementing the hasResult() and isHA() methods and returning false.

@Override
public void execute(FunctionContext functionContext) {
    System.out.println("Hello, I'm running here");
}

@Override
public String getId() {
    return HelloFunction.class.getCanonicalName();
}

@Override
public boolean hasResult() {
    return false;
}

@Override
public boolean isHA() {
    return false;
}

Note: An alternative to implementing hasResult() and isHA() is to return an empty result or a status from the execute() method, with functionContext.getResultSender().lastResult(0) for example. By doing that, though, your clients will wait for the function execution to complete, so it may be a better approach to keep hasResult() returning false.

Now the usual process would be to generate a jar file from this application and deploy it to a running GemFire cluster using gfsh. But I’ve created a Gradle task to simplify this process even more and run it right after the build, or from an IDE for example. Assuming you’re following the article, have the GemFire servers running and have cloned the FunctionSamples project, just run the deployJar Gradle task.

$./gradlew deployJar
Creating properties on demand (a.k.a. dynamic properties) has been deprecated and is scheduled to be removed in Gradle 2.0. Please read http://gradle.org/docs/current/dsl/org.gradle.api.plugins.ExtraPropertiesExtension.html for information on the replacement for dynamic properties.
Deprecated dynamic property: "transitive" on "root project 'FunctionSamples'", value: "true".
Deprecated dynamic property: "libDir" on "task ':deployJar'", value: "build/libs/FunctionSam...".
:compileJava
:compileGroovy
:processResources UP-TO-DATE
:classes
:jar
:deployJar

(1) Executing -  connect

Connecting to Locator at [host=localhost, port=10334] ..
Connecting to Manager at [host=anakin.local, port=1099] ..
Successfully connected to: [host=anakin.local, port=1099]

(2) Executing -  deploy --jar=build/libs/FunctionSamples-1.0.jar

Member  |      Deployed JAR       | Deployed JAR Location
------- | ----------------------- | --------------------------------------------------------------------------------------------------------------------
server1 | FunctionSamples-1.0.jar | /Users/markito/Projects/Pivotal/workspaces/articles/FunctionSamples/servers/server1/vf.gf#FunctionSamples-1.0.jar#1
server2 | FunctionSamples-1.0.jar | /Users/markito/Projects/Pivotal/workspaces/articles/FunctionSamples/servers/server2/vf.gf#FunctionSamples-1.0.jar#1

stty: stdin isn't a terminal

BUILD SUCCESSFUL

Total time: 15.56 secs

First it obviously compiled the new class and generated the jar file; then, executing the deployJar task, it connected to the running locator (the GemFire equivalent of a namenode/load-balancer) and discovered the other two members of the system. After that it copied the jar file into each server folder, assigning a version number, so you can check whether a server had a problem during this process and identify which servers have older versions. Now let’s connect to the server and call the function from the gfsh command line utility, which is a great tool for such tests. If you don’t have gfsh on your PATH, just use the provided setEnv.sh, then connect ( connect ) to the system and list all available functions (list functions).

(markito@anakin)$ . ./servers/setEnv.sh
Gemfire environment set...
(markito@anakin)$ gfsh
_________________________     __
/ _____/ ______/ ______/ /____/ /
/ /  __/ /___  /_____  / _____  /
/ /__/ / ____/  _____/ / /    / /
/______/_/      /______/_/    /_/    v7.0.2

Monitor and Manage GemFire
gfsh>connect
Connecting to Locator at [host=localhost, port=10334] ..
Connecting to Manager at [host=anakin.local, port=1099] ..
Successfully connected to: [host=anakin.local, port=1099]

gfsh>list functions
Member  | Function
------- | -------------------------------------------------------------------
server1 | NashornFunction
server1 | com.pivotal.gemfire.samples.functions.ExternalScriptFunctionAdapter
server1 | com.pivotal.gemfire.samples.functions.HelloFunction
server1 | com.pivotal.gemfire.samples.functions.SimpleFunction
server1 | com.pivotal.gemfire.samples.functions.SimpleGroovyFunction
server2 | NashornFunction
server2 | com.pivotal.gemfire.samples.functions.ExternalScriptFunctionAdapter
server2 | com.pivotal.gemfire.samples.functions.HelloFunction
server2 | com.pivotal.gemfire.samples.functions.SimpleFunction
server2 | com.pivotal.gemfire.samples.functions.SimpleGroovyFunction

Here you can see all the functions available in our project, including the ones I’m going to cover later in the article using Nashorn and Groovy (a bonus). In order to execute the function from gfsh we need to, guess what, call the execute function command passing the function id; depending on the function type you also need to give it a target (region, group or member). Since this function only prints a message it can be called on every member.

gfsh>execute function --id=com.pivotal.gemfire.samples.functions.HelloFunction
Execution summary

Member ID/Name         | Function Execution Result
------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
anakin(server2:7723):51377 | While executing function : com.pivotal.gemfire.samples.functions.HelloFunction on member : anakin(server2:7723):51377 error occured : Cannot return any result as the Function#hasResult() is false
anakin(server1:7711):9790  | While executing function : com.pivotal.gemfire.samples.functions.HelloFunction on member : anakin(server1:7711):9790 error occured : Cannot return any result as the Function#hasResult() is false

By default the gfsh client expects some result from the function and will complain that the function’s hasResult() method returns false, which is fine. You can now check both the server1 and server2 logs for the printed “Hello” message.

==> server1/server1.log <==
Hello, I'm running here
==> server2/server2.log <==
Hello, I'm running here

Or alternatively you can check server logs from gfsh as well using the show log command.

gfsh>show log --member=server2 --lines=3
SystemLog:
Hello, I'm running here
Hello, I'm running here
Hello, I'm running here

Use case: Data clean up

So far so good; now let’s do something useful with functions by implementing a use case. In the project there is a com.pivotal.gemfire.samples.loader.LoadData class that can be executed to produce some entries in /exampleRegion; run this class before proceeding. It’s a simple GemFire client that connects to the system and puts some Customer objects generated with fake data for testing. The Customer class has only four fields: ID, NAME, E-MAIL and CREDIT CARD NUMBER. For testing purposes it generates some customers with invalid e-mails and some with invalid credit cards, so we can implement GemFire functions for simple data clean up, one of the 21st century’s biggest problems, right? And to make things more interesting, there are versions of the same function using Java 8 syntax, Groovy and JavaScript (through Java 8 Nashorn).

Java Function

SimpleFunction.java validates credit cards and sets invalid ones to an empty String (""). Also, since the region is partitioned and this is a data-dependent function, we need to give it a region to work on, which can be specified in gfsh with --region=/regionName

…
public void execute(FunctionContext functionContext) {

    RegionFunctionContext rfc = (RegionFunctionContext) functionContext;
    Region<Object, Customer> region = PartitionRegionHelper.getLocalDataForContext(rfc);

    // check every credit card and clear invalid ones
    region.forEach((Object id, Customer customer) -> {
        if (!RandomCreditCardGenerator.isValidCreditCardNumber(customer.getCcNumber())) {
            customer.setCcNumber("");
            System.out.println(String.format("Customer %s has an invalid credit card.", id));
            region.put(id, customer);
        }
    });

    rfc.getResultSender().lastResult("Done.");
}
…
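
RandomCreditCardGenerator is part of the sample project and its validator isn’t shown here; since the JavaScript version later calls a Mod10 function, it presumably applies the standard Luhn (Mod 10) check. Here is a self-contained sketch of that check, with a plain HashMap standing in for the local region data (the class and test values are my own illustration, not the project’s code):

```java
import java.util.HashMap;
import java.util.Map;

public class LuhnSweep {

    // Standard Luhn (Mod 10) check: double every second digit from the right,
    // subtract 9 when the doubling overflows, and require the total to be % 10 == 0
    static boolean isValidCreditCardNumber(String number) {
        if (number == null || number.isEmpty()) return false;
        int sum = 0;
        boolean doubleIt = false;
        for (int i = number.length() - 1; i >= 0; i--) {
            char c = number.charAt(i);
            if (!Character.isDigit(c)) return false;
            int d = c - '0';
            if (doubleIt) {
                d *= 2;
                if (d > 9) d -= 9;
            }
            sum += d;
            doubleIt = !doubleIt;
        }
        return sum % 10 == 0;
    }

    public static void main(String[] args) {
        // a HashMap stands in for PartitionRegionHelper.getLocalDataForContext(rfc);
        // Region implements java.util.Map, so the forEach lambda is the same
        Map<Integer, String> cards = new HashMap<>();
        cards.put(1, "4532015112830366"); // Luhn-valid
        cards.put(2, "1234567812345678"); // fails the check

        cards.forEach((id, number) -> {
            if (!isValidCreditCardNumber(number)) {
                System.out.println(String.format("Customer %s has an invalid credit card.", id));
                cards.put(id, ""); // clear it, as SimpleFunction does with region.put
            }
        });
    }
}
```

Replacing the value for an existing key inside forEach is safe here because it is not a structural modification of the map, which is the same Map contract the Region version relies on.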

Call the function from gfsh:

gfsh>execute function --id=com.pivotal.gemfire.samples.functions.SimpleFunction --region=/exampleRegion

Check both cache server logs and note that the function is executed on both nodes and that the data updates happen locally.

==> server1/server1.log <==
Customer 405 has an invalid credit card.
Customer 123 has an invalid credit card.
Customer 407 has an invalid credit card.
Customer 13 has an invalid credit card.
……
==> server2/server2.log <==
Customer 81 has an invalid credit card.
Customer 391 has an invalid credit card.
Customer 201 has an invalid credit card.
……

Note that calling the function again will not produce these messages since there are no invalid credit cards anymore.

Nashorn Function

SimpleNashornFunction.java was implemented on the Java side pretty much as a wrapper around the JavaScript file where the actual business logic lives. The execute() method performs e-mail validation on the customer objects, using the kind of traditional JavaScript e-mail validation available everywhere on the internet, then cleans up the invalid ones. Let’s look at some code:

public SimpleNashornFunction() throws ScriptException, FileNotFoundException, UnsupportedEncodingException {
    engineManager = new ScriptEngineManager();
    engine = engineManager.getEngineByName("nashorn");

    InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(jsFile);
    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));

    engine.eval(reader);

    Invocable invocable = (Invocable) engine;
    _function = invocable.getInterface(com.gemstone.gemfire.cache.execute.Function.class);
}
...

I’m assuming that the JavaScript file is an acceptable implementation of the GemFire Function interface, then assigning it to a shadow function object which is used by the other methods as follows:

...
@Override
public void execute(FunctionContext fc) {
    _function.execute(fc);
}

@Override
public String getId() {
    return _function.getId();
}

Now on the JavaScript side there is a NashornFunction.js file under the resources folder with functions such as getId(), execute() and hasResult(), the methods required by the GemFire Function interface. Here are the getId() and execute() methods in JavaScript:

function getId() {
    return "NashornFunction";
}

function execute(context) {
    var PartitionHelper = Java.type("com.gemstone.gemfire.cache.partition.PartitionRegionHelper");
    counter = 0;
    var region = PartitionHelper.getLocalDataForContext(context);
    region.forEach(function (id, customer) {
        //context.getResultSender().sendResult("Processing " + id);
        if ((customer.email.length > 0) && (!isEmailValid(customer.email))) {
            print("Customer " + customer.name + " has an invalid e-mail");
            customer.email = "";
            region.put(id, customer);
            counter++;
        }
    });
    context.getResultSender().lastResult("Done. " + counter + " changed objects");
}

The commented line can be used if you want to receive partial results from the function execution, in this case the IDs as soon as they’re processed. Other than that the code is very similar to the Java 8 version of the credit card validation. Very simple, huh? With the jar deployed you can call it from gfsh as follows:

gfsh>execute function --id=NashornFunction --region=/exampleRegion
Execution summary

Member ID/Name          | Function Execution Result
------------------------------- | ----------------------------------------------------
anakin(server2:11844):18390 | Done. 124 changed objects
Done. 126 changed objects

Note that you get back the number of changed objects on each JVM. You may then want to check the server logs (server1.log and server2.log) for processing information:

==> server1/server1.log <==
Customer John80 has an invalid e-mail
Customer John390 has an invalid e-mail
Customer John278 has an invalid e-mail
Customer John82 has an invalid e-mail
Customer John392 has an invalid e-mail
Customer John200 has an invalid e-mail
…
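
The isEmailValid helper called by the script isn’t shown; it is one of the usual regex-based checks found all over the internet. A minimal Java equivalent (the class name and pattern are my own illustration, not the project’s code):

```java
import java.util.regex.Pattern;

public class EmailCheck {

    // A deliberately simple name@domain.tld pattern, in the spirit of the
    // common JavaScript e-mail validators the article refers to
    private static final Pattern SIMPLE_EMAIL =
            Pattern.compile("^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$");

    static boolean isEmailValid(String email) {
        return email != null && SIMPLE_EMAIL.matcher(email).matches();
    }

    public static void main(String[] args) {
        System.out.println(isEmailValid("john80@example.com")); // true
        System.out.println(isEmailValid("john80@invalid"));     // false, no TLD
    }
}
```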

This wrapper approach looks for NashornFunction.js on the classpath, but since Nashorn is actually parsing the JavaScript dynamically, why not leave the JavaScript file out of the jar package, so you can update the file and run a new version, or even entirely new function code, without compiling and deploying to the servers? Of course the file must be available to the JVMs, but that’s simple to solve in a real-world scenario through SAN/NAS, etc. That’s exactly what ExternalScriptFunction does: it receives two parameters, the location of the JavaScript file and the function you want to call in that file, and executes your JavaScript inside the GemFire JVMs, a powerful combination that gives you distributed JavaScript execution on the server side, collocated with data. Let’s take a look at some code:

@Override
public void execute(FunctionContext fc) {

    ScriptEngineManager engineManager = new ScriptEngineManager();
    ScriptEngine engine = engineManager.getEngineByName("nashorn");

    if (fc.getArguments() != null) {
        final String jsFile = ((String[]) fc.getArguments())[0];  // full path to javascript file
        final String method = ((String[]) fc.getArguments())[1];  // method to be called

        try {
            engine.eval(new FileReader(jsFile));

            Invocable invocable = (Invocable) engine;
            RegionFunctionContext rfc = (RegionFunctionContext) fc;

            // call the requested function on the JavaScript side
            invocable.invokeFunction(method, rfc);

        } catch (FileNotFoundException | ScriptException | NoSuchMethodException ex) {
            Logger.getLogger(ExternalScriptFunction.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

A very straightforward implementation on the Java side, and on the JavaScript side you don’t actually need to implement all the methods required by the GemFire Function interface, since Java is only calling one specific function there anyway. Calling it from gfsh is very simple too, just pass arguments to the GemFire function:

gfsh>execute function --id=com.pivotal.gemfire.samples.functions.ExternalScriptFunction --arguments="/Users/markito/Projects/Pivotal/workspaces/articles/FunctionSamples/src/main/resources/NashornFunction.js","execute" --region=/exampleRegion

Remember to run LoadData again so you have some invalid data for testing. Here I’m calling the exact same code we called before, passing “execute” as the name of the function to execute in the JavaScript file, performing the e-mail validation again. Let’s now call another method of this JavaScript file:

gfsh>execute function --id=com.pivotal.gemfire.samples.functions.ExternalScriptFunction --arguments="/Users/markito/Projects/Pivotal/workspaces/articles/FunctionSamples/src/main/resources/NashornFunction.js","validateCards" --region=/exampleRegion
…

And if you check both server logs you can see that the credit card validation is performed as expected.

==> server2/server2.log <==
Customer Mary125 has an invalid credit card:48720386601453991
Customer Mary325 has an invalid credit card:49297365787823271
Customer Mary163 has an invalid credit card:46489340791567091
…
==> server1/server1.log <==
Customer Mary233 has an invalid credit card:46864508028522361
Customer Mary119 has an invalid credit card:49160166021681981
Customer Mary237 has an invalid credit card:44850318956088501
…

With Java (SimpleFunction.java), or even the bundled version of the NashornFunction.js file (SimpleNashornFunction.java), if you want to change anything in the code you have to compile, deploy and run; but since we’re loading an external file here, you can modify NashornFunction.js and just call the function from gfsh. The current validateCards() in NashornFunction.js does not clean up invalid cards, as that code is commented out, so let’s remove those comments and call it again from gfsh.

...
function validateCards(context) {
    var PartitionHelper = Java.type("com.gemstone.gemfire.cache.partition.PartitionRegionHelper");
    counter = 0;
    var region = PartitionHelper.getLocalDataForContext(context);
    region.forEach(function (id, customer) {
        //context.getResultSender().sendResult("Processing " + id);
        if ((customer.ccNumber.length > 0) && (!Mod10(customer.ccNumber))) {
            print("Customer " + customer.name + " has an invalid credit card:" + customer.ccNumber);
            customer.ccNumber = ""; // uncomment
            region.put(id, customer); // uncomment
            counter++; // uncomment
        }
    });
    context.getResultSender().lastResult("Done. " + counter + " changed objects");
}

Then call the function again from gfsh and check the server logs. Note that the first execution will return the number of changed objects, but subsequent calls will just return 0.

gfsh>execute function --id=com.pivotal.gemfire.samples.functions.ExternalScriptFunction --arguments="/Users/markito/Projects/Pivotal/workspaces/articles/FunctionSamples/src/main/resources/NashornFunction.js","validateCards" --region=/exampleRegion

Bonus: Groovy Function

Groovy has been a first-class citizen on the JVM for a very long time, so it’s no surprise that you can implement GemFire functions in pure Groovy. The only requirement is, obviously, to have the Groovy library on the GemFire server classpath. The Groovy class can extend GemFire’s FunctionAdapter and implement the required methods exactly the same way you would in pure Java. One may ask, “Why include a Groovy example if it’s that similar to the Java implementation?” The answer is simple: remember that GemFire is not yet certified to run on Java 8 in production environments, so if you do want to leverage the power of lambdas and closures in GemFire functions, Groovy provides a nice and clean alternative that works on Java 6 and Java 7, and that some people may still prefer over Java 8. The Groovy syntax also has the advantage of being simpler and providing other features…

void execute(FunctionContext functionContext) {
    RegionFunctionContext rfc = (RegionFunctionContext) functionContext;
    Region<Object, Object> region = PartitionRegionHelper.getLocalDataForContext(rfc);

    // check every credit card and clear invalid ones
    region.collect({ id, customer ->
        if (!creditCadGen.isValidCreditCardNumber(customer.ccNumber)) {
            customer.ccNumber = ""
            region.put(id, customer)
            rfc.getResultSender().sendResult("Customer $id modified")
            println("Customer $id has an invalid credit card.")
        }
    })
    rfc.getResultSender().lastResult("Done.")
}

And if you’ve been following the post until here, you already know how to call the Groovy version using gfsh:

gfsh>execute function --id=com.pivotal.gemfire.samples.functions.SimpleGroovyFunction --region=/exampleRegion
Execution summary

Member ID/Name          | Function Execution Result
------------------------------- | ----------------------------------------------------
anakin(server2:11844):18390 | Done. 107 changed objects
Done. 115 changed objects

Conclusion

✓ GemFire functions offer a very flexible mechanism to run distributed code on multiple JVMs and leverage data locality to improve data processing.
✓ Java 8 syntax, with lambdas and other enhancements to the Java collections, can really save time when implementing data processing.
✓ Java 8 Nashorn is really simple to use and a fast JavaScript implementation that lets developers leverage existing JavaScript knowledge and code on the JVM, mixing and matching Java objects and JavaScript syntax.
✓ Groovy is still a very powerful alternative for people looking for lambdas and closures on Java 6 and Java 7, and even on Java 8. People looking to remove some of the boilerplate that Java requires will also benefit from the Groovy syntax without sacrificing performance.
✓ GemFire remote deployment of functions is a powerful tool for developers who want to add functionality to a running system without having to restart it or worry about package distribution and versioning.

There are still plenty of areas to explore using the lambdas and parallel processing features of Java 8 in GemFire functions, but that may be something for the next post…
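
As a small taste of that direction, here is a sketch of what such a local fan-out could look like with parallelStream(); the list and the placeholder isValid() check are mine, not from the sample project:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelCleanup {

    // Placeholder validity check, purely for illustration: exactly 16 digits
    static boolean isValid(String number) {
        return number.length() == 16 && number.chars().allMatch(Character::isDigit);
    }

    public static void main(String[] args) {
        List<String> cards = Arrays.asList("4532015112830366", "123", "");

        // parallelStream() fans the filtering out over the common fork-join pool,
        // much like a data-dependent function fans work out over the members
        List<String> invalid = cards.parallelStream()
                .filter(n -> !isValid(n))
                .collect(Collectors.toList());

        System.out.println(invalid.size() + " invalid card(s)"); // 2 invalid card(s)
    }
}
```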

Mining Twitter data using Python and GemFire XD

Python is a very popular and powerful programming language; that’s a fact. Some people may find it useful to write a Python client for GemFire XD, and this article describes one of the easiest ways to accomplish that, while explaining some details about the Twitter API.

The standard for database connectivity in Python is DB-API 2.0 (PEP 249), which is almost like JDBC for Java. There are many implementations for the most traditional databases like Oracle and MySQL, but very few alternatives for NewSQL databases like GemFire XD. As you may already know, GemFire XD is based on Apache Derby (10.4) and, as very few will remember, Derby was initially distributed by IBM as Cloudscape until 2004, when the code was contributed to the Apache Software Foundation as an incubator project. And why is it important to remember this piece of history? Because we’re going to leverage this IBM background of Apache Derby and use the pyDB2 library for GemFire XD connectivity.

To make the tutorial a little more interesting, we’re going to write a simple Twitter client that populates tweets into GemFire XD, plus some examples of how we can analyze the collected data.

Requirements and dependencies

Let’s get started by installing the dependencies:

  • DB2 Express-C – A no-charge (to develop/deploy), community edition of DB2 available for multiple platforms.
  • pyDB2 – Written by Man-Yong Lee, licensed under the LGPL; a DB-API 2.0 compliant interface for the IBM DB2 database.
  • TwitterAPI – A Python wrapper for Twitter API with support for Streaming.

Of course Python (2.7+) and GemFire XD are also required and download links are available under References.

Note: Alternatively you could use the GemFire XD ODBC drivers with the mxODBC module written by Marc-Andre Lemburg, which has a commercial license, or even pyODBC, an open source alternative, but both rely on platform-specific ODBC drivers.
Note 2: An alternative to DB2 Express-C is the DB2 Universal Run-Time client but, AFAIK, it’s only available for Windows.
Note 3: I’m using OS X/Unix-based instructions here but most of them can easily be matched in a Windows environment.

Installation

DB2 Client Install

Download and install DB2 Express-C – For Mac OS X the package is called db2_v101_macos_expc.tar.gz – Uncompress it anywhere you want and run the installer command:

$ tar -xzvpf db2_v101_macos_expc.tar.gz
$ cd expc/
$ ./db2_install   (can be ran using sudo for system-wide installation, but it’s not required)

Follow the installation instructions, they’re intuitive.

pyDB2 install

After the installation is completed, download and install pyDB2. You need to modify the setup.py script to include the location you have installed the DB2 client:

$ tar -xzvpf PyDB2_1.1.1-1.tar.gz
$ cd PyDB2_1.1.1/
$ vi setup.py

Look for DB2_ROOT variable and point it to your DB2 installation – In my case, using the defaults for installation without using sudo, it’s under a folder called sqllib in user home directory.

DB2_ROOT = "/Users/markito/sqllib/"

Save and quit the setup.py file and perform the installation (requires root/sudo):

$ sudo python setup.py install

TwitterAPI install

Using easy_install simply do:

$ sudo easy_install TwitterAPI

Configuration and startup

Starting Gemfire XD

We’re going to use two data nodes and one locator with default settings, just changing the listening ports since we’re running everything on a single host.

$ cd ~
$ mkdir locator server1 server2
$ sqlf locator start -jmx-manager-start=true -dir=locator
....Starting SQLFire Locator...
$ sqlf server start -locators=localhost[10334] -client-port=1528 -dir=server1
....Starting SQLFire Server...
$ sqlf server start -locators=localhost[10334] -client-port=1529 -dir=server2
....Starting SQLFire Server...

Note that we started the locator with -jmx-manager-start=true, which will start Pulse, the embedded monitoring tool for GemFire XD.

Cataloging GemFire XD under DB2 client

In order to connect from pyDB2 to GemFire XD we need to “catalog” our database, which is a way to register the database with the DB2 client under an alias. This is required even for pure DB2 installations when, for example, your database server is hosted on a different machine than your client.

$ cd $DB2_ROOT (~/sqllib/)
$ cd bin
$ ./db2 catalog tcpip node GFXDNODE remote localhost server 1528
DB20000I  The CATALOG TCPIP NODE command completed successfully.
DB21056W  Directory changes may not be effective until the directory cache is
refreshed.
$ ./db2 catalog database GFXD at node GFXDNODE authentication server
DB20000I  The CATALOG DATABASE command completed successfully.
DB21056W  Directory changes may not be effective until the directory cache is
refreshed.

Note: If you messed up during this process or want to rename your catalog node, use the uncatalog option.

Creating the application and testing

I’m going to use STS (Spring Tool Suite) with the PyDev plugin for Python development, but this can easily be done using VIM or any other text editor. The reasoning for PyDev is code completion, which is handy for occasional Python developers (like myself), and STS also offers a built-in SQL interface that can be used to browse GemFire XD tables and perform queries, which could also be done through the sqlf command line utility.

Creating the TWEETS table

Read this article on how to set up GemFire XD connectivity with STS (Eclipse) for database development. With the setup in place, open a SQL Scrapbook and create a table:

CREATE TABLE TWEETS
(
id BIGINT,
created_at TIMESTAMP,
favorited VARCHAR(5),
lang VARCHAR(3),
retweet_count INT,
retweeted VARCHAR(5),
source VARCHAR(256),
text VARCHAR(150),
user_id BIGINT,
CONSTRAINT FAVORITED_CONSTRAINT CHECK (favorited in ('True','False')),
CONSTRAINT RETWEETED_CONSTRAINT CHECK (retweeted in ('True','False')),
PRIMARY KEY (id)

) PARTITION BY COLUMN (user_id);

We’re creating a simple table mapping the most common tweet fields, partitioned by user_id, which should scale well if we later decide to co-locate data about the users with their own tweets. Depending on your needs, though, you may want to partition by tweet location or any other criteria. Other than that, it’s the usual CREATE TABLE SQL syntax.

Creating the Python client

  1. On STS create a new PyDev project and name it PyTweetXD
  2. Under the project create a new PyDev module and name it GfxdClient.py and let’s write a hello world query:
import DB2

conn = DB2.connect('GFXD', 'app','app')
cursor = conn.cursor()
cursor.execute('select * from sys.systables')

print cursor.fetchone()

if __name__ == '__main__':
    pass

Run this Python script and you should see the query results printed to stdout.

('2faf0139-0142-bae2-7db6-000007d2f9a8', 'SYSXPLAIN_SCAN_PROPS', 'T', 'c013800d-00fb-2644-07ec-000000134f30', 'SYSSTAT', 'R', '', 'NORMAL', None, None, None, None, None, None, None, None, None, 0, None)

Now let’s write a GfxdClient class that will be reused to persist our tweets from the Twitter client

  1. First create a file named config and put our connection details there. Create a section on this file so we can reuse it later for Twitter client settings as well.
    [GemFireXD]
    database=GFXD
    user=app
    password=app
    
  2. With the help of ConfigParser Python module we’re going to create a simple class that can use this information and connect to GemFireXD
    # -*- coding: utf-8 -*-
    import DB2
    
    from ConfigParser import ConfigParser
    
    class GfxdClient():
        _cursor = None
        _conn = None
        _config = ConfigParser()
    
        def __init__(self):
            self._config.read('config')
    
        def readConfig(self, name):
            return self._config.get('GemFireXD',name)
    
        def connect(self):
            self._conn = DB2.connect(self.readConfig('database'), self.readConfig('user'), self.readConfig('password'))
    
        def getCursor(self):
            if self._cursor is None:
                if self._conn is not None:
                    self._cursor = self._conn.cursor()
                else:
                    raise Exception("No active connection!")
            return self._cursor
    
        def execute(self, sql, params = None):
            self.getCursor().execute(sql, params)
    
        def select(self, sql, params = None):
            return self.execute(sql, params)
    
        def insert(self, sql, params):
            return self.execute(sql, params)
    

Finally, we are going to write a Twitter client that monitors public tweets for a given term using the Twitter Streaming API. Before proceeding, make sure you have registered an application at https://dev.twitter.com/apps and generated the OAuth keys, which are required. To keep it simple, put the OAuth keys in the config file under a Twitter section. Your config file may look like this:

[Twitter]
consumer_key=*************
consumer_secret=***************************
access_token_key=***************************
access_token_secret=*************************

[GemFireXD]
database=GFXD
user=app
password=app
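As a side note, both sections can be read with a single ConfigParser instance. Here is a minimal, self-contained sketch using an inline string with placeholder values instead of the actual config file (in Python 2 the module is named ConfigParser, in Python 3 it is configparser):

```python
from configparser import ConfigParser  # 'from ConfigParser import ConfigParser' on Python 2

# inline stand-in for the 'config' file, with placeholder values
config_text = """
[Twitter]
consumer_key = abc123

[GemFireXD]
database = GFXD
user = app
password = app
"""

config = ConfigParser()
config.read_string(config_text)  # the scripts use config.read('config') instead

print(config.get('GemFireXD', 'database'))    # GFXD
print(config.get('Twitter', 'consumer_key'))  # abc123
```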

Create a new Python module named TwitterClient.py, a script that collects public real-time tweets and persists them into GemFire XD. Here is the code:

# -*- coding: utf-8 -*-
from TwitterAPI import TwitterAPI
from GfxdClient import GfxdClient
from ConfigParser import ConfigParser
import time
import signal
import sys

successCount = 0
errorCount = 0

'''
Print some stats after script is stopped
'''
def signal_handler(signal, frame):
        print '\n# Stats'
        print 'Saved tweets: %d' % successCount
        print 'Errors: %d' % errorCount
        sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

### Read config file
config = ConfigParser()
config.read('config')

# get twitter keys
consumer_key        = config.get('Twitter', 'consumer_key')
consumer_secret     = config.get('Twitter', 'consumer_secret')
access_token_key    = config.get('Twitter', 'access_token_key')
access_token_secret = config.get('Twitter', 'access_token_secret')

# the subject you want to track on twitter
subject = '#NBA'

# twitter streaming api
api = TwitterAPI(consumer_key,consumer_secret,access_token_key,access_token_secret)
stream = api.request('statuses/filter', {'track': subject})

# Connect to GemFireXD
gfxd = GfxdClient()
gfxd.connect()

print "Reading Twitter stream for subject: %s (hit ctrl+c to stop)" % subject

# read streaming data and persist into GemFireXD
for tweet in stream.get_iterator():
    sql = "insert into tweets values (?,?,?,?,?,?,?,?,?)"
    try:
        params = (tweet['id'],
                  time.strftime('%Y-%m-%d %H:%M:%S', time.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y')),
                  str(tweet['favorited']),
                  str(tweet['lang']),
                  tweet['retweet_count'],
                  str(tweet['retweeted']),
                  str(tweet['source']),
                  unicode(tweet['text']),
                  tweet['user']['id'])
        gfxd.insert(sql, params)
        successCount += 1
    except Exception as e:
        errorCount += 1
        print e

if __name__ == '__main__':
    pass

We’re basically tracking every tweet that mentions #NBA and inserting it into GemFire XD. If any error occurs, we’ll just print the exception, count it, and keep processing. Running the script to test on a Friday night with 10 games scheduled, I was able to collect approx. 80k tweets, which may not represent a huge amount of data (at GemFire XD scale) but is already enough to analyze and good enough for some examples.
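The fiddliest part of the insert is converting Twitter's created_at value into a SQL TIMESTAMP string. Isolated, with a sample (hypothetical) tweet date, the conversion works like this:

```python
import time

# hypothetical created_at value in Twitter's format
created_at = 'Fri Mar 21 02:15:30 +0000 2014'

parsed = time.strptime(created_at, '%a %b %d %H:%M:%S +0000 %Y')
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', parsed)
print(timestamp)  # 2014-03-21 02:15:30
```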

Monitoring using Pulse

Now, while the script is running, let’s open a browser and check Pulse, the GemFire XD built-in monitoring tool, to see what’s happening with the servers. The default URL for Pulse is: http://localhost:7070/pulse

After login (admin/admin) the first page gives you a clear and beautiful overview of the system, with total heap available (from both servers), current memory usage and last 15 minutes throughput (Put/Get operations rate).

[Screenshot: Pulse main page]

We can double click one of the servers and check that the load is being balanced between both nodes.

[Screenshot: Pulse server details]

Let’s get back to STS and perform some basic data analysis by running some SQL queries. One of the joys of GemFire XD is that it parallelizes queries across the members, so even if I scale up or down, the locator will load-balance the queries between members and the system will handle the proper data distribution.

Data analysis

Sources Rank (Query)

Most Twitter clients share their name through a field named ‘source’ on every tweet. So if you post a tweet from an iPhone, the tweet source field looks like this:
<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>

But if you post a tweet from an Android device it may look like this: <a href="http://twitter.com/download/android" rel="nofollow">Twitter for Android</a>
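If you ever want to store just the client name instead of the whole anchor tag, a small regex over the source field does the trick; a sketch with a sample value:

```python
import re

# sample 'source' value as it appears on a tweet
source = '<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>'

match = re.search(r'>([^<]+)</a>', source)
client_name = match.group(1) if match else source
print(client_name)  # Twitter for iPhone
```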

Now, back in the SQL Scrapbook in STS, we can perform the following query to see which source produced the most tweets among those we collected:

-- rank of tweet sources
  select ROW_NUMBER() OVER() as RANK,
         source, count(*) total
    from tweets
group by source
order by total desc

The first 20 results of this query returned the following:

[Query results: tweet sources rank]

Based on these results we can see that iPhone users posted roughly twice as many messages as Android users.

Languages Rank (Query)

On another quick example: we know that the NBA is pretty much global, with 90+ international players representing more than 30 different countries. But which languages, besides English of course, talk about the NBA on Twitter? Run the following query against the data to get an idea:

-- languages most tweeted
  select ROW_NUMBER() OVER() as RANK,
         lang, count(*) total
    from tweets
group by lang
order by total desc

And the top 20 results of this query are:

[Query results: languages rank]

According to this data, English tops the list of course, followed by Spanish, and then come French, Japanese, and German.

Word Frequency Analysis (Python script)

Our final example will use another Python script to parse the text from all tweets and return the most used words across all messages, leveraging the Counter class from the Python collections package. The script looks like this:

import DB2
from ConfigParser import ConfigParser
from collections import Counter

config = ConfigParser()
config.read('config')

conn = DB2.connect(config.get('GemFireXD', 'database'), config.get('GemFireXD', 'user'), config.get('GemFireXD', 'password'))

SQL = 'SELECT text FROM tweets'
cursor = conn.cursor()
cursor.execute(SQL)

BATCH_SIZE = 10000
result = cursor.fetchmany(BATCH_SIZE)
wordCounter = Counter()

while result:
    # token list with every word of 3 or more characters on every tweet
    words = [w
             for t in result
             for w in t[0].split()
             if len(w) >= 3]
    # count frequency on token list
    wordCounter.update(words)

    try:
        result = cursor.fetchmany(BATCH_SIZE)
    except Exception as e:
        result = None
        print e

print wordCounter.most_common(10)  # top 10

if __name__ == '__main__':
    pass

We’re going to call this script from a shell and evaluate how long it takes to complete. This is the output:

-bash-4.1$ time python BatchWordAnalyzer.py
[('#NBA', 16678), ('Kyle', 11120), ('Korver', 10916), ('#LosAngeles', 8163), ('made', 7304), ('@NBA:', 7275), ('record.', 6840), ('games', 6804), ('straight', 5892), ('Happy', 5759)]

real	0m5.809s
user	0m5.523s
sys	0m0.031s

So in about 6 seconds we were able to parse 80,000+ tweets, count every word and its frequency, and generate the top 10 words across all these messages. Based on this result it’s clear that everybody was talking about Kyle Korver’s 3-point record. He had hit a shot from behind the arc in 90 consecutive games, an impressive mark, by the way.
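Stripped of the database access, the batching logic of the script boils down to repeatedly merging per-batch counts into one Counter. A self-contained sketch with two tiny stand-in batches (each row shaped like a fetchmany() tuple; the tweet texts are made up):

```python
from collections import Counter

# hypothetical batches standing in for cursor.fetchmany() results
batches = [
    [('Kyle Korver hits another three #NBA',)],
    [('Korver ties the record #NBA',)],
]

word_counter = Counter()
for result in batches:
    # every word of 3 or more characters in this batch
    words = [w for row in result for w in row[0].split() if len(w) >= 3]
    word_counter.update(words)  # same effect as adding a per-batch Counter

print(word_counter.most_common(2))
```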

Conclusion

This article demonstrated an alternative to ODBC for Python clients that want to use GemFire XD. It also showed how easy it can be to create an application that stores and analyzes data from tweets about a specific topic using Twitter’s Streaming API. A tweet contains a very limited number of characters, which is why we didn’t even have to enable disk persistence or any other more advanced feature of GemFire XD, at least for this load. But if you want to keep this data persisted for historical analysis, that can easily be done.

Note that we’re using a very naive approach in the Python code, without parallelism for example, which would definitely improve our processing time; and by adding data locality and processing the data inside the GemFire nodes instead of in the client, it would be a LOT faster.
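To give an idea of what client-side parallelism could look like, here is a minimal, hypothetical sketch that spreads the word counting over a thread pool (a process pool or, better, server-side execution would help more for CPU-bound work):

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

def count_words(batch):
    # count words of 3 or more characters in one batch of tweet texts
    return Counter(w for text in batch for w in text.split() if len(w) >= 3)

# hypothetical batches of tweet texts
batches = [['Kyle Korver #NBA'], ['Korver again #NBA'], ['one more #NBA']]

with ThreadPoolExecutor(max_workers=2) as pool:
    # count each batch concurrently, then merge the partial Counters
    total = sum(pool.map(count_words, batches), Counter())

print(total['#NBA'])  # 3
```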

What’s next ?

If you are using GemFire XD inside the Pivotal HD VM and want to play with the GemFire XD + PHD integration, here is the extra work needed to enable Hadoop persistence with GemFire XD:

Create an HDFS persistence store:

sqlf> CREATE HDFSSTORE tweetStore
NAMENODE 'hdfs://pivhdsne:8020'
HOMEDIR '/tweetStore'
BATCHTIMEINTERVAL 5000;
0 rows inserted/updated/deleted

And modify the table DDL as follows:

CREATE TABLE TWEETS
(
id BIGINT,
created_at TIMESTAMP,
favorited VARCHAR(5),
lang VARCHAR(3),
retweet_count INT,
retweeted VARCHAR(5),
source VARCHAR(256),
text VARCHAR(150),
user_id BIGINT,
CONSTRAINT FAVORITED_CONSTRAINT CHECK (favorited in ('True','False')),
CONSTRAINT RETWEETED_CONSTRAINT CHECK (retweeted in ('True','False')),
PRIMARY KEY (id)

) PARTITION BY COLUMN (user_id)
HDFSSTORE (tweetStore);

That’s it! Now your tweets will be persisted into HDFS! Simple huh ?

In a future post I’m going to demonstrate how to create the same kind of analysis using SpringXD and GemFireXD, but looking for trending topics in the hope of generating more data.

Setting up JDBC tools for GemFire XD

There are a lot of advantages to having a database connection directly from your IDE while developing an application. You can reuse that connection and leverage utilities such as a JPA Entities from Tables wizard to generate your Java objects using your preferred ORM framework, or even test some queries before writing them into your application. Some tools may even generate detailed diagrams, useful for documentation.

I’m going to walk through a step-by-step guide on how to set up such a connection in the most popular Java IDEs:

  • IntelliJ
  • Spring Tool Suite – STS (Eclipse)
  • NetBeans

How to set up a GemFire XD connection from IntelliJ 12

  1. In IntelliJ, click on View > Tools Windows > Database
  2. Under the Database tab right-click and select New > Data source
  3. In the Data Source properties set your data source name (GFXD Connection for example)
    [Screenshot: Data Source properties]
  4. In the JDBC Driver Source, click on the “…” button, browse to your GemFire XD installation directory, and under /lib select sqlfireclient.jar
  5. Make sure JDBC driver class is set to com.vmware.sqlfire.jdbc.ClientDriver
  6. Set Database URL to jdbc:sqlfire://<HOSTNAME>:1527 and set both user and password to “app”
  7. Before testing, let’s set the SQL Dialect to Derby, as GemFire XD is based on Derby – To do that, go to the Console tab and select Derby from the dialect list.
  8. Back on the Database tab, click Test Connection and if you get a success message click Ok.
  9. Now still in the Database tab, right-click the SYS schema – Under Diagrams select Show Visualization – This will generate a graphical view of the internal tables of GemFire XD, very useful for troubleshooting or for a better understanding of the internal tables.
    [Screenshot: SYS schema visualization]
  10. To complete this tutorial, let’s open a simple SQL command prompt – Right-click the connection and select Console
  11. In the console you have auto-completion for most SQL commands and table names. Run a “select * from sys.members;” to check the currently running members of the distributed system.
    [Screenshot: SQL console]

Now let’s repeat the steps above in STS and NetBeans.

How to set up a GemFire XD connection from Spring Tool Suite (STS)

  1. In STS (Eclipse) click on File > New > Other
  2. Expand Connection Profiles in the wizard window and select Connection Profile and click Next
    [Screenshot: New wizard]
  3. In the Connection Profile types, select Derby and give your connection a name (GFXD Connection for example) – Click Next
    [Screenshot: New Connection Profile]
  4. In the Drivers selection, click on New Driver Definition
  5. Select Other Driver from the list – you can use version 10.2 – and fill the driver name with “GemFireXD Driver”
    [Screenshot: New Driver Definition]
  6. Click on JAR List tab, then Add JAR/Zip… and look for sqlfireclient.jar under /lib directory of GemFireXD installation
  7. Move to the Properties tab and set the properties as the following:
    • Connection URL:  jdbc:sqlfire://<HOSTNAME>:1527
    • Database Name: app
    • Driver Class: com.vmware.sqlfire.jdbc.ClientDriver
    • User ID: app
    • Password: app
      [Screenshot: driver properties]
  8. Click Ok and make sure you have GemFireXD Driver in the drivers combo box.
  9. Click on Test Connection and if you get a success message, you’re good to go.
  10. To start using your connection, go to Window > Open Perspective > Other and select Database Development then click Ok
    [Screenshot: Database Development perspective]
  11. Then under Data Source Explorer right-click the GFXD Connection and click Open SQL Scrapbook
  12. Now you can easily manage your tables and perform SQLF queries directly from STS.
    [Screenshot: SQL Scrapbook]

How to set up a GemFire XD connection from NetBeans 7.x

  1. On NetBeans, go to Services tab, right-click Databases and select New Connection
  2. In the New Connection Wizard window, set the Driver combo box to New Driver, which will bring up a new window
  3. Set up the Driver File to sqlfireclient.jar located under /lib of GemFire XD installation
  4. In the Driver class field click on the Find button so the IDE will look for Driver classes on the jar file.
  5. Set the Name as “GemFire XD Driver” and click Ok.
    [Screenshot: New JDBC Driver wizard]
  6. Back at New Connection wizard, GemFire XD Driver will be already selected, so just click Next
    [Screenshot: New Connection Wizard]
  7. Now just fill username/password (app/app) and set the URL to jdbc:sqlfire://<HOSTNAME>:1527
  8. Click Next – Select your default schema and Next again
  9. Give your connection a meaningful name – “GemFireXD Connection” for example
  10. Now you are ready to manage and query your tables through NetBeans
    [Screenshot: NetBeans database explorer]

Note 1: If you have authentication enabled (auth-provider property), use the user and password specified in those settings; otherwise the default user/password is app/app.

Note 2: The hostname used in the connection can be a locator or a specific server of your distributed system.


Getting Started with Oracle WebLogic Server 12c: Developer’s Guide


[Book cover: Getting Started with Oracle WebLogic Server 12c: Developer’s Guide]

It took me a while to update the blog and get back to writing again due to the elaboration of this new material, which became the Getting Started with Oracle WebLogic Server 12c: Developer’s Guide book. We started writing the book against 12c (12.0.0), which ended up being a not-so-stable version, and had to rapidly update to 12.0.1.

But then, when we were about to finish the book, the 12.0.2 release came out with a lot of improvements, and we again updated all the material, including revisions for the new installer, the new type of WebLogic clusters, and the new JMS resource targeting, among other changes in this version.

The book follows a tutorial approach, with step-by-step instructions and a lot of screenshots to make it very easy for new users and people that are really starting with the product.

Hopefully I’m going to get back to publishing more frequently on the blog, but now “It’s all about GemFire”! ;)

Java EE 7 Tutorial has been released!

The Java EE 7 Tutorial is live, and in this edition we feature the following:

  • Examples for Java API for WebSocket (JSR 356), Java API for JSON Processing (JSR 353), Batch Applications for the Java Platform (JSR 352), and Concurrency Utilities for Java EE (JSR 236) – basically all the new Java EE 7 technologies.

  • Examples for the updated specs like Java Message Service 2.0 (JSR 343), Java API for RESTful Web Services 2.0 (JSR 339), Java Servlet 3.1 (JSR 340), JavaServer Faces 2.2 (JSR 344), and Contexts and Dependency Injection for Java EE 1.1 (JSR 346).

  • The examples are now Maven-based, so you can build, package, and deploy to GlassFish 4.0 through Maven, thanks to the brand new Cargo plugin.

Check it out!