Skip to main content

Real time data processing with Storm, ETL from Oracle to Cassandra

Last couple of months we are using Apache Hadoop Map Reduce batch processing to analyze a huge amount of data. We have a few legacy product where we can't consider to using Cassandra big table database. A few of them uses Oracle Database as their primary storage. As our requirements we have to extract the data from the rdbms, parse the payload and load it to Cassandra for aggregating. Here i have decided to use Storm for real time data processing. Our usecase is as follows:
1) Storm spout connects to Oracle Database and collects data from particular table with some intervals.
2) Storm bolt parses the data with some fashion and emit to Storm-cassandra bolt to store the row into Cassandra DB.

Here is the fragment code of project. First i have create a Jdbc connector class, class contain a few class variables which contradicting with Storm ideology, as far i have just need one spout as input - it's enough for me.
package storm.contrib.jdbc;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;


public class JdbcConnection {
    private static Connection connection;
    private static final String JDBC_DRIVER = "oracle.jdbc.driver.OracleDriver";

    private JdbcConnection() {   }
    private static String jdbcUrl;
    private static String userName;
    private static String password;
    private static String query;
    private static String interval;
    private static final Logger logger = LoggerFactory.getLogger(JdbcConnection.class);

    static{
        Properties prop = new Properties();
        try {
            prop.load(JdbcConnection.class.getResourceAsStream("/connection.properties"));
            jdbcUrl = prop.getProperty("jdbc.url");
            userName = prop.getProperty("jdbc.username");
            password = prop.getProperty("jdbc.password");
            query = prop.getProperty("jdbc.query");
            interval = prop.getProperty("poll.interval");
            
        } catch (IOException e) {
            logger.error(e.getMessage());
        }
        
    }
    
    public static Connection getConnection() {
        if(connection != null ){
            return connection;
        }
        try {
            Class.forName(JDBC_DRIVER);
            connection = DriverManager.getConnection(getJdbcUrl(),getUserName(),getPassword());
            connection.setAutoCommit(false);
        } catch (ClassNotFoundException e) {
            // throw the exception
            logger.error(e.getMessage());
        } catch(SQLException e){
            logger.error(e.getMessage());
        }
        return connection;
    }

    public static String getJdbcUrl() {
        return jdbcUrl;
    }

    public static String getUserName() {
        return userName;
    }

    public static String getPassword() {
        return password;
    }

    public static String getQuery() {
        return query;
    }

    public static String getInterval() {
        return interval;
    }
}
Class JdbcConnection reads connection.properties file from the classpath and initialize the connection.
Now we are ready to create our Oracle Spout
package storm.contrib.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;

import backtype.storm.utils.Utils;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.contrib.jdbc.JdbcConnection;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.*;
import java.util.*;

import static backtype.storm.utils.Utils.tuple;

public class OracleSpout implements IRichSpout{
    private SpoutOutputCollector collector;
    private TopologyContext context;
    private transient Connection connection;
    private boolean completed =false;
    private String query;
    private long interval;
    private static final Logger logger = LoggerFactory.getLogger(OracleSpout.class);
    private Fields outputFields;
    public OracleSpout(final Fields outputFields){
        this.outputFields = outputFields;
    }
    public boolean isDistributed() {
        return false;
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this.outputFields);
    }

    public Map<String, Object> getComponentConfiguration() {

        return null;
    }
    // open connection to Oracle DB
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.context = topologyContext;
        // connect to DB
        connection = JdbcConnection.getConnection();
        query = JdbcConnection.getQuery();
        interval = Long.valueOf(JdbcConnection.getInterval());
    }
    public void close() {
        if(connection != null){
            try {
                connection.close();
            } catch (SQLException e) {
                logger.error(e.getMessage());
            }
        }
    }

    public void activate() {
    }

    public void deactivate() {
    }
    // read data and send to bolt
    public void nextTuple() {
        if(completed){
            Utils.sleep(interval);
        }
        Statement stm = null;
        if(connection != null){
            List<Object> tupleVal = new ArrayList<Object>();
            try {
                //stm = connection.createStatement();
                stm = connection.prepareStatement(query,ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
                ResultSet rs = stm.executeQuery(query);
                //ResultSetMetaData rsmd = rs.getMetaData();
                while(rs.next()){
                    ResultSetMetaData rsmd = rs.getMetaData();

                    for(int col=1; col <= rsmd.getColumnCount(); col++){
                        tupleVal.add(getDataByCol(rs, rsmd.getColumnType(col), col));
                    }
                    getCollector().emit(tuple(tupleVal.toArray()));
                    // delete the row
                    rs.deleteRow();
                    completed = true;
                }
            } catch (SQLException e) {
                logger.error(e.getMessage());
            }finally {
                tupleVal.clear();
                if(stm!=null)
                    try {
                        stm.close();
                    } catch (SQLException e) {
                        logger.error(e.getMessage());
                    }
            }
        }
    }

    public void ack(Object o)   {
        // @todo delete the record
        //logger.info("ack:", o);
    }

    public void fail(Object o) {
        logger.info("fail:", o);
    }
    private SpoutOutputCollector getCollector() {
        return collector;
    }

    private Object getDataByCol(ResultSet rs, int colType, int colIdx) throws SQLException{
        Object colData;
        switch (colType){
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.CLOB:
                colData = rs.getString(colIdx);
                break;
            case Types.INTEGER:
            case Types.BIGINT:
            //case Types.:
                colData = rs.getLong(colIdx);
                break;
            case Types.DECIMAL:
            case Types.NUMERIC:
                colData = rs.getDouble(colIdx);
                break;
            case Types.DATE:
                colData = rs.getDate(colIdx);
                break;
            case Types.TIMESTAMP:
                colData = rs.getTimestamp(colIdx);
                break;
            case Types.BLOB:
                Blob blob = rs.getBlob(colIdx);
                InputStream is =  blob.getBinaryStream();
                colData = getBytes(is);
                //colData = rs.getBlob(colIdx);
                break;
            default:
                colData = rs.getString(colIdx);
                break;
        }
        return colData;
    }
    private byte[] getBytes(InputStream is){
        // Get the size of the file
        try {
            return IOUtils.toByteArray(is);
        } catch (IOException e) {
            e.printStackTrace(); 
            return new byte[0];
        }
    }
}
The class is self explanatory itself. In open method we initialized the jdbc connection, in the nextTuple method we query the table and emit tuple for the parse bolt.
package storm.contrib.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.contrib.parse.ParseMessage;

import javax.xml.stream.XMLStreamException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static backtype.storm.utils.Utils.tuple;

public class ParseMsgBolt implements IRichBolt {
    private OutputCollector collector;
    private static final Logger logger = LoggerFactory.getLogger(ParseMsgBolt.class);
    TopologyContext context;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.context = topologyContext;
    }

    public void execute(Tuple tuple) {
        // parse
        Map<String, String> msgTuple = null;
        if(logMsg.length > 0){
            ParseSmevMessage3  parseMessage = new ParseSmevMessage3();
            List ls = new ArrayList();
            try {
                msgTuple =  parseMessage.parse(new String(logMsg));
                // add logid first;
                ls.add(logId);
                // add parsed fields values
                ls.addAll(msgTuple.values());
                this.collector.emit(ls);
                msgTuple.clear();
            } catch (XMLStreamException e) {
                // send failure msg
                collector.fail(tuple);
                logger.error(e.getMessage());
            }
        }
        //send ack
        this.collector.ack(tuple);
    }

    public void cleanup() {

    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("logid",
                                                "exct",
                                                "asen",
                                                "origc",
                                                "ard",
                                                "arc",
                                                "tcode",
                                                "asenc",
                                                "orign",
                                                "certsn",
                                                "arcn"));
                                                "request_id_ref",
                                                "origin_request_id_reg",
                                                "case_number",
                                                "cert",
                                                "messageid",
                                                "srv_sid",
                                                "test_msg",
                                                "status",
                                                "exchange_type"));
    }
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

package ru.atc.smev.pig.utils;

import com.ximpleware.*;

import java.io.ByteArrayInputStream;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ParseSmevMessage3 {
    private final static String CERT_TYPE = "X509";
    private static final String CERTSN = "certsn";
    private static final String ROOT_TAG = "root_tag";

    private static Map<String, String> SEARCH_TAGS = new HashMap<String, String>(17);
    static{
        SEARCH_TAGS.put("/*[local-name() = 'Envelope']/*[local-name() = 'Header']/*[local-name() = 'Security']/*[local-name() = 'BinarySecurityToken']/text()", CERTSN);
        //SEARCH_TAGS.put("/Envelope/root_tag", ROOT_TAG);
        SEARCH_TAGS.put("/*[local-name() = 'Envelope']/*[local-name() = 'Header']/*[local-name() = 'Header']/*[local-name() = 'MessageId']/text()", "msgid");

        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'Sender']/*[local-name() = 'Code']/text()", "asenc");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'Sender']/*[local-name() = 'Name']/text()", "asen");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'Recipient']/*[local-name() = 'Code']/text()", "arc");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'Recipient']/*[local-name() = 'Name']/text()", "arcn");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'Originator']/*[local-name() = 'Code']/text()", "origc");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'Originator']/*[local-name() = 'Name']/text()", "orign");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'Date']/text()", "ard");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'TypeCode']/text()", "tcode");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'ExchangeType']/text()", "exct");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'ServiceCode']/text()", "serc");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'RequestIdRef']/text()", "reqidr");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'RequestIdRef']/text()", "reqidr");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'OriginRequestIdRef']/text()", "origridr");
        SEARCH_TAGS.put("//*[local-name() = 'Message']/*[local-name() = 'CaseNumber']/text()", "can");

    }
    private String getSerialNumber(final String cert) throws CertificateException {
        byte[] derFile = org.bouncycastle.util.encoders.Base64.decode(cert.getBytes());

        final CertificateFactory cf = CertificateFactory.getInstance(CERT_TYPE);
        final X509Certificate x509 = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(derFile));
        // get serial number in hex
        return x509.getSerialNumber().toString(10);
    }
    public Map<String, String> parse (String xml) throws Exception{
        final Map<String, String> result = new HashMap<String, String>(17);
        final VTDGen vg = new VTDGen();
        final AutoPilot ap = new AutoPilot();
        int i;
        try {
            vg.setDoc(xml.getBytes());
            vg.parse(true);
            VTDNav vn = vg.getNav();
            ap.bind(vn);
            for(String key : SEARCH_TAGS.keySet()){
                ap.selectXPath(key);
                while( (i = ap.evalXPath())!=-1 ){
                    //System.out.println(SEARCH_TAGS.get(key)+":"+ vn.toString(i));
                    result.put(SEARCH_TAGS.get(key),vn.toString(i));
                    if(vn.matchElement("wsse:BinarySecurityToken")){
                        result.put(CERTSN, getSerialNumber(vn.toString(i)));
                    }
                }
                ap.resetXPath();
            }
        } catch (XPathParseException e) {
            e.printStackTrace();
        } catch(XPathEvalException e){
            e.printStackTrace();            
        } catch(NavException e){
            e.printStackTrace();
        }
        return result;
    }
}
I have used vtd xml library to parse the file and emit it to cassandra bolt, which store the row to the Cassandra DB.
Here is the topology class:
package storm.contrib.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.contrib.cassandra.bolt.BatchingCassandraBolt;
import backtype.storm.contrib.cassandra.bolt.CassandraBolt;
import backtype.storm.contrib.cassandra.bolt.DefaultBatchingCassandraBolt;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.contrib.bolt.ParseMsgBolt;
import storm.contrib.spout.OracleSpout;

public class CassandraBoltTopology {
    public static void main(String[] args) {

        TopologyBuilder tBuilder = new TopologyBuilder();
        // define oracle table column name 
        tBuilder.setSpout("oracle-reader", new OracleSpout(new Fields("logobject_id",
                                                        "oper_name",
                                                        "sender_code",
                                                        "sender_name",
                                                        "recipient_code",
                                                        "recipient_name",
                                                        "originator_code",
                                                        "originator_name",
                                                        "request_date",
                                                        "type_code",
                                                        "service_code",
                                                        "request_id_ref",
                                                        "origin_request_id_reg",
                                                        "case_number",
                                                        "cert",
                                                        "messageid",
                                                        "srv_sid",
                                                        "test_msg",
                                                        "status",
                                                        "exchange_type")));
        tBuilder.setBolt("Msgparser", new ParseMsgBolt()).shuffleGrouping("oracle-reader");
        tBuilder.setBolt("save to cassandra", new CassandraBolt("stormcf", "logid")).shuffleGrouping("Msgparser");
  
        Config config = new Config();
        config.put(CassandraBolt.CASSANDRA_HOST, "192.168.XXX.XXX");
        config.put(CassandraBolt.CASSANDRA_PORT, 9160);
        config.put(CassandraBolt.CASSANDRA_KEYSPACE, "stormks");
        config.setDebug(true);
        
  //Topology run
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", config, tBuilder.createTopology());
    }
}
Here we first declare and define the column names of the table where we would collect the data. In the current storm version we could not define out put field dynamically. Next we set the bolt accordingly and configure the CassandraBolt. Project runs well on two machine cluster with any errors. Anyway i have a plan to add transaction feature in spout. Hope this will help someone to kick start in Storm. Here is the properties file:
jdbc.url=jdbc:oracle:thin:@192.168.1XX.XX:1521:TEST
jdbc.username=orawsm
jdbc.password=orawsm
jdbc.query=SELECT t.* FROM LOG_OBJECTS t

poll.interval=10000

Comments

Popular posts from this blog

8 things every developer should know about the Apache Ignite caching

Any technology, no matter how advanced it is, will not be able to solve your problems if you implement it improperly. Caching, precisely when it comes to the use of a distributed caching, can only accelerate your application with the proper use and configurations of it. From this point of view, Apache Ignite is no different, and there are a few steps to consider before using it in the production environment. In this article, we describe various technics that can help you to plan and adequately use of Apache Ignite as cutting-edge caching technology. Do proper capacity planning before using Ignite cluster. Do paperwork for understanding the size of the cache, number of CPUs or how many JVMs will be required. Let’s assume that you are using Hibernate as an ORM in 10 application servers and wish to use Ignite as an L2 cache. Calculate the total memory usages and the number of Ignite nodes you have to need for maintaining your SLA. An incorrect number of the Ignite nodes can become a b...

Tip: SQL client for Apache Ignite cache

A new SQL client configuration described in  The Apache Ignite book . If it got you interested, check out the rest of the book for more helpful information. Apache Ignite provides SQL queries execution on the caches, SQL syntax is an ANSI-99 compliant. Therefore, you can execute SQL queries against any caches from any SQL client which supports JDBC thin client. This section is for those, who feels comfortable with SQL rather than execute a bunch of code to retrieve data from the cache. Apache Ignite out of the box shipped with JDBC driver that allows you to connect to Ignite caches and retrieve distributed data from the cache using standard SQL queries. Rest of the section of this chapter will describe how to connect SQL IDE (Integrated Development Environment) to Ignite cache and executes some SQL queries to play with the data. SQL IDE or SQL editor can simplify the development process and allow you to get productive much quicker. Most database vendors have their own fron...

Load balancing and fail over with scheduler

Every programmer at least develop one Scheduler or Job in their life time of programming. Nowadays writing or developing scheduler to get you job done is very simple, but when you are thinking about high availability or load balancing your scheduler or job it getting some tricky. Even more when you have a few instance of your scheduler but only one can be run at a time also need some tricks to done. A long time ago i used some data base table lock to achieved such a functionality as leader election. Around 2010 when Zookeeper comes into play, i always preferred to use Zookeeper to bring high availability and scalability. For using Zookeeper you have to need Zookeeper cluster with minimum 3 nodes and maintain the cluster. Our new customer denied to use such a open source product in their environment and i was definitely need to find something alternative. Definitely Quartz was the next choose. Quartz makes developing scheduler easy and simple. Quartz clustering feature brings the HA and...