
Bootstrapping a PostgreSQL logical replication application

Recently I was involved in a project replicating data from PostgreSQL to a data lake. I first heard about PostgreSQL logical replication a few years ago and found a few hours to play with the change data capture (CDC) functionality of this popular database. Today's blog is all about PostgreSQL logical replication, with a few fragments of code to test the functionality.

Change data capture is an architectural design principle that allows us to capture recent data changes in the database in real time. The changes can be any DML operation: insert, update, or delete. There are a few characteristics of CDC, as follows:

  1. Logical replication uses a publish-subscribe model. Subscribers pull the data from the publications they subscribe to and process the changes.
  2. Only transactions committed to the WAL (the write-ahead/transaction log) are returned by CDC.
  3. Changes are returned in the same order in which they were committed to the database.
  4. Logical CDC reads the WAL the database writes anyway, so it has very little impact on database performance.
  5. Since PostgreSQL v15, you can publish the logical changes of specific table rows and columns.
  6. You can use one of several output plugins to receive the data in different formats such as JSON, Avro, or binary.
PostgreSQL logical replication

Note that a replication connection and its slot operate in either physical or logical mode, not both at once. The difference is that logical replication sends data in a logical format (decoded row changes), whereas physical replication sends data in a binary format (an exact copy of the WAL).

Before diving into PostgreSQL logical replication, it is useful to understand a few concepts:

  • Publication. From the PostgreSQL documentation: A publication is a set of changes generated from a table or a group of tables, and might also be described as a change set or replication set. Each publication exists in only one database.
    • Every publication can have multiple subscribers.
    • A publication is created using the CREATE PUBLICATION command and may later be altered or dropped using corresponding commands.

  • Logical decoding.  From the PostgreSQL manual, "Logical decoding is the process of extracting all persistent changes to a database's tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database's internal state." The transaction log (WAL) format is binary and can change over time, and the WAL contains changes for every database on the server. This is where logical decoding comes into play: it provides the changes of a single database per slot through an API, with facilities for writing output plugins that can return the data in any format you define.
  • Replication slot. A replication slot represents a stream of changes that happened on a database. The slot tracks the changes sent through the output plugin and remembers how far the consumer has read them. There can be more than one slot per database. PostgreSQL provides a set of functions and views for working with replication slots, such as pg_create_logical_replication_slot or pg_replication_slots; see the example after this list.
  • Output Plugin. A plugin is a library, typically written in C, that accepts the changes and decodes them into the format you prefer. Plugins need to be compiled and installed before use. See the resources section for the list of available plugins for PostgreSQL.
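
For example, the slot functions and views just mentioned can be used directly from psql. A minimal sketch, assuming a slot named demo_slot that was created with the test_decoding plugin (both names are illustrative):

-- List the existing replication slots and their output plugins
SELECT slot_name, plugin, slot_type, active FROM pg_replication_slots;

-- Peek at the pending changes on a slot without consuming them
SELECT * FROM pg_logical_slot_peek_changes('demo_slot', NULL, NULL);

-- Drop the slot when it is no longer needed; PostgreSQL retains WAL
-- for a slot until its changes are consumed or the slot is dropped
SELECT pg_drop_replication_slot('demo_slot');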

Now, let's try to develop a simple Java application to exercise all the concepts we have learned above. What are we going to do?

  1. Configure the database.
  2. Develop a Java application that uses the PostgreSQL replication API for change data capture.
  3. The Java application will use the "test_decoding" output plugin to print the changes to the console.
  4. Also, we will try the pgEasyReplication project to get the CDC events in JSON format.

Step 1. Configure the database. 

Enable logical replication for the PostgreSQL database in the postgresql.conf file as shown below:

max_wal_senders = 4             # max number of walsender processes
wal_keep_segments = 4           # in logfile segments, 16MB each; 0 disables
wal_level = logical             # minimal, replica, or logical
max_replication_slots = 4       # max number of replication slots

Allow replication connections from localhost and set the replication privileges in the pg_hba.conf file as follows:

local   replication   all                   trust
host    replication   all   127.0.0.1/32    md5
host    replication   all   ::1/128         md5

Restart the database after making the changes.
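
After the restart, a quick way to verify that logical decoding is enabled is to check the wal_level setting:

SHOW wal_level;   -- should return 'logical'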

Step 2. Create two tables using the following DDL script.

create table dept(
  deptno integer,
  dname  text,
  loc    text,
  constraint pk_dept primary key (deptno)
);

create table emp(
  empno    integer,
  ename    text,
  job      text,
  mgr      integer,
  hiredate date,
  sal      integer,
  comm     integer,
  deptno   integer,
  constraint pk_emp primary key (empno),
  constraint fk_deptno foreign key (deptno) references dept (deptno)
);

CREATE UNIQUE INDEX ename_idx ON emp (ename);

Step 3. Add a few rows into the tables.

insert into dept
values(10, 'ACCOUNTING', 'NEW YORK');
insert into dept
values(20, 'RESEARCH', 'DALLAS');
insert into dept
values(30, 'SALES', 'CHICAGO');
insert into dept
values(40, 'OPERATIONS', 'BOSTON');

insert into emp
values(
 7839, 'KING', 'PRESIDENT', null,
 to_date('17-11-1981','dd-mm-yyyy'),
 5000, null, 10
);
insert into emp
values(
 7698, 'BLAKE', 'MANAGER', 7839,
 to_date('1-5-1981','dd-mm-yyyy'),
 2850, null, 30
);
insert into emp
values(
 7782, 'CLARK', 'MANAGER', 7839,
 to_date('9-6-1981','dd-mm-yyyy'),
 2450, null, 10
);
insert into emp
values(
 7566, 'JONES', 'MANAGER', 7839,
 to_date('2-4-1981','dd-mm-yyyy'),
 2975, null, 20
);
insert into emp
values(
 7788, 'SCOTT', 'ANALYST', 7566,
 to_date('13-07-1987','dd-mm-yyyy') - 85,
 3000, null, 20
);

Step 4. Create a standalone Java application project. You can use your favorite IDE or Maven/Gradle to build, compile, and run the application. The entire project is located on GitHub.
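
Assuming a Maven build, the only dependency the code below needs is the PostgreSQL JDBC driver, which ships the replication API (the version number here is illustrative; use a current release):

<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.7.3</version>
</dependency>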

First, we have to create a database connection in replication mode. In this mode, the connection cannot be used to execute regular SQL statements.

// Assumed imports: java.io.FileInputStream, java.io.InputStream,
// java.sql.Connection, java.sql.DriverManager, java.util.Properties,
// org.postgresql.PGConnection, org.postgresql.PGProperty

InputStream is = new FileInputStream(rootPath + "application.properties");

Properties config = new Properties();
config.load(is);

Properties props = new Properties();
PGProperty.USER.set(props, config.getProperty("db.user"));
PGProperty.PASSWORD.set(props, config.getProperty("db.password"));
// The logical replication protocol requires PostgreSQL 9.4 or later
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
// Open the connection in replication mode for a single database
PGProperty.REPLICATION.set(props, "database");
// Replication connections support only the simple query protocol
PGProperty.PREFER_QUERY_MODE.set(props, "simple");

Connection con = DriverManager.getConnection(config.getProperty("db.url"), props);
PGConnection replConnection = con.unwrap(PGConnection.class);

In the above fragment of code, we read the database connection properties from the application.properties file:

db.url=jdbc:postgresql://localhost:5432/postgres
db.user=postgres
db.password=postgres
repl.logical.slot=10

Step 5. Create a replication slot via the API.

replConnection.getReplicationAPI()
        .createReplicationSlot()
        .logical()
        .withSlotName("demo_logical_slot_"+ config.getProperty("repl.logical.slot"))
        .withOutputPlugin("test_decoding")
        .make();

Note that you can also create the replication slot with plain SQL.
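
A minimal sketch, assuming the same slot name and test_decoding plugin used in the Java code above:

SELECT * FROM pg_create_logical_replication_slot('demo_logical_slot_10', 'test_decoding');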

Publications for the built-in publish-subscribe replication are also defined in SQL:

-- Create a publication that publishes all changes in all tables:
CREATE PUBLICATION alltables FOR ALL TABLES;

-- Create a publication that publishes changes for table dept,
-- but replicates only the columns deptno and dname (PostgreSQL 15+):
CREATE PUBLICATION dept_filtered FOR TABLE dept (deptno, dname);

Step 6. Create a replication stream programmatically.

PGReplicationStream stream =
        replConnection.getReplicationAPI()
                .replicationStream()
                .logical()
                .withSlotName("demo_logical_slot_" + config.getProperty("repl.logical.slot"))
                .withSlotOption("include-xids", true)
                .withSlotOption("skip-empty-xacts", true)
                .withStatusInterval(20, TimeUnit.SECONDS)
                .start();

The above replication stream will send the changes committed since the creation of the replication slot.

Step 7. Receive the CDC events for further processing as follows:

while (true) {

    ByteBuffer msg = stream.readPending();

    if (msg == null) {
        TimeUnit.MILLISECONDS.sleep(10L);
        continue;
    }

    int offset = msg.arrayOffset();
    byte[] source = msg.array();
    int length = source.length - offset;
    logger.info(new String(source, offset, length));

    // Send feedback to the server by LSN (log sequence number):
    // marking the last received LSN as applied and flushed lets
    // the server recycle WAL that is no longer needed
    stream.setAppliedLSN(stream.getLastReceiveLSN());
    stream.setFlushedLSN(stream.getLastReceiveLSN());

}

The above fragment of code will receive the CDC events and print them to the console. Open a PostgreSQL SQL editor and execute the following SQL statements:

INSERT INTO public.dept (deptno,dname,loc) VALUES
	 (144,'RESEARCH1','DALLAS1');
update public.dept set dname='refresh' where deptno =144;
delete from public.dept where deptno = 144;

commit;

The output should look like this:

BEGIN 779
table public.dept: INSERT: deptno[integer]:144 dname[text]:'RESEARCH1' loc[text]:'DALLAS1'
table public.dept: UPDATE: deptno[integer]:144 dname[text]:'refresh' loc[text]:'DALLAS1'
table public.dept: DELETE: deptno[integer]:144
COMMIT 779

The "test_decoding" output plugin is very usefull as a starting point. You can install output plugin you prefered such as JSON, AVRO and continue develop your application. 

See the resources section for the list of all plugins available for PostgreSQL. Moreover, I forked the pgEasyReplication project on GitHub, which can return the CDC events in JSON format. The library is very easy to use; see the App.java class for an example of how to use it.


Resources:

  1. PostgreSQL output plugin list: https://wiki.postgresql.org/wiki/Logical_Decoding_Plugins
  2. PostgreSQL logical decoding concepts: https://www.postgresql.org/docs/current/logicaldecoding.html
  3. Physical and logical replication API: https://access.crunchydata.com/documentation/pgjdbc/42.1.1/replication.html
