Database DML/DDL event processing with Oracle Database change notification

A few years ago, in one of my blog posts, I described how to use Oracle database change notification to update a Hazelcast cache in the application server layer. That is still one of the best use cases for Oracle database change notification, but you can also use it to solve other problems, such as event processing on legacy tables. For instance, suppose you are developing a dispute system for a bank. For the core banking system, a dispute is just another banking transaction: when a dispute comes in from a client, a bank operator has to react to it. Most of the time disputes are kept in the same storage (tables) as other transactions. Such tables can hold billions of rows, and when you want to be notified that a few of those rows have changed, you have a few options:
1) Poll periodically: a scheduler polls the whole table at intervals to find changed data.
2) Use an Oracle trigger to send a notification (stored procedure or Oracle embedded Java implementation).
3) Use Oracle Database Change Notification or Oracle Continuous Query Notification.
If you have a few billion rows in an OLTP system, the first option is not an option at all, and triggers can also cause performance problems in an OLTP system. For asynchronous event processing on Oracle objects, Oracle database change notification is one of the best options. Oracle provides two different implementations of change notification: Oracle Continuous Query Notification and Oracle Database Change Notification. Continuous Query Notification provides only C and PL/SQL interfaces, whereas Database Change Notification has a Java implementation. To enable change notification, you only have to grant the privilege as follows and you are ready to go:
grant change notification to USER_NAME;
One thing to keep in mind: a notification carries only the ROWID and the object name or ID, not the entire row, which means you have to run an extra select query by ROWID to get the row. The illustration below shows the simple flow of the process:
The steps in the illustration above are as follows:
1) In this example, the client registers the Oracle user objects to watch, together with the listener.
2) The database stores the registration information in the data dictionary.
3) A partner application makes changes to the user objects; these may be any DML/DDL operations.
4) The Oracle JOBQ background process is notified of a new change notification message.
5) The JOBQ process notifies the client application's listener.
6) The client listener gets the ROWID and the user object ID and sends this information to the MQ server, such as Kafka.
7) A subscriber to the Kafka topic receives the ROWID information.
8) The processor queries the user objects to get the result set and starts processing the updated information.
Let's take a quick look at the pseudo code (you will find all the code on GitHub).
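Steps 6 to 8 can be sketched in plain Java without any MQ client. This is only an illustration under assumptions: the `ChangeMessage` and `RowFetcher` classes, the pipe-delimited wire format, and the `processRow` method are hypothetical names that do not come from the original project. The lookup relies on the standard Oracle `CHARTOROWID` function and on plain `java.sql` calls.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical message sent through the MQ topic: object ID, ROWID and operation. */
final class ChangeMessage {
    final int objectId;
    final String rowId;
    final String operation;

    ChangeMessage(int objectId, String rowId, String operation) {
        this.objectId = objectId;
        this.rowId = rowId;
        this.operation = operation;
    }

    /** Step 6: serialize to a pipe-delimited string (an assumed wire format). */
    String encode() {
        return objectId + "|" + rowId + "|" + operation;
    }

    /** Step 7: parse the string back on the subscriber side. */
    static ChangeMessage decode(String payload) {
        String[] parts = payload.split("\\|");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Unexpected payload: " + payload);
        }
        return new ChangeMessage(Integer.parseInt(parts[0]), parts[1], parts[2]);
    }
}

/** Step 8: fetch the changed row by ROWID. */
final class RowFetcher {
    /** Table names cannot be bound as parameters, so only plain identifiers are accepted. */
    static String buildQuery(String tableName) {
        if (!tableName.matches("[A-Za-z][A-Za-z0-9_$#]*(\\.[A-Za-z][A-Za-z0-9_$#]*)?")) {
            throw new IllegalArgumentException("Suspicious table name: " + tableName);
        }
        return "SELECT * FROM " + tableName + " WHERE ROWID = CHARTOROWID(?)";
    }

    /** Returns false when the row no longer exists, for example after a DELETE. */
    static boolean processRow(Connection connection, String tableName, String rowId) throws SQLException {
        try (PreparedStatement ps = connection.prepareStatement(buildQuery(tableName))) {
            ps.setString(1, rowId);
            try (ResultSet rs = ps.executeQuery()) {
                if (!rs.next()) {
                    return false;
                }
                // Business processing of the changed row would go here.
                return true;
            }
        }
    }
}
```

The listener would publish `new ChangeMessage(objectId, rowId, operation).encode()` to the topic, and the subscriber would call `ChangeMessage.decode(...)` followed by `RowFetcher.processRow(...)`. Keep in mind that the row may have changed again, or been deleted, between the notification and the lookup.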

Registration of the notification:
package com.blu.db;

import oracle.jdbc.OracleStatement;
import oracle.jdbc.dcn.*;
import oracle.jdbc.OracleConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class DBNotifactionConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DBNotifactionConsumer.class);

    private NsiOracleConnection oracleConnection;
    private static final Properties properties = new Properties();
    private String queryString;

    public String getQueryString() {
        return queryString;
    }

    public void setQueryString(String queryString) {
        this.queryString = queryString;
    }

    static{
        properties.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
        properties.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION, "true"); //Activates query change notification instead of object change notification.
    }

    public DBNotifactionConsumer(NsiOracleConnection oracleConnection) {
        this.oracleConnection = oracleConnection;
    }

    public NsiOracleConnection getOracleConnection() {
        return oracleConnection;
    }

    public void registerNotification() throws SQLException {
        DatabaseChangeRegistration databaseChangeRegistration = getOracleConnection().getConnection().registerDatabaseChangeNotification(properties);
        databaseChangeRegistration.addListener(new NsiListner());
        // try-with-resources closes the statement even if one of the queries fails
        try (Statement stm = getOracleConnection().getConnection().createStatement()) {
            // Tie the statement to the registration so that every query it runs gets registered
            ((OracleStatement) stm).setDatabaseChangeRegistration(databaseChangeRegistration);
            for (String query : getQueryString().split(";")) {
                try (ResultSet rs = stm.executeQuery(query)) {
                    while (rs.next()) {
                        // Drain the result set; the rows themselves are not needed here
                    }
                }
            }
        }
        // Log the tables covered by the registration
        for (String table : databaseChangeRegistration.getTables()) {
            LOGGER.info("Registered table: {}", table);
        }
    }
}
In the above code I used query change notification.
The listener without Kafka is very simple:
public class NsiListner implements DatabaseChangeListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(NsiListner.class);

    @Override
    public void onDatabaseChangeNotification(DatabaseChangeEvent databaseChangeEvent) {
        for(QueryChangeDescription qcd : databaseChangeEvent.getQueryChangeDescription()){
            LOGGER.info("Query Id: {}", qcd.getQueryId());
            LOGGER.info("Event Type: {}", qcd.getQueryChangeEventType().name());
            for(TableChangeDescription tcd : qcd.getTableChangeDescription()){
                // The table name comes back empty here, so identify the table by its object ID
                LOGGER.info("Table name: {}", tcd.getTableName());
                LOGGER.info("Object ID: {}", tcd.getObjectNumber());
                for(RowChangeDescription rcd : tcd.getRowChangeDescription()){
                    LOGGER.info("Row ID: {} Operation: {}", rcd.getRowid().stringValue(), rcd.getRowOperation().name());
                }
            }

        }

    }
}
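Since the listener comments note that the table name arrives empty and the object ID should be used instead, the processor needs some way to map an object ID back to a table. One possible approach, sketched below, is a small cache in front of a dictionary lookup. The `ObjectNameResolver` class and its loader function are hypothetical and not part of the original project; `USER_OBJECTS` is the standard Oracle dictionary view, and a real loader would run `LOOKUP_SQL` through JDBC.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

/** Caches object ID to table name lookups so the dictionary is queried once per object. */
final class ObjectNameResolver {
    /** Query a real loader could execute to resolve an object ID. */
    static final String LOOKUP_SQL =
            "SELECT OBJECT_NAME FROM USER_OBJECTS WHERE OBJECT_ID = ?";

    private final Map<Integer, String> cache = new ConcurrentHashMap<>();
    private final IntFunction<String> loader;

    /** The loader is assumed to return the table name for an object ID, or null if unknown. */
    ObjectNameResolver(IntFunction<String> loader) {
        this.loader = loader;
    }

    /** Returns the cached name, calling the loader only on the first request for an ID. */
    String resolve(int objectId) {
        return cache.computeIfAbsent(objectId, id -> loader.apply(id));
    }
}
```

In the listener, `resolver.resolve(tcd.getObjectNumber())` would then give the table name to pass along with the ROWID. If a table is dropped and recreated it gets a new object ID, so stale entries are harmless but never evicted in this sketch.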
If you want to make your listener application fault tolerant, you can run several listener instances in a cluster and use leader election so that only one listener runs at a time. Here is the pseudo code of a simple leader elector; note that I am using Curator to avoid boilerplate code.
package com.blu.curator;

import com.blu.db.DBNotifactionConsumer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Leader extends LeaderSelectorListenerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(Leader.class);
    private String clientName;
    private CuratorFramework client;
    private String path;
    private LeaderSelector leaderSelector;
    // oracle change notification
    private ApplicationContext ctx;// = new ClassPathXmlApplicationContext("spring-context.xml");
    private DBNotifactionConsumer consumer;//= (DBNotifactionConsumer) ctx.getBean("consumer");

    public Leader(String clientName, CuratorFramework client, String path) {
        this.clientName = clientName;
        this.client = client;
        this.path = path;
        leaderSelector = new LeaderSelector(this.client,this.path, this);
        leaderSelector.autoRequeue();
        // initialize oracle change notification
        ctx = new ClassPathXmlApplicationContext("spring-context.xml");
        consumer = (DBNotifactionConsumer)ctx.getBean("consumer");
    }

    @Override
    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
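        // Register the Oracle change notification once this instance becomes the leader
        consumer.registerNotification();
        LOGGER.info("{} is now the leader", this.clientName);
        // Hold leadership: returning from this method would relinquish it
        Thread.sleep(Integer.MAX_VALUE);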
    }
    public void start(){
        leaderSelector.start();
    }
}
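The `Thread.sleep(Integer.MAX_VALUE)` call in `takeLeadership` keeps the leader alive for roughly 24 days and gives no hook for cleanup. A possible alternative, sketched here as an assumption rather than as part of the original project, is to block on a latch and always run a cleanup step when leadership ends. The `LeadershipHold` class is a hypothetical helper that uses only `java.util.concurrent`. As far as I know, Curator interrupts the `takeLeadership` thread when the ZooKeeper connection is suspended or lost, and this sketch treats that interrupt as the signal to stop.

```java
import java.util.concurrent.CountDownLatch;

/** Blocks the leader thread until leadership should end, then runs a cleanup step. */
final class LeadershipHold {
    private final CountDownLatch released = new CountDownLatch(1);

    /** Call from another thread (for example a shutdown hook) to give up leadership. */
    void release() {
        released.countDown();
    }

    /**
     * Waits until release() is called or the thread is interrupted.
     * The cleanup always runs, so the change registration can be removed there.
     */
    void holdUntilReleased(Runnable cleanup) {
        try {
            released.await();
        } catch (InterruptedException e) {
            // Restore the flag so the caller can still see the interruption.
            Thread.currentThread().interrupt();
        } finally {
            cleanup.run();
        }
    }
}
```

Inside `takeLeadership`, the call would follow `consumer.registerNotification()`, and the cleanup could call `unregisterDatabaseChangeNotification` on the Oracle connection. That assumes the consumer keeps a reference to its `DatabaseChangeRegistration`, which the code above does not do yet.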
When one of the listeners becomes unavailable, another one replaces it and continues to receive notifications from the Oracle DB. That's enough for today; I hope the above information helps somebody get a quick start with Oracle change notification.
