
Bootstrapping a PostgreSQL logical replication application

Recently I was involved in a project replicating data from PostgreSQL to a data lake. I had heard about PostgreSQL logical replication a few years ago, and I finally found a few hours to play with the change data capture (CDC) feature of this popular database. Today's blog is all about PostgreSQL logical replication, with a few fragments of code to test the functionality.

Change data capture is an architectural design principle that allows us to capture recently changed data in the database in real time. The changes can be any DML operation: insert, update, or delete. There are a few characteristics of CDC, as follows:

  1. Logical replication uses a publish-subscribe model. Subscribers pull the data from the publications they subscribe to and manipulate the data.
  2. Only committed transactions in the WAL (the write-ahead transaction log) are returned by CDC.
  3. CDC returns the changes in the order in which they were committed on the database.
  4. Logical CDC does not impact database performance.
  5. Since PostgreSQL v15, you can restrict the published changes to specific table rows/columns.
  6. You can use several output plugins to get the data in different formats such as JSON, Avro, or binary.
PostgreSQL logical replication

Note that you can't use PostgreSQL logical and physical replication at the same time. The difference between the two is that logical replication sends the changes in a logical (row-level) format, whereas physical replication sends them in a binary, block-level format.

Before diving into PostgreSQL logical replication, it is useful to understand a few concepts:

  • Publication. From the PostgreSQL documentation: A publication is a set of changes generated from a table or a group of tables, and might also be described as a change set or replication set. Each publication exists in only one database.
    • Every publication can have multiple subscribers.
    • A publication is created using the CREATE PUBLICATION command and may later be altered or dropped using corresponding commands.

  • Logical decoding. From the PostgreSQL manual, "Logical decoding is the process of extracting all persistent changes to a database's tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database's internal state." The transaction log (WAL) format is binary and can change between releases; moreover, the WAL contains changes for every database in the server. This is where logical decoding comes into play: each slot delivers the changes of a single database through an API to an output plugin, which can return the data in any format you define.
  • Replication slot. A replication slot represents a stream of changes that happened on the database. The slot manages this set of changes and sends them through the plugin. There can be more than one slot in a database. PostgreSQL provides a set of functions and views for working with replication slots, such as pg_create_logical_replication_slot or pg_replication_slots (see the sketch after this list).
  • Output plugin. A plugin is a library, available in several programming languages, that accepts the changes and decodes them into the format you prefer. Plugins need to be compiled and installed before use. The resources section at the end lists the available plugins for PostgreSQL.
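
Since replication slots are visible in the system catalog, you can inspect them from any ordinary JDBC connection. Here is a minimal sketch that lists the slots via the pg_replication_slots view mentioned above; the connection settings are assumptions for a local test database:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SlotInspector {
    public static void main(String[] args) throws Exception {
        // Hypothetical local connection settings -- adjust for your environment.
        try (Connection con = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
             Statement st = con.createStatement();
             // pg_replication_slots lists every slot defined in the cluster.
             ResultSet rs = st.executeQuery(
                     "SELECT slot_name, plugin, active FROM pg_replication_slots")) {
            while (rs.next()) {
                System.out.printf("slot=%s plugin=%s active=%s%n",
                        rs.getString(1), rs.getString(2), rs.getBoolean(3));
            }
        }
    }
}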

Now, let's develop a simple Java application to try out all the concepts we have learned so far. What are we going to do?

  1. Configure the database.
  2. Develop a Java application that uses the PostgreSQL replication API to capture data changes.
  3. The Java application will use the "test_decoding" output plugin to print the changes to the console.
  4. Also, we will try the pgEasyReplication project to get the CDC events in JSON format.

Step 1. Configure the database. 

Enable logical replication in the postgresql.conf file of the PostgreSQL database, as shown below:

max_wal_senders = 4             # max number of walsender processes
wal_keep_segments = 4           # in logfile segments, 16MB each; 0 disables
wal_level = logical             # minimal, replica, or logical
max_replication_slots = 4       # max number of replication slots
Allow replication connections from localhost and set the replication privileges in the pg_hba.conf file as follows:

local   replication   all                   trust
host    replication   all   127.0.0.1/32    md5
host    replication   all   ::1/128         md5

Restart the database after making the changes.
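
After the restart, it is worth confirming that the new setting took effect. A quick sanity check from JDBC (the connection settings are the same assumptions as before):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class WalLevelCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical local connection settings -- adjust for your environment.
        try (Connection con = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery("SHOW wal_level")) {
            rs.next();
            System.out.println("wal_level = " + rs.getString(1)); // should print "logical"
        }
    }
}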

Step 2. Create two tables with the following DDL script.

create table dept(
  deptno integer,
  dname  text,
  loc    text,
  constraint pk_dept primary key (deptno)
);

create table emp(
  empno    integer,
  ename    text,
  job      text,
  mgr      integer,
  hiredate date,
  sal      integer,
  comm     integer,
  deptno   integer,
  constraint pk_emp primary key (empno),
  constraint fk_deptno foreign key (deptno) references dept (deptno)
);

CREATE UNIQUE INDEX ename_idx ON emp (ename);

Step 3. Add a few rows to the tables.

insert into dept
values(10, 'ACCOUNTING', 'NEW YORK');
insert into dept
values(20, 'RESEARCH', 'DALLAS');
insert into dept
values(30, 'SALES', 'CHICAGO');
insert into dept
values(40, 'OPERATIONS', 'BOSTON');

insert into emp
values(
 7839, 'KING', 'PRESIDENT', null,
 to_date('17-11-1981','dd-mm-yyyy'),
 5000, null, 10
);
insert into emp
values(
 7698, 'BLAKE', 'MANAGER', 7839,
 to_date('1-5-1981','dd-mm-yyyy'),
 2850, null, 30
);
insert into emp
values(
 7782, 'CLARK', 'MANAGER', 7839,
 to_date('9-6-1981','dd-mm-yyyy'),
 2450, null, 10
);
insert into emp
values(
 7566, 'JONES', 'MANAGER', 7839,
 to_date('2-4-1981','dd-mm-yyyy'),
 2975, null, 20
);
insert into emp
values(
 7788, 'SCOTT', 'ANALYST', 7566,
 to_date('13-07-87','dd-mm-rr') - 85,
 3000, null, 20
);

Step 4. Create a standalone Java application project. You can use your favorite IDE or Maven/Gradle to build, compile, and run the application. The entire project is located on GitHub.

First, we have to create a database connection in replication mode. In this mode, the connection cannot be used to execute regular SQL statements.

import java.io.FileInputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

import org.postgresql.PGConnection;
import org.postgresql.PGProperty;

// Load the connection settings from application.properties.
InputStream is = new FileInputStream(rootPath + "application.properties");
Properties config = new Properties();
config.load(is);

// Build the JDBC properties for a replication-mode connection.
Properties props = new Properties();
PGProperty.USER.set(props, config.getProperty("db.user"));
PGProperty.PASSWORD.set(props, config.getProperty("db.password"));
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
PGProperty.REPLICATION.set(props, "database"); // open the connection in replication mode
PGProperty.PREFER_QUERY_MODE.set(props, "simple");

Connection con = DriverManager.getConnection(config.getProperty("db.url"), props);
PGConnection replConnection = con.unwrap(PGConnection.class);

In the above fragment of code, we read the database connection properties from the application.properties file:

db.url=jdbc:postgresql://localhost:5432/postgres
db.user=postgres
db.password=postgres
repl.logical.slot=10

Step 5. Create a replication slot via the API.

replConnection.getReplicationAPI()
        .createReplicationSlot()
        .logical()
        .withSlotName("demo_logical_slot_"+ config.getProperty("repl.logical.slot"))
        .withOutputPlugin("test_decoding")
        .make();
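
One operational note: replication slots survive restarts, and an abandoned slot prevents the server from recycling WAL, so drop any slot you no longer consume. A sketch using the same pgjdbc API, assuming the slot name created above:

// Dropping an unused slot releases the WAL the server was retaining for it.
replConnection.getReplicationAPI()
        .dropReplicationSlot("demo_logical_slot_" + config.getProperty("repl.logical.slot"));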

Note that you can also create a replication slot with plain SQL:

SELECT pg_create_logical_replication_slot('demo_logical_slot_10', 'test_decoding');

Publications are likewise created with SQL. Create a publication that publishes all changes in all tables:

CREATE PUBLICATION alltables FOR ALL TABLES;

Create a publication that publishes all changes for table dept, but replicates only the columns deptno and dname (PostgreSQL 15 and later):

CREATE PUBLICATION dept_filtered FOR TABLE dept (deptno, dname);

Step 6. Create a replication stream programmatically.

PGReplicationStream stream =
        replConnection.getReplicationAPI()
                .replicationStream()
                .logical()
                .withSlotName("demo_logical_slot_" + config.getProperty("repl.logical.slot"))
                .withSlotOption("include-xids", true)
                .withSlotOption("skip-empty-xacts", true)
                .withStatusInterval(20, TimeUnit.SECONDS)
                .start();
The above replication stream will deliver all changes committed since the creation of the replication slot.
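
If the application restarts, you usually do not want to re-read everything from the slot's creation point. The pgjdbc builder also accepts a start position, so you can resume from the last LSN your application durably recorded. A sketch; the stored LSN value here is a made-up placeholder:

import org.postgresql.replication.LogSequenceNumber;

// Resume streaming from a previously persisted LSN instead of the slot start.
LogSequenceNumber lastProcessed = LogSequenceNumber.valueOf("0/16D68D8"); // hypothetical saved value

PGReplicationStream resumedStream = replConnection.getReplicationAPI()
        .replicationStream()
        .logical()
        .withSlotName("demo_logical_slot_" + config.getProperty("repl.logical.slot"))
        .withStartPosition(lastProcessed)
        .withSlotOption("include-xids", true)
        .withStatusInterval(20, TimeUnit.SECONDS)
        .start();
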
Step 7. Receive the CDC events for further processing, as follows:

while (true) {

    // Non-blocking read; returns null when no message is pending.
    ByteBuffer msg = stream.readPending();

    if (msg == null) {
        TimeUnit.MILLISECONDS.sleep(10L);
        continue;
    }

    int offset = msg.arrayOffset();
    byte[] source = msg.array();
    int length = source.length - offset;
    logger.info(new String(source, offset, length));

    // Feedback to the server: acknowledge progress by LSN (log sequence number).
    stream.setAppliedLSN(stream.getLastReceiveLSN());
    stream.setFlushedLSN(stream.getLastReceiveLSN());
}
The above fragment of code will receive the CDC events and print them to the console. Open a PostgreSQL SQL editor and execute the following statements:

INSERT INTO public.dept (deptno,dname,loc) VALUES
	 (144,'RESEARCH1','DALLAS1');
update public.dept set dname='refresh' where deptno =144;
delete from public.dept where deptno = 144;

commit;
The output should look like this:

BEGIN 779
table public.dept: INSERT: deptno[integer]:144 dname[text]:'RESEARCH1' loc[text]:'DALLAS1'
table public.dept: UPDATE: deptno[integer]:144 dname[text]:'refresh' loc[text]:'DALLAS1'
table public.dept: DELETE: deptno[integer]:144
COMMIT 779
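
The test_decoding output above is plain text, so a simple regular expression is enough to turn each line into a structured event before handing it to a downstream consumer. A minimal sketch; the class and method names are mine, not part of the plugin:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TestDecodingParser {
    // Matches lines such as:
    // table public.dept: INSERT: deptno[integer]:144 dname[text]:'RESEARCH1' ...
    private static final Pattern CHANGE = Pattern.compile(
            "^table (\\S+): (INSERT|UPDATE|DELETE): (.*)$");

    public static void parse(String line) {
        Matcher m = CHANGE.matcher(line);
        if (m.matches()) {
            String table = m.group(1);     // e.g. public.dept
            String operation = m.group(2); // INSERT, UPDATE or DELETE
            String columns = m.group(3);   // raw column payload
            System.out.printf("%s on %s -> %s%n", operation, table, columns);
        } else if (line.startsWith("BEGIN") || line.startsWith("COMMIT")) {
            System.out.println("transaction boundary: " + line);
        }
    }
}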

The "test_decoding" output plugin is very usefull as a starting point. You can install output plugin you prefered such as JSON, AVRO and continue develop your application. 

See the resources section to get the list of all plugins available for PostgreSQL. Moreover, I forked the pgEasyReplication project on GitHub, which can return the CDC events in JSON format. The library is very easy to use; see the App.java class for an example of how to use it.


Resources:

  1. PostgreSQL output plugin list: https://wiki.postgresql.org/wiki/Logical_Decoding_Plugins
  2. PostgreSQL logical decoding concepts.
  3. Physical and logical replication API: https://access.crunchydata.com/documentation/pgjdbc/42.1.1/replication.html
