Thomas Dudziak's Blog

Storm & Esper

At work, we recently started using Esper for realtime analytics, and so far we quite like it. It is a great tool for what it does: running queries continuously over data. The problem then becomes how to get data into Esper. The recently released Storm could be one way to do that, so I got curious and started playing around with it to see if it could be made to work with Esper. As it turns out, the integration is straightforward.

Some Storm basics

Storm has three basic concepts that are relevant in this context: streams, spouts, and bolts. At the core, Storm facilitates data transfer between spouts and bolts using streams of tuples.

Spouts are the basic data emitters, typically retrieving the data from outside of the Storm cluster. A simple example of this would be a spout that retrieves the tweet stream via the Twitter API and emits the tweets as a stream into the Storm cluster.

Bolts are data processors that receive one or more streams as input and potentially also emit (processed) data on one or more streams. In the Twitter example, one could for instance imagine bolts that count the number of tweets per second, or detect the language of each tweet and re-emit the tweets into per-language streams.

The data in the streams has a simple tuple form consisting of a fixed number of named values called fields. Storm does not care about the data types of the individual fields in the tuple as long as they can be serialized to the wire format (which is Thrift), whether via serializers provided by Storm or custom ones. Spouts and bolts need to declare the number of fields and their names for each of the tuples they are going to emit as part of the initial setup of the topology. This also means that the number of fields and their names are fixed for the duration of a Storm ‘session’.
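
To make the fixed-schema idea concrete, here is a minimal sketch in plain Java with no Storm dependency. The class `SimpleTuple` and its methods are made up for illustration and are not Storm's API. The sketch only shows that field names are declared once up front and that every tuple must carry exactly that many values, of any type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a tuple with a fixed set of named fields; not Storm's API.
final class SimpleTuple {
    private final List<String> fieldNames;
    private final List<Object> values;

    SimpleTuple(List<String> fieldNames, Object... values) {
        // The schema is fixed up front: reject a tuple with the wrong number of values.
        if (fieldNames.size() != values.length) {
            throw new IllegalArgumentException(
                "expected " + fieldNames.size() + " values but got " + values.length);
        }
        this.fieldNames = Collections.unmodifiableList(new ArrayList<String>(fieldNames));
        this.values = Collections.unmodifiableList(Arrays.asList(values.clone()));
    }

    // Looks up a value by its field name; the value's type is not constrained.
    Object get(String fieldName) {
        int idx = fieldNames.indexOf(fieldName);
        if (idx < 0) {
            throw new IllegalArgumentException("unknown field: " + fieldName);
        }
        return values.get(idx);
    }

    int size() {
        return values.size();
    }
}
```

With the fields `createdAt` and `retweetCount` declared, `new SimpleTuple(fields, 1317244320000L, 3)` is accepted, while a call with only one value is rejected.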

Some Esper basics

Esper is, and I’m simplifying things quite a bit here, a processing engine for data streams that processes them by running queries on the streams. Think of it as a way to run SQL-like queries on data that streams by. The queries run continuously and thus have a time or amount-of-data aspect to them. Continuing the Twitter example from above, if we consider the never-ending stream of tweets as the data stream that Esper works with, then an Esper query could, for instance, return the number of tweets per second like so:

select count(*) as tps from Twitter.win:time_batch(1 sec)

The time_batch part in this example will direct Esper to apply the count function on 1-sec batches of events.
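
As a rough mental model of what this query computes, the sketch below counts events per one-second bucket in plain Java. It is not Esper code and not how Esper implements time_batch. The names `TimeBatchSketch` and `countPerSecond` are hypothetical, and bucketing by a timestamp carried in the data (aligned to whole seconds) is an assumption made for illustration; Esper drives its time windows from engine time instead.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of "count(*) per 1-second batch"; not Esper's implementation.
final class TimeBatchSketch {
    // Groups non-negative millisecond timestamps into 1-second buckets and counts
    // the events in each bucket. Keys are the bucket start times in whole seconds.
    static Map<Long, Integer> countPerSecond(long[] timestampsMillis) {
        Map<Long, Integer> counts = new TreeMap<Long, Integer>();
        for (long ts : timestampsMillis) {
            long bucket = ts / 1000L;
            Integer current = counts.get(bucket);
            counts.put(bucket, current == null ? 1 : current + 1);
        }
        return counts;
    }
}
```

For the timestamps 0, 250, 999, 1000 and 2500 this yields three batches with counts 3, 1 and 1, which corresponds to one tps value per batch.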

Esper data streams consist of structured data called events. The types of these events can be POJOs, maps, and other things. Event types are typically registered with Esper before submitting a query. This means that you have to tell Esper which kind of event type you give it (Java class, map, …) and which properties the event type has. For Java classes, Esper can figure that out itself, but for map events you need to tell Esper explicitly about the possible keys and the value data types. Fortunately, Esper is forgiving when it comes to the data types. You can tell it that you’ll give it Objects, and it will happily accept numbers in the actual data stream and perform numeric operations on them.

How to combine the two

Storm’s tuples are quite similar to Esper’s map event types. The tuple field names map naturally to map keys and the field values to values for these keys. The tuple fields are not typed when they are defined, but that does not pose a big problem for us as we can simply tell Esper that they are of type Object. In addition, the fact that tuples have to be defined before a topology is run makes it relatively easy for us to define the map event type in the setup phase.

I am going to use the Twitter stream example from the storm-starter project to show how you can use Esper to count the number of tweets per second and also find the maximum number of retweets per one-second interval. This is probably not of great practical use, but it will show off some aspects of the Storm – Esper integration.

An up-to-date version of this code is available on GitHub.

Let’s get started with the twitter spout, a slightly adapted version of the one from the storm-starter project:

public class TwitterSpout implements IRichSpout, StatusListener {
    private static final long serialVersionUID = 1L;

    private final String username;
    private final String pwd;
    private transient BlockingQueue<Status> queue;
    private transient SpoutOutputCollector collector;
    private transient TwitterStream twitterStream;

    public TwitterSpout(String username, String pwd) {
        this.username = username;
        this.pwd = pwd;
    }

    @Override
    public boolean isDistributed() {
        return false;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("createdAt", "retweetCount"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.queue = new ArrayBlockingQueue<Status>(1000);
        this.collector = collector;

        Configuration twitterConf = new ConfigurationBuilder().setUser(username)
                                                              .setPassword(pwd)
                                                              .build();
        TwitterStreamFactory fact = new TwitterStreamFactory(twitterConf);

        twitterStream = fact.getInstance();
        twitterStream.addListener(this);
        twitterStream.sample();
    }

    @Override
    public void onStatus(Status status) {
        queue.offer(status);
    }

    @Override
    public void nextTuple() {
        Status value = queue.poll();
        if (value == null) {
            Utils.sleep(50);
        }
        else {
            collector.emit(tuple(value.getCreatedAt().getTime(),
                                 value.getRetweetCount()));
        }
    }

    @Override
    public void close() {
        twitterStream.shutdown();
    }

    @Override
    public void ack(Object arg0) {}
    @Override
    public void fail(Object arg0) {}
    @Override
    public void onException(Exception ex) {}
    @Override
    public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
    @Override
    public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
    @Override
    public void onScrubGeo(long userId, long upToStatusId) {}
}

This defines a spout that emits a single stream of tuples with two fields, createdAt (a timestamp) and retweetCount (an integer).

You’ll notice that aside from the twitter username and password, all fields in the spout are marked as transient, and initialized in the open method. The reason for this is that Storm requires spouts and bolts to be serializable so it can move them to some node in the Storm cluster before starting the topology.
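
The effect of transient can be seen with plain Java serialization. The sketch below assumes that shipping a component behaves like a serialize/deserialize round trip. It uses a made-up `SketchSpout` class (not a Storm interface) with one serialized configuration field and one transient queue; the method names `open`, `isOpen` and `roundTrip` are likewise chosen for illustration only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical component used only to demonstrate transient fields; not a Storm class.
final class SketchSpout implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String username;                 // written during serialization
    private transient BlockingQueue<String> queue; // skipped during serialization

    SketchSpout(String username) {
        this.username = username;
    }

    // Plays the role of open(): creates runtime-only state on the target node.
    void open() {
        queue = new ArrayBlockingQueue<String>(1000);
    }

    String getUsername() {
        return username;
    }

    boolean isOpen() {
        return queue != null;
    }

    // Serializes and deserializes the spout, mimicking a move to another node.
    static SketchSpout roundTrip(SketchSpout spout) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(spout);
            out.close();
            ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            try {
                return (SketchSpout) in.readObject();
            } finally {
                in.close();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

After `roundTrip`, the copy still knows its username but `isOpen()` is false until `open()` is called again, which is why the spout above builds its queue and Twitter stream in open rather than in the constructor.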

The Esper bolt itself is generic. You pass it Esper statements and the names of the output fields that these statements will generate. The adapted main method for our Twitter example looks like this:

public static void main(String[] args) {
    final String username = args[0];
    final String pwd = args[1];

    TopologyBuilder builder = new TopologyBuilder();
    TwitterSpout spout = new TwitterSpout(username, pwd);
    EsperBolt bolt = new EsperBolt(
        new Fields("tps", "maxRetweets"),
        "select count(*) as tps, max(retweetCount) as maxRetweets from Storm.win:time_batch(1 sec)");

    builder.setSpout(1, spout);
    builder.setBolt(2, bolt).shuffleGrouping(1);

    Config conf = new Config();
    conf.setDebug(true);

    LocalCluster cluster = new LocalCluster();

    cluster.submitTopology("test", conf, builder.createTopology());
    Utils.sleep(10000);
    cluster.shutdown();
}

Note how the Esper statement returns tps and maxRetweets which are also declared as the two output fields for the bolt.

The bolt code itself (see a version of this that is kept up-to-date with Storm here) consists of three pieces. The setup part constructs map event types for each input stream and registers them with Esper (I omitted the Esper setup code):

private void setupEventTypes(TopologyContext context, Configuration configuration) {
    Set<GlobalStreamId> sourceIds = context.getThisSources().keySet();
    singleEventType = (sourceIds.size() == 1);

    for (GlobalStreamId id : sourceIds) {
        Map<String, Object> props = new LinkedHashMap<String, Object>();

        setupEventTypeProperties(
            context.getComponentOutputFields(id.get_componentId(),
                                             id.get_streamId()),
            props);
        configuration.addEventType(
            getEventTypeName(id.get_componentId(), id.get_streamId()),
            props);
    }
}

private String getEventTypeName(int componentId, int streamId) {
    if (singleEventType) {
        return "Storm";
    }
    else {
        return String.format("Storm_%d_%d", componentId, streamId);
    }
}

private void setupEventTypeProperties(Fields fields, Map<String, Object> properties){
    int numFields = fields.size();

    for (int idx = 0; idx < numFields; idx++) {
        properties.put(fields.get(idx), Object.class);
    }
}

The field-to-property mapping is straightforward. It simply registers properties of type Object using the field names in the event type corresponding to the input stream. If the bolt only has a single input stream, then it registers a single event type called Storm. For multiple input streams, it uses the component id (id of the spout or bolt that the data comes from) and the stream id (spouts and bolts can emit multiple streams) to generate a name of the form Storm_{component id}_{stream id}.

The second part is the transfer of data from Storm to Esper:

@Override
public void execute(Tuple tuple) {
    String eventType = getEventTypeName(tuple.getSourceComponent(),
                                        tuple.getSourceStreamId());
    Map<String, Object> data = new HashMap<String, Object>();
    Fields fields = tuple.getFields();
    int numFields = fields.size();

    for (int idx = 0; idx < numFields; idx++) {
        String name = fields.get(idx);
        Object value = tuple.getValue(idx);

        data.put(name, value);
    }

    runtime.sendEvent(data, eventType);
}

This method is called by Storm whenever a tuple from any of the connected streams is sent to the bolt. The code therefore first has to find the event type name corresponding to the tuple. Then it iterates over the fields in the tuple and puts the values into a map using the field names as the keys. Finally, it passes that map to Esper.

At this moment, Esper will route this map (the event) through the statements which in turn might produce new data that we need to hand back to Storm. For this purpose, the bolt registered itself as a listener for data emitted from any of the statements that we configured during the setup. Esper will then call back the update method on the bolt if one of the statements generated data. The update method will then basically perform the reverse operation of the execute method and convert the event data to a tuple:

@Override
public void update(EventBean[] newEvents, EventBean[] oldEvents) {
    if (newEvents != null) {
        for (EventBean newEvent : newEvents) {
            collector.emit(toTuple(newEvent));
        }
    }
}

private List<Object> toTuple(EventBean event) {
    int numFields = outputFields.size();
    List<Object> tuple = new ArrayList<Object>(numFields);

    for (int idx = 0; idx < numFields; idx++) {
        tuple.add(event.get(outputFields.get(idx)));
    }
    return tuple;
}
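
The two conversions can be tried out without Storm or Esper on the classpath. The following is a minimal sketch in plain Java that mirrors the loops in execute and toTuple using only collections. The class `TupleEventConversion` and its method names are hypothetical, and a plain `Map` stands in for Esper's `EventBean`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper mirroring the bolt's two conversions with plain collections.
final class TupleEventConversion {
    // Like execute(): pairs each field name with the value at the same index.
    static Map<String, Object> toEvent(List<String> fieldNames, List<Object> values) {
        if (fieldNames.size() != values.size()) {
            throw new IllegalArgumentException("field and value counts differ");
        }
        Map<String, Object> event = new HashMap<String, Object>();
        for (int idx = 0; idx < fieldNames.size(); idx++) {
            event.put(fieldNames.get(idx), values.get(idx));
        }
        return event;
    }

    // Like toTuple(): reads the properties in the declared output field order,
    // so the tuple layout does not depend on the map's iteration order.
    static List<Object> toTuple(List<String> outputFields, Map<String, Object> event) {
        List<Object> tuple = new ArrayList<Object>(outputFields.size());
        for (String field : outputFields) {
            tuple.add(event.get(field));
        }
        return tuple;
    }
}
```

The point of reading by declared field name is that the emitted tuple always lines up with the output fields given to the bolt, here tps followed by maxRetweets, regardless of how the statement result stores its properties.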

Written by tomdzk

September 28, 2011 at 9:12 pm

Posted in computers

24 Responses

  1. Thanks for write up. Given your experiments, would you consider the storm + esper combination useful? Would it be better than a storm setup by itself or an esper setup by itself?

    Guest

    September 30, 2011 at 10:50 am

    • Esper on its own doesn’t help you with getting data into it, you’ll always have to combine it with something that does that. This is where Storm and similar systems come into play. Now, in Storm you can do everything that you can do in Esper, but especially for advanced analysis, this will require a lot of rather complicated code, that you’ll also have to optimize yourself. So the combination of Storm and Esper seems quite useful: Storm would handle the data transfer plus custom handling that can’t be expressed in Esper, and Esper would handle what it does best, processing the data.

      tomdzk

      September 30, 2011 at 11:13 am

  2. Can you give steps for how to compile this and run it.

    I have tried compiling using mvn compile it has taken a while to compile it fully trying to download lot of jar most of them not found but finally it said compiled successfully.

    Then I started running it
    I get in the directory ../tomdz-storm-esper-30e5db2/target

    and ran storm jar storm-esper-0.5.4.2-SNAPSHOT.jar storm.esper.EsperBolt

    but getting error EsperBolt main not found. Please give me steps to run it and test it.

    Thanks
    Manish

    Manish

    January 2, 2012 at 8:33 am

  3. Hi,
    I looked at the jar and found the EsperBolt would have main and packaged so I ran this
    Iris:/home/manish/storm-esper/tomdz-storm-esper-30e5db2/target/test-classes # storm jar storm-esper-0.5.4.2-SNAPSHOT.jar org.tomdz.storm.esper.EsperBolt
    Running: export STORM_JAR=storm-esper-0.5.4.2-SNAPSHOT.jar; java -client -Djava.library.path=/usr/local/lib:/usr/lib -cp /home/manish/storm-0.6.0/storm-0.6.0.jar:/home/manish/storm-0.6.0/lib/hiccup-0.3.6.jar:/home/manish/storm-0.6.0/lib/jetty-util-6.1.26.jar:/home/manish/storm-0.6.0/lib/commons-fileupload-1.2.1.jar:/home/manish/storm-0.6.0/lib/clj-time-0.3.0.jar:/home/manish/storm-0.6.0/lib/clout-0.4.1.jar:/home/manish/storm-0.6.0/lib/clojure-1.2.0.jar:/home/manish/storm-0.6.0/lib/slf4j-api-1.5.8.jar:/home/manish/storm-0.6.0/lib/ring-jetty-adapter-0.3.11.jar:/home/manish/storm-0.6.0/lib/tools.macro-0.1.0.jar:/home/manish/storm-0.6.0/lib/jetty-6.1.26.jar:/home/manish/storm-0.6.0/lib/compojure-0.6.4.jar:/home/manish/storm-0.6.0/lib/reflectasm-1.01.jar:/home/manish/storm-0.6.0/lib/servlet-api-2.5-20081211.jar:/home/manish/storm-0.6.0/lib/kryo-1.04.jar:/home/manish/storm-0.6.0/lib/core.incubator-0.1.0.jar:/home/manish/storm-0.6.0/lib/log4j-1.2.16.jar:/home/manish/storm-0.6.0/lib/jline-0.9.94.jar:/home/manish/storm-0.6.0/lib/commons-codec-1.3.jar:/home/manish/storm-0.6.0/lib/commons-lang-2.5.jar:/home/manish/storm-0.6.0/lib/commons-logging-1.1.1.jar:/home/manish/storm-0.6.0/lib/jvyaml-1.0.0.jar:/home/manish/storm-0.6.0/lib/ring-servlet-0.3.11.jar:/home/manish/storm-0.6.0/lib/jzmq-2.1.0.jar:/home/manish/storm-0.6.0/lib/asm-3.2.jar:/home/manish/storm-0.6.0/lib/slf4j-log4j12-1.5.8.jar:/home/manish/storm-0.6.0/lib/clojure-contrib-1.2.0.jar:/home/manish/storm-0.6.0/lib/carbonite-1.0.0.jar:/home/manish/storm-0.6.0/lib/minlog-1.2.jar:/home/manish/storm-0.6.0/lib/httpcore-4.0.1.jar:/home/manish/storm-0.6.0/lib/commons-io-1.4.jar:/home/manish/storm-0.6.0/lib/ring-core-0.3.10.jar:/home/manish/storm-0.6.0/lib/zookeeper-3.3.2.jar:/home/manish/storm-0.6.0/lib/joda-time-1.6.jar:/home/manish/storm-0.6.0/lib/json-simple-1.1.jar:/home/manish/storm-0.6.0/lib/servlet-api-2.5.jar:/home/manish/storm-0.6.0/lib/httpclient-4.0.1.jar:/home/manish/storm-0.6.0/lib/commons-exec-1.1.jar:/home/manish/storm-0.6.0/lib/libthrift7-0.7.0.jar:/home/manish/storm-0.6.0/lib/junit-3.8.1.jar:storm-esper-0.5.4.2-SNAPSHOT.jar:/root/.storm:/home/manish/storm-0.6.0/bin org.tomdz.storm.esper.EsperBolt
    Exception in thread “main” java.lang.NoClassDefFoundError: com/espertech/esper/client/UpdateListener
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:637)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
    at java.lang.ClassLoader.loadClassInternal(ClassLoader.java:336)
    Caused by: java.lang.ClassNotFoundException: com.espertech.esper.client.UpdateListener
    at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
    at java.lang.ClassLoader.loadClassInternal(ClassLoader.java:336)
    … 12 more
    Could not find the main class: org.tomdz.storm.esper.EsperBolt. Program will exit.

    Now it is giving an error for Esper. How can I give the Esper jar to Storm? I have added the Esper jar to the classpath, but it is not working.

    Please help..

    Regards,
    Manish

    Manish

    January 2, 2012 at 11:11 am

    • The library will only provide you with a bolt that you can then use in your Storm programs. It is not itself runnable.

      For info about Storm itself please see Storm’s Wiki, especially the “Read these first” section.

      tomdzk

      January 3, 2012 at 9:30 pm

    • I am facing the same issue. did you find any solution?

      oz

      April 3, 2013 at 5:02 am

  4. Thanks for this great article. I think you did quite well in explaining the benefits of both Storm and Esper. Further, how to combine them.

    Unfortunately, the links to the code are not working, e.g. for the EsperBolt.

    Not least, is there any ongoing work to overcome the limitations you mentioned for the EsperBolt.

    amorotti

    February 9, 2012 at 7:33 am

    • I’ve updated the blog post with proper links. The current version of this combined with Storm 0.7 should fix the mentioned limitations.

      tomdzk

      February 12, 2012 at 11:04 pm

  5. This is a combination we’re currently trying to use for an advanced fleet management analytics application (gps, telemetry, canbus data and temperature). We are rather at the beginning, but it’s nice to see that Storm is getting traction. Combining this with visualization and a graph database, we can do wonders!!! Thanks for sharing…

    mmemetea

    March 17, 2012 at 6:02 am

  6. Hi! Thanks for sharing.

    I’ve been looking at a Complex Event Processing problem where Esper would work, but it is unable to scale horizontally by adding servers.

    Your example here works because the Esper instances can look at the incoming tweets discretely. Are you able to combine the working memory of all the Esper instances, and use Storm to “merge” all the instances into one logical instance, that can look at the incoming data as one?

    mparaz

    March 20, 2012 at 8:47 pm

    • I have not tried that, sorry. However, I’d think that since Esper bolts can emit a stream back into Storm, this should be possible as long as more than one spout/bolt can emit tuples for the same stream. You’ll then have to split the work across the Esper instances in some fashion that makes sense for your problem (this is not that much different from how you would split a problem with map/reduce).

      tomdzk

      March 20, 2012 at 8:56 pm

      • Thank you for your inputs and your time!

        mparaz

        March 22, 2012 at 9:43 am

  7. Someone asked me to compare Storm and Esper. The analogy (or perhaps it’s a metaphor?) that I came up with is this one, from construction, I guess. Is that a good way to explain?

    With Storm you get the frames and doors. The frames and doors in a building make up and connect rooms thereby making a big or a small house, as needed.

    Perhaps Esper provides the stuff to put into your rooms, i.e. cabinets, appliances, etc.. Esper also provides stuff that you can put anywhere else, for example into a gym or outside.

    People that are building a house will find Storm and Esper very useful as they are complementary products. I think there is zero overlap between Storm and Esper.

    Rainer G.

    March 22, 2012 at 1:23 pm

  8. Hi Tomdzk,

    Found a critical issue with Storm Esper library. It works fine on single Esper bolt but when I was trying to have multiple such bolts in my topology then only first bolt was getting called (rest didn’t get called).

    I have applied below patch to fix this issue and it works for me.
    Here is the issue with patch : https://github.com/tomdz/storm-esper/issues/1

    Mayur.

    Mayur Choubey

    April 3, 2012 at 8:40 am

  9. Esper Bolt works well on functions like max or count given in the examples.
    But when I am trying to use functions like Sum or Avg it is throwing following exception:

    com.espertech.esper.client.EPStatementException: Error starting statement: Implicit conversion from datatype ‘Object’ to numeric is not allowed for aggregation function ‘sum’ [select sum(hits) as totalHits from weblogs.win:time_batch(1 sec)]

    I have gone through the code and found that setupEventTypeProperties() method is putting Object.class for all fields. This could be the reason.

    Raised an issue for the same here : https://github.com/tomdz/storm-esper/issues/2

    Mayur Choubey

    April 4, 2012 at 12:09 am

  10. A very good and useful experiment. I have tried something similar using RabbitMQ and Java (to inject code into Esper) and the result has been useful for the job at hand; however, the concepts used here are simpler, extensible and I guess, inherently scalable. Many thanks for sharing.

  11. Hi, this integration looks very promising, thanks for sharing. On your GitHub project I’ve seen you can use Maven to integrate the storm-esper library in a project. However, adding the code you suggested to the pom.xml file didn’t work for me. Which repository is the project in?

    Thanks in advance

    robl

    June 8, 2012 at 3:29 am

    • It’s in Sonatype’s OSS repository which should be synchronized with Maven central.

      tomdzk

      June 9, 2012 at 10:28 pm

      • Thanks for your reply. I tried including storm-esper in my pom.xml file, but it could not be found in Maven Central. I also tried to look for storm-esper on the Maven Central website (http://search.maven.org/), but it seems not to be there… Shouldn’t it be?

        Best regards

        robl

        June 13, 2012 at 6:19 am

  12. First of all, thanks for sharing.

    Although, I am experiencing issues with the dependency as well.
    Could you please clarify how to integrate Storm-Esper in pom.xml?

    <dependency>
        <groupId>org.tomdz.storm</groupId>
        <artifactId>storm-esper</artifactId>
        <version>0.7.1-SNAPSHOT</version>
    </dependency>

    This does not work for me 😦
    I could not even find Storm-Esper via the maven central web-site…

    Mooose

    July 10, 2012 at 2:40 am

  13. Nice stuff, but I’ve got two questions:
    – Can this implementation be used for patterns as well? Like EPAdministrator.createPattern() ?
    – How can one apply an own UpdateListener to an EsperBolt?
    Any clarification is highly appreciated!

    Kumar

    July 11, 2012 at 9:18 am

  14. […] Two implementations of CEP getting a lot of buzz are Esper and Storm (Thomas Dudziak has written a good blog […]

  15. […] The lack of continuous query supporting time/length windows and temporal pattern matching seem to altogether missing in current versions of the S4 and Storm projects. Probably, it is due to their infancy and they will mature up to include such features in future. As of now, they only seem fit for pre-processing the event stream before passing it over to a CEP engine like ESPER. Their ability to do distributed processing (a la map-reduce mode) can help to speed up the pre-processing where events can be filtered off, or enriched via some lookup/computation etc. There have also been some attempts to integrate Storm with Esper (https://tomdzk.wordpress.com/2011/09/28/storm-esper/). […]

