This page walks through a sample usage of the org.gramar.storm.gramar gramar. The specifics of the topology to be implemented aren't all that important. Instead we'll focus on how to use the gramar to generate almost all of the topology implementation.


This walkthrough was run on a new Ubuntu VMware image with some additional installed software:

  • Eclipse was downloaded and unzipped
  • From Eclipse, Help –> Install New Software to install
  • In Window –> Preferences –> Java –> Installed JRE's –> Execution Environment select the installed JDK to be used for Java 1.8
  • From a terminal, use sudo apt-get to install the graphviz package. Technically it's not required, but you need graphviz to produce the topology diagrams and the diagrams will be a key success factor in a real project.
    • For a real project, I'd also install git and maven

The Model

We start with an empty Eclipse workspace (except for a general project named “Sandbox”). We choose New → Other… → Gramar → Sample Model and select the sample model for the gramar:

And we get this skeletal sample that we need to fill in:


    <!--  label : A multi-token displayable name for the topology -->
    <!--  mavenGroupId : The groupId for the maven artifacts to be created -->
    <!--  basePackage : The name prefix for all Java packages -->
    <!--  provider : The name of the person, organization or company that owns this transformation tool -->
    <!--  stormProject : Optional, the name of the maven project containing the storm implementation -->
    <!--  allProject : Optional, the name of the maven module project  -->
    <topology  label=""  mavenGroupId=""  basePackage=""  provider=""  title=""  >

        <!--  label : A multi-token, displayable name for the bolt -->
        <!--  instances : The number of instances of this bolt to be constructed at runtime -->
        <bolt  label=""  instances="">

            <!--  stream : The single-token name of a stream emitted by the bolt -->
            <!--  type : The name of a defined type that describes the shape of the fields on the stream -->
            <emits  stream=""  type="">
            </emits>

            <!--  stream : The name of a stream read by the bolt -->
            <!--  grouping : One of "fields", "shuffle", "global", "all", "local", "localOrShuffle", "none", "partialKey" (custom not yet supported) -->
            <reads  stream=""  grouping=""  >

                <!--  name : The name of the fields whose value is used to direct tuples on this stream (for fields and partialKey groupings) -->
                <fieldRef  name="">
                </fieldRef>

            </reads>

            <!--  from : The id of an other component -->
            <in  from="" />

            <!--  to : The id of an other component -->
            <out  to="" />

        </bolt>

        <!--  label : A multi-token, displayable name for the spout -->
        <!--  instances : The number of instances of this spout to be constructed at runtime -->
        <spout  label=""  instances="">

            <!--  type : The name of a defined type that describes the shape of the fields on the stream -->
            <!--  idType : The type of the message ID for reliable emits.  Defaults to the type. -->
            <!--  stream : The unique name of the stream -->
            <emits  type=""  stream="">
            </emits>

        </spout>

        <!--  name : The capitalized, single-token name of the type -->
        <type  name="">

            <!--  name : The single-token name of the property -->
            <!--  type : One of String, Integer, Long, Double, Boolean, Date -->
            <!--  list : Boolean value indicating whether this field is really a list of the given type -->
            <field  name=""  type=""  list="false" >
            </field>

        </type>

        <!--  name : The single-token name of the environment -->
        <!--  runLocally : Indicates whether execution is in a LocalCluster (true) or full cluster (false) -->
        <!--  default : True only if the target environment is the default environment if none is specified during execution -->
        <environment  name=""  runLocally=""  default="">
        </environment>

        <!--  id : The unique id of the component  -->
        <!--  kind : one of "kafka"                 -->
        <other  id=""   kind=""  style="shape=box, fillcolor=skyblue1" lineStyle="penwidth=3, style=dashed, color=slateblue1" />

    </topology>



Some observations:

  • You can have as many spouts as you like. Simply add a spout stanza for each spout in the topology.
  • You can have as many bolts as you like. Simply add a bolt stanza for each bolt in the topology.
  • Spouts and bolts may emit onto as many streams as you like, but each stream should only be emitted by a single spout or bolt and must be uniquely named. You can have multiple components emitting onto the same stream, but that turns out to be bad form, and makes for more difficult debugging and performance tuning (even without Gramar).
  • Similarly, you should have only one bolt read from a given stream, although you can specify multiple bolts that read from a given stream.
  • Types describe the names, types and order of the fields on a stream.
  • When you specify that a bolt reads from a stream, you also specify the grouping. If you specify a grouping of “fields”, you can specify one or more field names from the list of names defined on the type of the stream. For example, the Perform bolt reads from the parsedMessages and correctedErrors streams, each with a grouping of “fields”. Since both streams have a specified type of ParsedMessage, you could have specified any combination of the “id”, “action” and “arg” fields.
  • The environment element defines an environment in which the topology can run. Each defined environment will have its own property file and the topology can be run either on a local cluster or on a distributed cluster.
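
To make the “fields” grouping a little more concrete, here is a minimal Java sketch of the idea: tuples with equal values in the grouping fields always go to the same bolt instance. This is a simplified illustration only, not Storm's actual routing code; the class name, the chooseTask() helper and the sample id value are made up for this example.

```java
import java.util.Arrays;
import java.util.List;

// Simplified illustration of a "fields" grouping; NOT Storm's real implementation.
class FieldsGroupingSketch {

    // Picks a task index from the values of the grouping fields.
    // Equal field values always produce the same index.
    static int chooseTask(List<Object> groupingValues, int numTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int performInstances = 20; // matches instances="20" on the Perform bolt

        // Two tuples carrying the same (hypothetical) "id" value
        int first = chooseTask(Arrays.<Object>asList("order-17"), performInstances);
        int second = chooseTask(Arrays.<Object>asList("order-17"), performInstances);

        System.out.println("same id, same task: " + (first == second));
    }
}
```

Grouping on “id” therefore means that all messages for one entity are handled by the same instance, which matters if the business logic keeps per-entity state.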

At this point in the process we need to stop to think about what we want to implement.

In this hypothetical example, we'll be reading XML messages from some middleware API in a destructive manner, meaning we need to persist or archive the messages we read as soon as possible. Once we've archived the message (which will just consist of a single, complex XML document in string form) we need to parse the message to extract an action, entity ID and possibly some number of arguments. Those values need to be processed in some appropriate way. Finally, for any message that doesn't parse properly, we have some logic that might be able to correct the parse error in which case we'll send the corrected parse results on to be processed. Otherwise we'll report the error.

The topology above can be implemented with a spout and five bolts. The spout will emit single-value tuples onto a stream named “rawMessages” which, in turn, is read by an Archiver bolt. Once the Archiver logic archives the message, the bolt will emit a two-value tuple (the original payload and a persistence location URI) onto a stream named “archivedMessages”. A Parser bolt reads tuples from the archivedMessages stream and attempts to parse them into an id, an action and some number of additional arguments. If successful, the bolt will emit those values onto a “parsedMessages” stream. Otherwise it emits the original tuple to a “problemMessages” stream. A Correction bolt reads from the problemMessages stream and tries to correct the parse problem. If it's successful it emits an id, action and arguments to a “correctedErrors” stream, but otherwise it emits the original message and an error message to an “uncorrectableErrors” stream where they're read by a Reporter bolt. Finally a Perform bolt reads from both the parsedMessages and correctedErrors streams to process the message.

We can capture the above flow, both process and data, in the model file we just created. The finished model would look as follows.


    <topology  label="Ingest"  mavenGroupId="org.gramar"  basePackage="org.gramar.ingest"  provider="" >

        <spout  label="API Reader"  instances="3">
            <emits  type="RawMessage"  stream="rawMessages" />
        </spout>

        <bolt  label="Archiver"  instances="1">
            <reads  stream="rawMessages"  grouping="shuffle"  />
            <emits  type="ArchivedMessage"  stream="archivedMessages" />
        </bolt>

        <bolt  label="Parser"  instances="10">
            <reads  stream="archivedMessages"  grouping="shuffle"  />
            <emits  stream="parsedMessages"    type="ParsedMessage" />
            <emits  stream="problemMessages"   type="ArchivedMessage" />
        </bolt>

        <bolt  label="Correction"  instances="3">
            <reads  stream="problemMessages"      grouping="shuffle"  />
            <emits  stream="uncorrectableErrors"  type="ParseError" />
            <emits  stream="correctedErrors"      type="ParsedMessage" />
        </bolt>

        <bolt  label="Error Reported"  instances="1">
            <reads  stream="uncorrectableErrors"  grouping="shuffle"  />
        </bolt>

        <bolt  label="Perform"  instances="20">
            <reads  stream="parsedMessages"  grouping="fields"  >
                <fieldRef  name="id" />
            </reads>
            <reads  stream="correctedErrors"  grouping="fields"  >
                <fieldRef  name="id" />
            </reads>
        </bolt>

        <type  name="RawMessage">
            <field  name="payload"  type="String"  list="false"  />
        </type>

        <type  name="ArchivedMessage">
            <field  name="payload"  type="String"  list="false"  />
            <field  name="uri"      type="String"  list="false"  />
        </type>

        <type  name="ParsedMessage">
            <field  name="id"      type="String"  list="false"  />
            <field  name="action"  type="String"  list="false"  />
            <field  name="arg"     type="String"  list="true"  />
        </type>

        <type  name="ParseError">
            <field  name="message"  type="ArchivedMessage"  list="false"  />
            <field  name="error"    type="String"  list="false"  />
        </type>

        <environment  name="local"  runLocally="true"   default="true" />
        <environment  name="qa"     runLocally="false"  default="false" />
        <environment  name="prod"   runLocally="false"  default="false" />

    </topology>



Now that the model is complete, we'll generate the 98% or so of the topology implementation that's just boilerplate. We select the model file in the Navigator (or Java Explorer) view, right-mouse-button click and select menu item Apply Gramar… to get the gramar application dialog:

At this point the dialog has already analyzed the model, has decided that it looks most like a model for the “Complete Storm Topology” gramar and has selected that gramar as the recommended gramar to apply to the model. If you want to apply another gramar instead you can change the selection, but we don't and we click on the Apply button.

When the generation is complete we get an informational dialog message:

Note that the model had been referenced 8281 times. That means that if you were to have written the generated code by hand you would have had to refer to the requirements 8281 times while typing out the code and you would have had to type at least 8281 code snippets correctly. This gramar did that same work for you in seconds.

Let's take a look at the Eclipse resources (projects, folders, Java, properties files, maven poms, etc.) that were just created by the gramar. Note that at this point you sometimes have to tell Eclipse to configure the projects as maven projects if Eclipse doesn't do it automatically.

You see from this screen shot that the gramar has created two projects. One is a maven-ized Java project containing most of the final Storm implementation and the other project is there for advanced build needs. We'll focus on the ingest-storm project, which gets its name from the @label attribute of the topology element in our model. If we had used a label value of “Fred”, for example, the storm project would be named “fred-storm”.

Topology Diagram

An interesting file you should be using early and often is the IngestTopologySummary file. It contains source for a graphviz diagram that documents the topology. Open a terminal, go to the root folder in this project and run the script (generated by the gramar):

Refresh Eclipse and open IngestTopologySummary.png:

This diagram reflects the topology that we defined in the model. The other diagram shows the same thing, but also annotates the shape of the data on each of the streams:


Recall that we defined types to describe the fields (names, types and order) in each stream. For each of those types, the gramar generates a Java bean in the .bean package:

The generated bolts and spouts will emit actual Values objects to the streams and will execute actual Tuple objects from those streams, but the business logic that we'll be expected to write will work with these bean objects. The class that marshals and unmarshals these beans from Tuples to beans to JSON to Values and back to beans is the Marshaller class in the .util package.
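
To give a feel for what marshalling between beans and tuple values involves, here is a small plain-Java sketch for the ArchivedMessage type. It is only an illustration under stated assumptions: the generated Marshaller is not reproduced here, the JSON step is skipped, the bean is a hand-written stand-in, and a List<Object> stands in for Storm's Values (which is itself a list of objects).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of bean <-> values marshalling; not the generated Marshaller.
class MarshalSketch {

    // Hand-written stand-in for the bean generated from the ArchivedMessage type
    static final class ArchivedMessage {
        final String payload;
        final String uri;

        ArchivedMessage(String payload, String uri) {
            this.payload = payload;
            this.uri = uri;
        }
    }

    // The value order follows the <field> order of the type in the model: payload, uri
    static List<Object> toValues(ArchivedMessage message) {
        return new ArrayList<>(Arrays.<Object>asList(message.payload, message.uri));
    }

    // Rebuilds the bean from the positional values of a tuple
    static ArchivedMessage fromValues(List<Object> values) {
        return new ArchivedMessage((String) values.get(0), (String) values.get(1));
    }

    public static void main(String[] args) {
        ArchivedMessage original = new ArchivedMessage("<msg/>", "file:/archive/1");
        ArchivedMessage copy = fromValues(toValues(original));
        System.out.println(copy.payload + " archived at " + copy.uri);
    }
}
```

Because the values are positional, the field order declared on the type is what keeps the emitting and reading sides in agreement.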

Bolts and Spouts

The generated logic for bolts and spouts is factored into Storm-aware code in the actual bolt and spout classes (in the .bolt and .spout packages, respectively) and helper classes containing business logic in the .logic package.

Other observations include:

  • For each bolt and spout, there is a generated interface that defines the bolt or spout behavior that might be needed by the business logic.
  • Each interface will have an ack() and fail() method, although their use for bolts and spouts is, of course, different.
  • Each interface will define emit methods for each stream onto which the spout or bolt emits.
  • Each bolt helper class has a method for each stream from which the bolt reads. Each of those methods takes the type of bean that defines the stream tuple shape and a reference to the interface for the bolt. Use the interface reference to emit tuples, to ack and to fail.
  • Each spout helper class has a nextTuple() method that takes a reference to the spout's interface. Use this interface to emit tuples, either with a message ID for reliable topologies or without a message ID for unreliable topologies.
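
The split between interface and helper class can be sketched in plain Java for the Parser bolt. Everything in this block is a hypothetical stand-in written for this page: the names IParserBolt, ParserBoltLogic and readFromArchivedMessages() are guesses at the pattern (the real generated names and signatures may differ), and the whitespace "parser" is a toy in place of real XML parsing. The point is that the logic only sees beans and the interface, so it can be exercised without Storm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for generated classes; names and signatures are assumptions.
class BoltLogicSketch {

    static final class ArchivedMessage {
        final String payload;
        final String uri;
        ArchivedMessage(String payload, String uri) { this.payload = payload; this.uri = uri; }
    }

    static final class ParsedMessage {
        final String id;
        final String action;
        final List<String> arg;
        ParsedMessage(String id, String action, List<String> arg) {
            this.id = id; this.action = action; this.arg = arg;
        }
    }

    // What the business logic is allowed to ask of the bolt
    interface IParserBolt {
        void emitToParsedMessages(ParsedMessage message);
        void emitToProblemMessages(ArchivedMessage message);
        void ack();
        void fail();
    }

    // Helper class: one method per stream the bolt reads
    static final class ParserBoltLogic {
        void readFromArchivedMessages(ArchivedMessage message, IParserBolt bolt) {
            // Toy parse: "<id> <action> <arg>..." separated by whitespace
            String[] tokens = message.payload.trim().split("\\s+");
            if (tokens.length < 2) {
                bolt.emitToProblemMessages(message);
            } else {
                List<String> args = new ArrayList<>(Arrays.asList(tokens).subList(2, tokens.length));
                bolt.emitToParsedMessages(new ParsedMessage(tokens[0], tokens[1], args));
            }
            bolt.ack();
        }
    }

    // Recording fake, handy for trying the logic outside of Storm
    static final class RecordingBolt implements IParserBolt {
        final List<ParsedMessage> parsed = new ArrayList<>();
        final List<ArchivedMessage> problems = new ArrayList<>();
        int acks;
        int fails;
        public void emitToParsedMessages(ParsedMessage message) { parsed.add(message); }
        public void emitToProblemMessages(ArchivedMessage message) { problems.add(message); }
        public void ack() { acks++; }
        public void fail() { fails++; }
    }

    public static void main(String[] args) {
        RecordingBolt bolt = new RecordingBolt();
        new ParserBoltLogic().readFromArchivedMessages(
                new ArchivedMessage("42 update a b", "file:/archive/1"), bolt);
        System.out.println("parsed: " + bolt.parsed.size() + ", problems: " + bolt.problems.size());
    }
}
```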

Adding Your Business Logic

Let's take a look at how you'd modify a spout helper class to add your business logic.

An important concept with gramars in general, and this gramar in particular, is that portions of the generated code can be modified in such a way that your changes are preserved when you re-apply the gramar to a modified model. The general pattern is that pairs of begin/end comments delimit the regions whose contents are kept on a gramar re-apply. For example, the declaration section below falls within a begin/end comment block, so any declarations we add to this class there will be kept when we re-apply the gramar. And we will. Many times.

    // Begin declarations
    private static final long serialVersionUID = 1L;

    private static final Logger log = Logger.getLogger(APIReaderSpoutLogic.class);
    private boolean written = false;

    // This is added, but not set.  We'll assign this variable a value in the open() method
    private MyMiddleWareClient client;
    // End declarations
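
How a generator might keep such regions across a re-apply can be sketched in a few lines of Java. This is a guess at the general technique, not Gramar's actual merge code: the ProtectedRegionSketch class is hypothetical, and it assumes markers of the exact form "// Begin <name>" and "// End <name>" as seen in the generated classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of preserving begin/end regions; not Gramar's implementation.
class ProtectedRegionSketch {

    // Collects the text found between "// Begin <name>" and "// End <name>" markers.
    static Map<String, String> extractRegions(String source) {
        Map<String, String> regions = new LinkedHashMap<>();
        String name = null;
        StringBuilder body = new StringBuilder();
        for (String line : source.split("\n")) {
            String trimmed = line.trim();
            if (name == null && trimmed.startsWith("// Begin ")) {
                name = trimmed.substring("// Begin ".length()).trim();
                body.setLength(0);
            } else if (name != null && trimmed.equals("// End " + name)) {
                regions.put(name, body.toString());
                name = null;
            } else if (name != null) {
                body.append(line).append('\n');
            }
        }
        return regions;
    }

    // Copies the kept region bodies into freshly generated text.
    static String merge(String generated, Map<String, String> kept) {
        StringBuilder out = new StringBuilder();
        String skipping = null; // name of the region whose generated body is being dropped
        for (String line : generated.split("\n")) {
            String trimmed = line.trim();
            if (skipping != null) {
                if (trimmed.equals("// End " + skipping)) {
                    skipping = null;
                    out.append(line).append('\n');
                }
                continue;
            }
            out.append(line).append('\n');
            if (trimmed.startsWith("// Begin ")) {
                String name = trimmed.substring("// Begin ".length()).trim();
                if (kept.containsKey(name)) {
                    out.append(kept.get(name));
                    skipping = name;
                }
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String edited = "class A {\n// Begin declarations\nint kept = 1;\n// End declarations\n}\n";
        String regenerated = "class A {\n// Begin declarations\n// End declarations\nvoid added() {}\n}\n";
        System.out.print(merge(regenerated, extractRegions(edited)));
    }
}
```

Anything typed outside a begin/end pair has no region to travel in, which is why such edits would be lost on a re-apply.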

The open() method is passed the Config, TopologyContext and a reference to the actual spout. As generated, the Topology driver class (in the .topology package) reads the entire property file and moves all of the key/value pairs there into the Config. Use these properties to construct any variables you'll need for the running of the spout. Note that there's a close() method, too, if you have shut-down logic. Remember to keep all of your code between the begin/end comment pairs.

    public void open(Map map, TopologyContext topologyContext, IAPIReaderSpout spout) {

        // Begin open() logic
        // End open() logic

    }

    public void close(IAPIReaderSpout spout) {

        // Begin close() logic

        // End close() logic

    }
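
The idea of moving every key/value pair of a property file into the configuration map can be sketched as follows. This is an illustration only, not the generated driver code: the ConfigLoadingSketch class and the property keys are made up, and a plain Map<String, Object> stands in for Storm's Config.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration of copying a property file into a config map.
class ConfigLoadingSketch {

    // Parses property-file text and copies every key/value pair into a map.
    static Map<String, Object> toConfig(String propertyFileText) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(propertyFileText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, Object> config = new HashMap<>();
        for (String key : properties.stringPropertyNames()) {
            config.put(key, properties.getProperty(key));
        }
        return config;
    }

    public static void main(String[] args) {
        // Made-up keys; a real environment property file would define its own
        Map<String, Object> config = toConfig("middleware.host=localhost\nmiddleware.port=7001\n");

        // Inside open(), values like these could be used to construct the middleware client
        String host = (String) config.get("middleware.host");
        int port = Integer.parseInt((String) config.get("middleware.port"));
        System.out.println("would connect to " + host + ":" + port);
    }
}
```

Note that values read this way arrive as strings, so numeric settings need to be parsed before use.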


The nextTuple() method is where you put your business logic. In the example below, we've added three lines of code to construct an instance of RawMessage (the type which defines the shape of stream rawMessages) and then to emit that bean onto the rawMessages stream. Generated logic in the spout class will marshal the fields in the RawMessage object into a Values object and then emit that Values object to the stream.

Note the generated emit() method for the stream. There will be an explicitly named emit() method for each stream that the spout is defined to have.

Note also the other spout lifecycle methods below.

    public void nextTuple(final IAPIReaderSpout spout) {

        // Begin nextTuple() logic
        try {

            // emit a tuple

            String payload = "....";  // Get this value from the middleware
            RawMessage rawMessage = new RawMessage(payload);
            // The second argument is the message ID; here the bean itself is used
            spout.emitToRawMessages(rawMessage, rawMessage);

        } catch (Exception e) {
            log.error("APIReaderSpoutLogic nextTuple() error: " + e.toString());
        }
        // End nextTuple() logic

    }

    public void open(Map map, TopologyContext topologyContext, IAPIReaderSpout spout) {

        // Begin open() logic
        // End open() logic

    }

    public void close(IAPIReaderSpout spout) {

        // Begin close() logic

        // End close() logic

    }

    public void activate(IAPIReaderSpout spout) {

        // Begin activate() logic

        // End activate() logic

    }

    public void deactivate(IAPIReaderSpout spout) {

        // Begin deactivate() logic

        // End deactivate() logic

    }

    public void ack(Object o, IAPIReaderSpout spout) {

        // Begin ack() logic

        // End ack() logic

    }
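
Since our middleware read is destructive, a reliable spout has to hold on to each payload until the topology reports it as fully processed. One common way to do that is a map of pending messages keyed by message ID, sketched below in plain Java. The PendingMessagesSketch class is hypothetical and is not generated by the gramar; its methods are meant to be called from your own nextTuple(), ack() and fail() logic.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical bookkeeping for reliable emits; not part of the generated code.
class PendingMessagesSketch {

    private final Map<Object, String> pending = new HashMap<>(); // message ID -> payload
    private final Queue<String> retry = new ArrayDeque<>();      // payloads waiting to be re-emitted

    // Call right after emitting a tuple with a message ID
    void emitted(Object messageId, String payload) {
        pending.put(messageId, payload);
    }

    // Call from the ack() logic: the message is done and can be forgotten
    void acked(Object messageId) {
        pending.remove(messageId);
    }

    // Call from the fail() logic: queue the payload so a later nextTuple() can emit it again
    void failed(Object messageId) {
        String payload = pending.remove(messageId);
        if (payload != null) {
            retry.add(payload);
        }
    }

    // Returns a payload that needs re-emitting, or null if there is none
    String nextRetry() {
        return retry.poll();
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingMessagesSketch tracker = new PendingMessagesSketch();
        tracker.emitted("m1", "<msg>one</msg>");
        tracker.emitted("m2", "<msg>two</msg>");
        tracker.acked("m1");
        tracker.failed("m2");
        System.out.println("to retry: " + tracker.nextRetry());
    }
}
```

The sketch keeps everything in memory, so it does not survive a worker restart; for real use the archive written by the Archiver bolt, or the middleware itself, would have to cover that case.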


Evolving the Topology

When you write a Storm topology it will evolve. You'll find yourself adding, removing, merging or splitting spouts and bolts. The streams will change, too. You'll add and remove streams, change their source or destination and change their fields and their formats. Whenever you need to evolve your topology, you can simply update the original model to which you applied the gramar and apply the gramar again.

As long as you keep your code changes between the begin/end comment pairs you can make most of the above changes easily while keeping your business logic in place. There are some special cases where you do have to be careful:

  • If you change the name of a stream, a new method will be generated in the receiving bolt's helper class. Be sure to save the previous read method on the side so you can copy the logic after the gramar re-apply.
  • If you change the name (label) of a bolt or spout, a new set of classes will be generated according to the new name, but the classes for the previous label/name will still remain. You have to hand copy the code you want to keep.
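
Both special cases follow from the way generated names are derived from the model. The Java sketch below mimics the derivations that can be observed in this walkthrough (“API Reader” becomes APIReaderSpoutLogic and IAPIReaderSpout, stream rawMessages becomes emitToRawMessages(), topology label “Ingest” becomes project ingest-storm). It is inferred from those examples only; the NamingSketch class is hypothetical and the gramar's real rules, for instance for multi-token topology labels, may differ.

```java
import java.util.Locale;

// Naming rules inferred from the examples on this page; the gramar's actual rules may differ.
class NamingSketch {

    // "API Reader" -> "APIReader"
    static String classStem(String label) {
        return label.replaceAll("\\s+", "");
    }

    // "API Reader" -> "APIReaderSpoutLogic"
    static String spoutLogicClass(String label) {
        return classStem(label) + "SpoutLogic";
    }

    // "API Reader" -> "IAPIReaderSpout"
    static String spoutInterface(String label) {
        return "I" + classStem(label) + "Spout";
    }

    // "rawMessages" -> "emitToRawMessages"
    static String emitMethod(String stream) {
        return "emitTo" + Character.toUpperCase(stream.charAt(0)) + stream.substring(1);
    }

    // "Ingest" -> "ingest-storm" (single-token labels only)
    static String stormProject(String topologyLabel) {
        return topologyLabel.toLowerCase(Locale.ROOT) + "-storm";
    }

    public static void main(String[] args) {
        System.out.println(spoutLogicClass("API Reader"));
        System.out.println(spoutInterface("API Reader"));
        System.out.println(emitMethod("rawMessages"));
        System.out.println(stormProject("Ingest"));
    }
}
```

Because the names are computed from labels and stream names, renaming either one yields differently named classes or methods rather than an in-place update of the old ones.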
org.gramar.storm.gramar/walkthrough.txt · Last modified: 2016/08/13 11:41 by chrisgerken