Tuesday

Database DML/DDL event processing with Oracle Database change notification

A few years ago, in one of my blog posts, I described how to use Oracle database change notification to update a Hazelcast cache in the application server layer. That is still one of the best use cases for Oracle database change notification, but you can also use it to solve other problems, such as event processing on legacy tables. For instance, suppose you are developing a dispute system for a bank. For the core banking system, a dispute is just another banking transaction: when a dispute comes in from a client, a bank operator has to react to it. Most of the time disputes are kept in the same storage (tables) as other transactions. Such tables can hold billions of rows, and when you want to be notified that a few of those rows have changed, you have a few options:
1) Polling: a scheduler polls the whole table periodically to pick up changed data.
2) An Oracle trigger that sends a notification (via a stored procedure or an Oracle embedded Java implementation).
3) Oracle Database change notification or Oracle continuous query notification.
If you have a few billion rows in an OLTP system, the first option is not an option at all, and triggers can also cause performance problems in an OLTP system. For asynchronous event processing on Oracle objects, Oracle database change notification is one of the best options. Oracle provides two flavors of change notification: Oracle continuous query notification and Oracle database change notification. Oracle continuous query notification provides only C and PL/SQL implementations; Oracle database change notification, on the other hand, has a Java implementation. To enable change notification you only have to grant the following privilege, and you are ready to go:
grant change notification to USER_NAME;
One thing to keep in mind: with a notification you only get the ROWID and the object name or ID, not the entire row, which means you have to run an extra select query by ROWID to get the row. The illustration below shows the simple flow of the process:
The steps in the illustration above are as follows:
1) The client registers a listener for certain Oracle user objects.
2) The database stores the registration information in the data dictionary.
3) A partner application makes changes to the user objects; these can be any DML/DDL operations.
4) The Oracle JOBQ background process is notified of a new change notification message.
5) The JOBQ process notifies the client application's listener.
6) The client listener gets the ROWID and the user object ID and sends this information to an MQ server, such as Kafka.
7) A subscriber of the Kafka topic receives the ROWID information.
8) The processor queries the user objects to get the result set and starts processing the updated information.
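Steps 6 to 8 can be sketched in plain Java. This is only a minimal sketch, not the code from the repository: an in-memory queue stands in for the Kafka topic, and the names `RowIdPipeline`, `ChangeEvent` and the `DISPUTES` table used in the usage note are hypothetical. It shows that the listener publishes only the object ID and the ROWID, and that the processor has to turn them into a lookup query by ROWID.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class RowIdPipeline {

    /** The only data a notification carries: which object changed and which row (hypothetical type). */
    public static final class ChangeEvent {
        public final int objectId;
        public final String rowId;
        public final String operation;

        public ChangeEvent(int objectId, String rowId, String operation) {
            this.objectId = objectId;
            this.rowId = rowId;
            this.operation = operation;
        }
    }

    // Stand-in for the Kafka topic (assumption: any queue or broker fits here).
    private final BlockingQueue<ChangeEvent> topic = new LinkedBlockingQueue<>();

    /** Step 6: the listener publishes the object ID and the ROWID. */
    public void publish(ChangeEvent event) {
        topic.add(event);
    }

    /** Step 7: the subscriber picks up the next event, or null if nothing is waiting. */
    public ChangeEvent poll() {
        return topic.poll();
    }

    /** Step 8: build the lookup query. The table name must come from a trusted mapping, never from user input. */
    public static String selectByRowId(String tableName) {
        return "SELECT * FROM " + tableName + " WHERE ROWID = CHARTOROWID(?)";
    }

    /** Step 8: fetch the changed row; returns its first column as text, or null if the row is gone (e.g. after a DELETE). */
    public static String fetchFirstColumn(Connection connection, String tableName, String rowId) throws SQLException {
        try (PreparedStatement ps = connection.prepareStatement(selectByRowId(tableName))) {
            ps.setString(1, rowId);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? rs.getString(1) : null;
            }
        }
    }
}
```

A processor would call something like `RowIdPipeline.fetchFirstColumn(connection, "DISPUTES", event.rowId)` for each event it polls. A real processor would of course map the whole row instead of reading one column.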
Let's take a quick look at the pseudo code (you will find all the code on GitHub).

Registration of the notification:
package com.blu.db;

import oracle.jdbc.OracleStatement;
import oracle.jdbc.dcn.*;
import oracle.jdbc.OracleConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class DBNotifactionConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DBNotifactionConsumer.class);

    private NsiOracleConnection oracleConnection;
    private static final Properties properties = new Properties();
    private String queryString;

    public String getQueryString() {
        return queryString;
    }

    public void setQueryString(String queryString) {
        this.queryString = queryString;
    }

    static{
        properties.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
        properties.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION, "true"); //Activates query change notification instead of object change notification.
    }

    public DBNotifactionConsumer(NsiOracleConnection oracleConnection) {
        this.oracleConnection = oracleConnection;
    }

    public NsiOracleConnection getOracleConnection() {
        return oracleConnection;
    }

    public void registerNotification() throws SQLException{
        DatabaseChangeRegistration databaseChangeRegistration =  getOracleConnection().getConnection().registerDatabaseChangeNotification(properties);
        databaseChangeRegistration.addListener(new NsiListner());
        Statement stm = getOracleConnection().getConnection().createStatement();
        ((OracleStatement) stm).setDatabaseChangeRegistration(databaseChangeRegistration);
        for (String query : getQueryString().split(";")) {
            // Executing a query associates it with the registration; the rows themselves are not needed
            try (ResultSet rs = stm.executeQuery(query)) {
                while (rs.next()) {
                    // just drain the result set
                }
            }
        }
        // log the tables that ended up in the registration
        String[] tables = databaseChangeRegistration.getTables();
        for (String table : tables) {
            LOGGER.info("Registered table: {}", table);
        }
        if(!stm.isClosed()){
            stm.close();
        }
    }
}
In the code above I used query change notification.
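The registration loop splits the configured query string on `;` and executes every piece, so a stray newline after the last semicolon or a doubled semicolon in the configuration would send a blank statement to the database. A small helper could trim the pieces and skip the empty ones before they reach `executeQuery`. This is only a sketch; the class name `QuerySplitter` is hypothetical and not part of the project.

```java
import java.util.ArrayList;
import java.util.List;

public class QuerySplitter {

    /**
     * Splits on ';', trims each piece and drops blank pieces.
     * Limitation: it does not understand ';' inside string literals.
     */
    public static List<String> split(String queryString) {
        List<String> queries = new ArrayList<>();
        if (queryString == null) {
            return queries;
        }
        for (String part : queryString.split(";")) {
            String query = part.trim();
            if (!query.isEmpty()) {
                queries.add(query);
            }
        }
        return queries;
    }
}
```

With such a helper the loop would iterate over `QuerySplitter.split(getQueryString())` instead of the raw `split(";")` result.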
The listener without Kafka is very simple:
public class NsiListner implements DatabaseChangeListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(NsiListner.class);

    @Override
    public void onDatabaseChangeNotification(DatabaseChangeEvent databaseChangeEvent) {
        QueryChangeDescription[] queryChanges = databaseChangeEvent.getQueryChangeDescription();
        if (queryChanges == null) {
            // no query-level change description in this event, nothing to log
            return;
        }
        for (QueryChangeDescription qcd : queryChanges) {
            LOGGER.info("Query Id: {}", qcd.getQueryId());
            LOGGER.info("Event Type: {}", qcd.getQueryChangeEventType().name());
            for (TableChangeDescription tcd : qcd.getTableChangeDescription()) {
                LOGGER.info("Table Name: {}", tcd.getTableName()); // the table name comes back empty
                LOGGER.info("Object ID: {}", tcd.getObjectNumber()); // so use the object ID instead
                for (RowChangeDescription rcd : tcd.getRowChangeDescription()) {
                    LOGGER.info("Row ID: {} Operation: {}", rcd.getRowid().stringValue(), rcd.getRowOperation().name());
                }
            }
        }

    }
}
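Because the table name arrives empty, the listener has to map the object ID back to a table name itself. One way to do that, sketched here under assumptions, is a small cache in front of the `USER_OBJECTS` dictionary view. The class `ObjectNameCache` and its methods are hypothetical and not part of the Oracle JDBC driver; the lookup function is injected so the cache can be tried out without a database.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

public class ObjectNameCache {
    // Dictionary query that maps an object ID to its name in the current schema.
    static final String LOOKUP_SQL = "SELECT object_name FROM user_objects WHERE object_id = ?";

    private final Map<Integer, String> names = new ConcurrentHashMap<>();
    private final IntFunction<String> lookup;

    public ObjectNameCache(IntFunction<String> lookup) {
        this.lookup = lookup;
    }

    /** Returns the cached name; the lookup runs only the first time an ID is resolved. */
    public String nameOf(int objectId) {
        return names.computeIfAbsent(objectId, id -> lookup.apply(id));
    }

    /** A lookup backed by JDBC; yields null when the object ID is not found. */
    public static IntFunction<String> jdbcLookup(Connection connection) {
        return objectId -> {
            try (PreparedStatement ps = connection.prepareStatement(LOOKUP_SQL)) {
                ps.setInt(1, objectId);
                try (ResultSet rs = ps.executeQuery()) {
                    return rs.next() ? rs.getString(1) : null;
                }
            } catch (SQLException e) {
                throw new IllegalStateException("Could not resolve object ID " + objectId, e);
            }
        };
    }
}
```

In the listener this would be used as `cache.nameOf(tcd.getObjectNumber())`. Note that `USER_OBJECTS` only covers objects owned by the connected user, so tables in other schemas would need a different view.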
If you want to make your listener application fault tolerant, you can run several listener instances in a cluster and use leader election so that only one listener is active at a time. Here is the pseudo code of a simple leader elector; note that I am using Curator to avoid boilerplate code.
package com.blu.curator;

import com.blu.db.DBNotifactionConsumer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Leader extends LeaderSelectorListenerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(Leader.class);
    private String clientName;
    private CuratorFramework client;
    private String path;
    private LeaderSelector leaderSelector;
    // Oracle change notification consumer, loaded from the Spring context in the constructor
    private ApplicationContext ctx;
    private DBNotifactionConsumer consumer;

    public Leader(String clientName, CuratorFramework client, String path) {
        this.clientName = clientName;
        this.client = client;
        this.path = path;
        leaderSelector = new LeaderSelector(this.client,this.path, this);
        leaderSelector.autoRequeue();
        // initialize oracle change notification
        ctx = new ClassPathXmlApplicationContext("spring-context.xml");
        consumer = (DBNotifactionConsumer)ctx.getBean("consumer");
    }

    @Override
    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
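        // register the Oracle notification only once this instance becomes the leader
        consumer.registerNotification();
        LOGGER.info("{} is now the leader!!", this.clientName);
        // leadership is held until this method returns, so block here;
        // an interrupt (for example on connection loss) ends the sleep and releases leadership
        Thread.sleep(Integer.MAX_VALUE);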
    }
    public void start(){
        leaderSelector.start();
    }
}
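One thing the pseudo code above leaves open is what happens to the registration when leadership is lost. A possible pattern, sketched here without Curator or the Oracle driver so that it stays self-contained, is to block on a latch instead of a long sleep and to clean up in a `finally` block. The class `LeadershipTask` and its two `Runnable` hooks are hypothetical; in the real listener the hooks would wrap `registerNotification()` and a call such as `OracleConnection.unregisterDatabaseChangeNotification(...)`.

```java
import java.util.concurrent.CountDownLatch;

public class LeadershipTask {
    private final Runnable register;    // e.g. wraps consumer.registerNotification()
    private final Runnable unregister;  // e.g. removes the change registration again
    private final CountDownLatch stop = new CountDownLatch(1);

    public LeadershipTask(Runnable register, Runnable unregister) {
        this.register = register;
        this.unregister = unregister;
    }

    /** Body of a takeLeadership-style method: it returns only when leadership should end. */
    public void holdLeadership() throws InterruptedException {
        register.run();
        try {
            // blocks until release() is called or the thread is interrupted
            stop.await();
        } finally {
            // always clean up before another node takes over
            unregister.run();
        }
    }

    /** Asks the current leader to step down voluntarily. */
    public void release() {
        stop.countDown();
    }
}
```

Inside `takeLeadership` the call would simply be `task.holdLeadership()`, so an interrupt from the leader selector and a voluntary `release()` both end with the registration cleaned up.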
When one of the listeners becomes unavailable, another one replaces it and continues receiving notifications from the Oracle DB. That's enough for today; I hope the information above helps somebody get a quick start with Oracle change notification.