Thomas Dudziak's Blog

Storm & Esper

At work, we recently started using Esper for realtime analytics, and so far we quite like it. It is great at what it does: running queries continuously over data. The problem then becomes how to get data into Esper. The recently released Storm could be one way to do that, so I got curious and started playing around with it to see if it could be made to work with Esper. As it turns out, the integration is straightforward.

Some Storm basics

Storm has three basic concepts that are relevant in this context: streams, spouts, and bolts. At the core, Storm facilitates data transfer between spouts and bolts using streams of tuples.

Spouts are the basic data emitters, typically retrieving the data from outside of the Storm cluster. A simple example of this would be a spout that retrieves the tweet stream via the Twitter API and emits the tweets as a stream into the Storm cluster.

Bolts are data processors that receive one or more streams as input and potentially also emit (processed) data on one or more streams. In the twitter example, one could for instance imagine bolts that count the number of tweets per second, or detect the language of the tweet and reemit the tweets into per-language streams.

The data in the streams has a simple tuple form consisting of a fixed number of named values called fields. Storm does not care about the data types of the individual fields in the tuple as long as they can be serialized to the wire format (which is Thrift), whether via serializers provided by Storm or custom ones. Spouts and bolts need to declare the number of fields and their names for each of the tuples they are going to emit as part of the initial setup of the topology. This also means that the number of fields and their names are fixed for the duration of a Storm ‘session’.

Some Esper basics

Esper is, and I’m simplifying things quite a bit here, a processing engine for data streams that uses queries run on the data streams to process them. Think of it as a way to run SQL-like queries on data that streams by. The queries run continuously and thus have a time or amount-of-data aspect to them. Continuing the twitter example from above, if we consider the never-ending stream of tweets as the data stream that Esper works with, then an Esper query could for instance return the number of tweets per second like so:

select count(*) as tps from Storm.win:time_batch(1 sec)

The time_batch part in this example will direct Esper to apply the count function on 1-sec batches of events.
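To make the batching idea concrete, here is a minimal plain-Java sketch of counting events per 1-second batch. It is not Esper code and does not claim to match Esper's exact semantics. The class name TimeBatchSketch, the millisecond timestamps, and the choice to report empty batches as 0 are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of 1-second batching; hypothetical, not Esper code. */
final class TimeBatchSketch {
    /**
     * Counts events per consecutive 1-second batch. Timestamps are in
     * milliseconds and sorted ascending; the first batch starts at the
     * first event.
     */
    static List<Integer> countsPerSecond(long[] timestampsMillis) {
        List<Integer> counts = new ArrayList<Integer>();
        if (timestampsMillis.length == 0) {
            return counts;
        }
        long batchStart = timestampsMillis[0];
        int count = 0;
        for (long ts : timestampsMillis) {
            // Close every batch that ended before this event arrived,
            // including empty ones (an assumption of this sketch).
            while (ts >= batchStart + 1000) {
                counts.add(count);
                count = 0;
                batchStart += 1000;
            }
            count++;
        }
        counts.add(count); // the final, still-open batch
        return counts;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 200, 900, 1000, 1500, 3100};
        System.out.println(countsPerSecond(arrivals)); // prints [3, 2, 0, 1]
    }
}
```

The point of the sketch is only that the aggregate is computed once per batch rather than once per event, which is what gives the query its "per second" meaning.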

Esper data streams consist of structured data called events. The types of these events can be POJOs, maps, and other things. Event types are typically registered with Esper before a query is submitted. This means that you have to tell Esper which kind of event type you will give it (Java class, map, …) and which properties the event type has. For Java classes, Esper can figure that out itself, but for map events you need to tell Esper explicitly about the possible keys and the value data types. Fortunately, Esper is forgiving when it comes to the data types. You can tell it that you’ll give it Objects, and it will happily accept numbers in the actual data stream and perform numeric operations on them.

How to combine the two

Storm’s tuples are quite similar to Esper’s map event types. The tuple field names map naturally to map keys and the field values to values for these keys. The tuple fields are not typed when they are defined, but that does not pose a big problem for us as we can simply tell Esper that they are of type Object. In addition, the fact that tuples have to be defined before a topology is run, makes it relatively easy for us to define the map event type in the setup phase.
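As a rough illustration of that mapping, the following plain-Java sketch builds both halves from a list of field names: a type definition in which every field is declared as Object, and an event map for one tuple. It uses only java.util and no Storm or Esper classes. The class and method names are hypothetical, and the sample values are made up.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper showing the tuple-to-map idea without Storm or Esper. */
final class TupleToMapSketch {
    /** Event type definition: every field name maps to Object.class. */
    static Map<String, Object> eventTypeFor(List<String> fieldNames) {
        Map<String, Object> props = new LinkedHashMap<String, Object>();
        for (String name : fieldNames) {
            props.put(name, Object.class);
        }
        return props;
    }

    /** Event instance: field names become keys, tuple values become values. */
    static Map<String, Object> eventFor(List<String> fieldNames, List<Object> values) {
        if (fieldNames.size() != values.size()) {
            throw new IllegalArgumentException("field/value count mismatch");
        }
        Map<String, Object> event = new LinkedHashMap<String, Object>();
        for (int idx = 0; idx < fieldNames.size(); idx++) {
            event.put(fieldNames.get(idx), values.get(idx));
        }
        return event;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("createdAt", "retweetCount");
        List<Object> values = Arrays.<Object>asList(1317240720000L, 3);
        System.out.println(eventTypeFor(fields));
        System.out.println(eventFor(fields, values));
    }
}
```

Because the field names are fixed before the topology runs, the type definition can be built once during setup, while the event map is built for every incoming tuple.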

I am going to use the twitter stream example from the storm-starter project to show how you can use Esper to count the number of tweets per second and also find the maximum number of retweets per 1 second interval. This is probably not of great practical use, but will show off some aspects of the Storm – Esper integration.

An up-to-date version of this code is available on GitHub.

Let’s get started with the twitter spout, a slightly adapted version of the one from the storm-starter project:

public class TwitterSpout implements IRichSpout, StatusListener {
    private static final long serialVersionUID = 1L;

    private final String username;
    private final String pwd;
    private transient BlockingQueue<Status> queue;
    private transient SpoutOutputCollector collector;
    private transient TwitterStream twitterStream;

    public TwitterSpout(String username, String pwd) {
        this.username = username;
        this.pwd = pwd;
    }

    public boolean isDistributed() {
        return false;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("createdAt", "retweetCount"));
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.queue = new ArrayBlockingQueue<Status>(1000);
        this.collector = collector;

        Configuration twitterConf = new ConfigurationBuilder().setUser(username)
                                                              .setPassword(pwd)
                                                              .build();
        TwitterStreamFactory fact = new TwitterStreamFactory(twitterConf);

        twitterStream = fact.getInstance();
        // Register this spout as the listener and start the sample stream,
        // as the storm-starter spout does.
        twitterStream.addListener(this);
        twitterStream.sample();
    }

    public void onStatus(Status status) {
        // Hand the tweet to nextTuple(); it is dropped if the queue is full.
        queue.offer(status);
    }

    public void nextTuple() {
        Status value = queue.poll();
        if (value == null) {
            // Nothing queued yet, so back off briefly.
            Utils.sleep(50);
        }
        else {
            collector.emit(new Values(value.getCreatedAt(), value.getRetweetCount()));
        }
    }

    public void close() {
        twitterStream.shutdown();
    }

    public void ack(Object arg0) {}
    public void fail(Object arg0) {}
    public void onException(Exception ex) {}
    public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
    public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
    public void onScrubGeo(long userId, long upToStatusId) {}
}

This defines a spout that emits a single stream of tuples with two fields, createdAt (a timestamp) and retweetCount (an integer).

You’ll notice that aside from the twitter username and password, all fields in the spout are marked as transient, and initialized in the open method. The reason for this is that Storm requires spouts and bolts to be serializable so it can move them to some node in the Storm cluster before starting the topology.
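The effect of transient can be seen with plain Java serialization. The sketch below uses a hypothetical FakeSpout class (not a Storm class) and an in-memory round trip as a stand-in for shipping the object to another node. It is a minimal sketch of the language mechanism, not of how Storm itself moves components around.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class TransientSketch {
    /** Hypothetical stand-in for a spout; not a Storm class. */
    static final class FakeSpout implements Serializable {
        private static final long serialVersionUID = 1L;

        final String username;                  // travels with the object
        transient BlockingQueue<String> queue;  // skipped by serialization

        FakeSpout(String username) {
            this.username = username;
        }

        /** Plays the role of open(): builds runtime state where the object runs. */
        void open() {
            queue = new ArrayBlockingQueue<String>(1000);
        }
    }

    /** Serializes and deserializes in memory, like a copy sent to another JVM. */
    static FakeSpout roundTrip(FakeSpout spout) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(spout);
            out.close();
            ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            FakeSpout copy = (FakeSpout) in.readObject();
            in.close();
            return copy;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FakeSpout original = new FakeSpout("alice");
        original.open();
        FakeSpout copy = roundTrip(original);
        // The configuration survives, the runtime state does not.
        System.out.println(copy.username + " " + (copy.queue == null)); // prints alice true
        copy.open();
        System.out.println(copy.queue != null); // prints true
    }
}
```

This is why the spout keeps only the credentials as regular fields and rebuilds the queue, collector, and Twitter stream in open.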

The Esper bolt itself is generic. You pass it Esper statements and the names of the output fields that these statements generate. The adapted main method for our Twitter example looks like this:

public static void main(String[] args) {
    final String username = args[0];
    final String pwd = args[1];

    TopologyBuilder builder = new TopologyBuilder();
    TwitterSpout spout = new TwitterSpout(username, pwd);
    EsperBolt bolt = new EsperBolt(
        new Fields("tps", "maxRetweets"),
        "select count(*) as tps, max(retweetCount) as maxRetweets from Storm.win:time_batch(1 sec)");

    builder.setSpout(1, spout);
    builder.setBolt(2, bolt).shuffleGrouping(1);

    Config conf = new Config();

    LocalCluster cluster = new LocalCluster();

    cluster.submitTopology("test", conf, builder.createTopology());
}

Note how the Esper statement returns tps and maxRetweets which are also declared as the two output fields for the bolt.

The bolt code itself (see a version of this that is kept up-to-date with Storm here) consists of three pieces. The setup part constructs map event types for each input stream and registers them with Esper (I omitted the Esper setup code):

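private void setupEventTypes(TopologyContext context, Configuration configuration) {
    Set<GlobalStreamId> sourceIds = context.getThisSources().keySet();
    singleEventType = (sourceIds.size() == 1);

    for (GlobalStreamId id : sourceIds) {
        Map<String, Object> props = new LinkedHashMap<String, Object>();

        // Declare one Object-typed property per field of the input stream,
        // then register the resulting map event type with Esper.
        setupEventTypeProperties(
            context.getComponentOutputFields(id.get_componentId(), id.get_streamId()), props);
        configuration.addEventType(
            getEventTypeName(id.get_componentId(), id.get_streamId()), props);
    }
}

private String getEventTypeName(int componentId, int streamId) {
    if (singleEventType) {
        return "Storm";
    }
    else {
        return String.format("Storm_%d_%d", componentId, streamId);
    }
}

private void setupEventTypeProperties(Fields fields, Map<String, Object> properties){
    int numFields = fields.size();

    for (int idx = 0; idx < numFields; idx++) {
        properties.put(fields.get(idx), Object.class);
    }
}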

The field-to-property mapping is straightforward. For the event type corresponding to each input stream, it registers one property of type Object per field, using the field name as the property name. If the bolt has only a single input stream, it registers a single event type called Storm. For multiple input streams, it uses the component id (id of the spout or bolt that the data comes from) and the stream id (spouts and bolts can emit multiple streams) to generate a name of the form Storm_{component id}_{stream id}.

The second part is the transfer of data from Storm to Esper:

public void execute(Tuple tuple) {
    String eventType = getEventTypeName(tuple.getSourceComponent(),
                                        tuple.getSourceStreamId());
    Map<String, Object> data = new HashMap<String, Object>();
    Fields fields = tuple.getFields();
    int numFields = fields.size();

    // Copy every tuple field into the map under its field name.
    for (int idx = 0; idx < numFields; idx++) {
        String name = fields.get(idx);
        Object value = tuple.getValue(idx);

        data.put(name, value);
    }

    runtime.sendEvent(data, eventType);
}

This method is called by Storm whenever a tuple from any of the connected streams is sent to the bolt. The code therefore first has to find the event type name corresponding to the tuple. Then it iterates over the fields in the tuple and puts the values into a map using the field names as the keys. Finally, it passes that map to Esper.

At this point, Esper routes this map (the event) through the statements, which in turn might produce new data that we need to hand back to Storm. For this purpose, the bolt registered itself during setup as a listener on each of the configured statements. Esper calls the update method on the bolt whenever one of the statements generates data. The update method then performs the reverse of the execute method and converts the event data to a tuple:

public void update(EventBean[] newEvents, EventBean[] oldEvents) {
    if (newEvents != null) {
        for (EventBean newEvent : newEvents) {
            collector.emit(toTuple(newEvent));
        }
    }
}

private List<Object> toTuple(EventBean event) {
    int numFields = outputFields.size();
    List<Object> tuple = new ArrayList<Object>(numFields);

    // Read the event properties in the order of the declared output fields.
    for (int idx = 0; idx < numFields; idx++) {
        tuple.add(event.get(outputFields.get(idx)));
    }
    return tuple;
}

Written by tomdzk

September 28, 2011 at 9:12 pm

Posted in computers

Setting up reconnoiter on Ubuntu (Karmic and newer)

After it took me about two days to figure out how to set up reconnoiter, I figured it would be nice to document the steps to make it easier for other people.

Note: This guide was written for Karmic Koala (9.10) and Lucid Lynx (10.04). It should generally work for Jaunty, too, as well as other Linux distributions (minus the package manager instructions obviously).

Note: This guide has been updated to reconnoiter trunk revision 1404.

Before we begin, here are some useful links:

Reconnoiter home page:

Reconnoiter docs:

Oscon demo:

1. Build it

First, let’s install a bunch of things. In the Synaptic Package Manager under Settings -> Repositories -> Other Software enable the two entries for the partner repositories. Then

sudo apt-get install autoconf build-essential libtool gettext \
  zlib1g-dev uuid-dev libpcre3-dev libssl-dev libpq-dev \
  libxml2-dev libxslt-dev libapr1-dev libaprutil1-dev xsltproc \
  libncurses5-dev libssh2-1-dev libsnmp-dev libmysqlclient-dev \
  subversion sun-java6-jdk 

Now we check out reconnoiter from subversion and build it:

svn co reconnoiter
cd reconnoiter
sudo mkdir -p /usr/local/java/lib
make
sudo make install

2. Setup the DB

We need PostgreSQL 8.4 server & client. On Karmic you get that via

sudo apt-get install postgresql postgresql-client

For Jaunty, follow the steps here.

Next, make sure that the postgresql config file allows local access without password. Edit the /etc/postgresql/8.4/main/pg_hba.conf to change the local entry to use “trust”:

local   all         all                               trust

After that, restart the postgresql server:

sudo /etc/init.d/postgresql-8.4 restart

Now log in to PostgreSQL:

sudo su postgres
cd sql
psql

Within psql do

\i scaffolding.sql

3. Setup cron

First, we need to change the crontab to point to where postgresql is actually installed:

sed -i 's/\/opt\/psql835/\/usr/g' sql/crontab
sudo su postgres
cd sql

We also need to run the commands in the crontab at least once manually as they will initialize certain database structures. As the postgres user:

eval "`cat crontab | cut -d' ' -f6- | grep -v ^$ | awk '{print $0\";\"}'`"
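For readers who find the one-liner hard to parse, here is a plain-Java sketch of the text processing it performs before eval runs the result: keep everything from the sixth space-separated field onward (the command part of a crontab line), drop empty results, and terminate each command with a semicolon. The class name and the sample crontab line are hypothetical and are not taken from reconnoiter's actual crontab.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical re-statement of the cut | grep | awk pipeline in plain Java. */
final class CrontabCommandsSketch {
    static List<String> commands(List<String> crontabLines) {
        List<String> result = new ArrayList<String>();
        for (String line : crontabLines) {
            String command;
            if (line.indexOf(' ') < 0) {
                // cut passes lines without any delimiter through unchanged.
                command = line;
            } else {
                // Like cut -d' ' -f6-: fields 1-5 are the schedule, the rest is the command.
                String[] parts = line.split(" ", 6);
                command = parts.length == 6 ? parts[5] : "";
            }
            // Like grep -v ^$ followed by awk appending ";".
            if (!command.isEmpty()) {
                result.add(command + ";");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "*/5 * * * * /usr/bin/psql -d reconnoiter -c 'select 1'", // made-up entry
            "");
        System.out.println(commands(lines));
    }
}
```

The sketch only prints the extracted commands. The shell one-liner goes one step further and executes them.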

Finally, and still as user postgres do

crontab crontab

4. Setup the web ui

For configuring the web UI (PHP), we first need Apache2 and PHP:

sudo apt-get install apache2 libapache2-mod-php5 php5-pgsql

This will also enable mod_php5. Every other required module (mod_mime, mod_log_config, mod_rewrite, mod_proxy, mod_proxy_http, mod_authz_host) should already be enabled or even compiled into the server (apache2 -l will show which ones are). To make sure that they are enabled, simply do

sudo a2enmod mime
sudo a2enmod rewrite
sudo a2enmod proxy
sudo a2enmod proxy_http
sudo a2enmod authz_host

Next, we need the Apache configuration, either as a new file /etc/apache2/sites-available/reconnoiter that should then be symlinked into /etc/apache2/sites-enabled, or in the current configuration (e.g. /etc/apache2/sites-enabled/000-default). A sample configuration to set up reconnoiter on port 80:

<VirtualHost *:80>
  ServerAdmin webmaster@localhost
  DocumentRoot @ROOT@/ui/web/htdocs

  <Directory "/">
      Options None
      AllowOverride None
      Order allow,deny
      Deny from all
  </Directory>
  <FilesMatch "^\.ht">
      Order allow,deny
      Deny from all
      Satisfy All
  </FilesMatch>
  <Directory "@ROOT@/ui/web/htdocs/">
      php_value include_path @ROOT@/ui/web/lib
      php_value short_open_tag off
      Options FollowSymLinks Indexes
      AllowOverride All
      Order deny,allow
      Allow from all
  </Directory>

  LogLevel warn
  LogFormat "%h %l %u %t \"%r\" %>s %b" common

  ErrorLog @ROOT@/ui/web/logs/error_log
  CustomLog @ROOT@/ui/web/logs/access_log common

  AddType application/x-compress .Z
  AddType application/x-gzip .gz .tgz
  AddType application/x-httpd-php .php
  DefaultType text/plain
</VirtualHost>

Replace @ROOT@ with the directory where you have installed reconnoiter.
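If you would rather not do the replacement by hand, it is a plain string substitution. The following is a small sketch in Java. The class name and the install directory /opt/reconnoiter are made up for illustration, so substitute your own path.

```java
/** Hypothetical helper that fills the @ROOT@ placeholder in a config template. */
final class RootPlaceholderSketch {
    static String fillRoot(String template, String root) {
        // Strip a trailing slash so "@ROOT@/ui" does not turn into "//ui".
        String cleanRoot = root.endsWith("/") ? root.substring(0, root.length() - 1) : root;
        return template.replace("@ROOT@", cleanRoot);
    }

    public static void main(String[] args) {
        String template = "DocumentRoot @ROOT@/ui/web/htdocs\n"
                        + "ErrorLog @ROOT@/ui/web/logs/error_log\n";
        // /opt/reconnoiter/ is an assumed example location.
        System.out.print(fillRoot(template, "/opt/reconnoiter/"));
    }
}
```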

If you chose to add reconnoiter to the Apache config on a different port than 80, say 9090, then you will also have to change Apache’s port configuration in /etc/apache2/ports.conf by adding:

NameVirtualHost *:9090
Listen 9090

Then restart apache:

sudo /etc/init.d/apache2 restart

5. Generate test certificates

These steps show how to generate test certificates. In a production environment you would of course use a real CA.

Create/go to a temporary directory:

mkdir ssh-keys
cd ssh-keys

Next, create a file named openssl.cnf in it with these contents:

HOME = .

oid_section = new_oids

[ new_oids ]

[ ca ]
default_ca = CA_default

[ CA_default ]
dir = ./testCA
certs = $dir/certs
crl_dir = $dir/crl
database = $dir/index.txt
new_certs_dir = $dir/newcerts
certificate = $dir/cacert.pem
serial = $dir/serial
crl = $dir/crl.pem
private_key = $dir/private/cakey.pem
RANDFILE = $dir/private/.rand
x509_extensions = usr_cert
name_opt = ca_default
cert_opt = ca_default
default_days = 365
default_crl_days = 30
default_md = md5
preserve = no
policy = policy_match

[ policy_match ]
countryName = match
stateOrProvinceName = match
organizationName = match
organizationalUnitName = optional
commonName = supplied
emailAddress = optional

[ policy_anything ]
countryName = optional
stateOrProvinceName = optional
localityName = optional
organizationName = optional
organizationalUnitName = optional
commonName = supplied
emailAddress = optional

[ req ]
default_bits = 1024
default_keyfile = privkey.pem
distinguished_name = req_distinguished_name
attributes = req_attributes
x509_extensions = v3_ca
string_mask = nombstr

[ req_distinguished_name ]
countryName = Country Name (2 letter code)
countryName_default = AU
countryName_min = 2
countryName_max = 2
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = Some-State
localityName = Locality Name (eg, city)
0.organizationName = Organization Name (eg, company)
0.organizationName_default = Internet Widgits Pty Ltd
organizationalUnitName = Organizational Unit Name (eg, section)
commonName = Common Name (eg, YOUR name)
commonName_max = 64
emailAddress = Email Address
emailAddress_max = 64

[ req_attributes ]
challengePassword = A challenge password
challengePassword_min	= 4
challengePassword_max = 20
unstructuredName = An optional company name

[ usr_cert ]
basicConstraints = CA:FALSE
nsComment = "OpenSSL Generated Certificate"
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer:always

[ v3_req ]
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment

[ v3_ca ]
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid:always,issuer:always
basicConstraints = CA:true

[ crl_ext ]
authorityKeyIdentifier = keyid:always,issuer:always

[ proxy_cert_ext ]
basicConstraints = CA:FALSE
nsComment = "OpenSSL Generated Certificate"
subjectKeyIdentifier = hash
authorityKeyIdentifier  = keyid,issuer:always
proxyCertInfo = critical,language:id-ppl-anyLanguage,pathlen:3,policy:foo

Next execute these commands:

mkdir testCA
touch testCA/index.txt
test -f testCA/serial || echo 00 > testCA/serial

# CA
openssl genrsa -out test-ca.key
openssl req -key test-ca.key -days 365 \
    -new -out test-ca.csr -config openssl.cnf \
    -subj "/C=US/ST=California/O=Ning Inc./CN=Reconnoiter Test CA"
openssl x509 -req -in test-ca.csr -signkey test-ca.key \
    -out test-ca.crt

# noit
openssl genrsa -out test-noit.key
openssl req -key test-noit.key -days 365 \
    -new -out test-noit.csr -config openssl.cnf \
    -subj "/C=US/ST=California/O=Ning Inc./CN=noit-test"
openssl ca -batch -config openssl.cnf \
    -in test-noit.csr -out test-noit.crt \
    -outdir . -keyfile test-ca.key -cert test-ca.crt -days 120

# stratcon
openssl genrsa -out test-stratcon.key
openssl req -key test-stratcon.key -days 365 \
    -new -out test-stratcon.csr -config openssl.cnf \
    -subj "/C=US/ST=California/O=Ning Inc./CN=stratcon"
openssl ca -batch -config openssl.cnf \
    -in test-stratcon.csr -out test-stratcon.crt \
    -outdir . -keyfile test-ca.key -cert test-ca.crt -days 120

This will create a bunch of .pem, .crt, .csr, and .key files, that you should copy to /usr/local/etc:

sudo cp *.pem *.crt *.csr *.key /usr/local/etc

6. Setup a noit daemon

Generate the config:

sudo cp src/noit.conf /usr/local/etc/

Now you can edit that file to your heart’s content. Some things to note

  • Comment out/remove sections as necessary, or make sure that they point to existing machines.
  • For every new item, create a new uuid using the uuidgen tool that was installed earlier.
  • Update the sslconfig section to use the test certificates:
  • For snmp entries, make sure you have the community set correctly (see
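If you need several uuids at once, they can also be generated programmatically. The sketch below uses java.util.UUID from the Java standard library to produce random (version 4) uuids. The class name is hypothetical, and nothing here is specific to reconnoiter. It simply prints values you can paste into the config.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.UUID;

/** Hypothetical helper that generates uuids for new config entries. */
final class CheckUuidSketch {
    /** Returns the requested number of distinct random (version 4) uuids. */
    static Set<String> newUuids(int howMany) {
        Set<String> uuids = new LinkedHashSet<String>();
        while (uuids.size() < howMany) {
            uuids.add(UUID.randomUUID().toString());
        }
        return uuids;
    }

    public static void main(String[] args) {
        for (String uuid : newUuids(3)) {
            System.out.println(uuid);
        }
    }
}
```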

Finally start the noit daemon:

sudo /usr/local/sbin/noitd -c /usr/local/etc/noit.conf -D

The -D option is for debugging purposes. It tells noitd to run in the foreground and log everything to stdout/stderr. You also might want to tweak the logging settings in the configuration file. Turn on debug logging by changing this line near the top of the config file:

<log name="debug" disabled="true"/>

to

<log name="debug"/>

Then switch on debug logging for whichever specific modules you want. E.g. for snmp debug logging, change this line further down in the config file:

<log name="debug/snmp" disabled="true"/>

to

<log name="debug/snmp"/>

7. Setup a stratcon daemon

Again, create the config file using the sample config file:

sudo cp src/stratcon.conf /usr/local/etc/

Edit as necessary:

  • Logging is configured in the same way as for noit above.
  • Set the password in the database config section to stratcon (or whatever you chose in the scaffolding.sql above).
  • For each noitd instance there needs to be a noitd section.
  • Configure the listeners section, esp. the port (should be an unused one), the hostname and document_domain.
  • Update the sslconfig sections (there are two of them, one in the noits section and one in the listeners section) to use the test certificates:

Finally start the stratcon daemon:

sudo /usr/local/sbin/stratcond -c /usr/local/etc/stratcon.conf -D

Again, the -D option is for debugging. You can tweak the logging settings in pretty much the same way as for noitd.

8. Verification

In your browser (note that the UI doesn’t quite work in Chrome), go to http://localhost:9090. The reconnoiter UI should appear. On the left side click the + next to “Graph Controls” and then on “Browse Data”. The data that you configured for noitd above should show up, though it might take a few minutes between starting noitd and the first data showing up.

Relevant logs are:

  • /var/log/postgresql/postgresql-8.4-main.log
  • /tmp/rollup.log – the log created by the cron rollup job
  • /var/log/syslog
  • @ROOT@/ui/web/logs/error_log and @ROOT@/ui/web/logs/access_log

Written by tomdzk

November 24, 2009 at 4:35 pm

Posted in computers