Recently I was involved in a project to replicate data from PostgreSQL to a data lake. I had heard about PostgreSQL logical replication a few years ago, so I found a few hours to play with the change data capture (CDC) functionality of this popular database. Today's blog is all about PostgreSQL logical replication, with a few fragments of code to test the functionality.
Change data capture is an architectural design pattern that lets us capture data as it changes in the database, in near real time. The changes can be any DML operation: insert, update or delete. CDC has a few characteristics:
- Logical replication uses a publish-subscribe model. Subscribers pull data from the publications they subscribe to and then process it.
- Only committed transactions from the WAL (write-ahead log, the transaction log) are returned by CDC.
- Changes are returned in the order in which they were committed to the database.
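- Logical CDC has a relatively low impact on database performance, but it is not free: wal_level = logical writes extra information to the WAL, and a replication slot that nobody consumes prevents the server from removing old WAL files.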
- Starting with PostgreSQL 15, a publication can be restricted to specific rows (row filters) and columns (column lists) of a table.
- You can use one of several output plugins to get the data in different formats, such as JSON, Avro or binary.
PostgreSQL logical replication
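Note that logical and physical replication are not mutually exclusive: wal_level = logical includes everything physical replication needs, so the same primary server can feed streaming (physical) standbys and logical replication slots at the same time. The difference between them is what gets sent: physical replication ships the WAL records themselves, which describe changes to data blocks across the whole cluster, whereas logical replication decodes the WAL into row-level changes (inserts, updates and deletes) for selected tables.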
Before diving into PostgreSQL logical replication, it is useful to understand a few concepts:
- Publication. From the PostgreSQL documentation: "A publication is a set of changes generated from a table or a group of tables, and might also be described as a change set or replication set. Each publication exists in only one database."
- Every publication can have multiple subscribers.
- A publication is created using the CREATE PUBLICATION command and may later be altered or dropped using corresponding commands.
- Logical decoding. From the PostgreSQL manual: "Logical decoding is the process of extracting all persistent changes to a database's tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database's internal state." The WAL (transaction log) format is binary and can change between PostgreSQL versions, and the WAL contains changes for every database in the cluster. Logical decoding solves both problems: each slot receives the changes of only one database, and the decoding API hands those changes to an output plugin, which can emit them in whatever format it defines.
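- Replication slot. In the context of logical replication, a replication slot represents a stream of changes from one database that can be replayed to a client in the order they were made. The slot remembers how far its consumer has read and keeps the WAL the consumer still needs; the changes are sent through the output plugin chosen when the slot was created. A database can have more than one slot. PostgreSQL provides functions and views for working with replication slots, such as the function pg_create_logical_replication_slot and the view pg_replication_slots.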
- Output plugin. An output plugin is a shared library (usually written in C) that receives the decoded changes and converts them into the format it implements. Plugins need to be compiled and installed before use; test_decoding, used below, ships with PostgreSQL's contrib modules. A list of available plugins is given in the Resources section.
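Slots track their progress with log sequence numbers (LSNs), 64-bit positions in the WAL. Views such as pg_replication_slots (columns restart_lsn and confirmed_flush_lsn) display an LSN as two hexadecimal numbers separated by a slash: the upper and lower 32 bits. A minimal sketch of converting that notation to and from a number, for example to see how many bytes of WAL a slot is behind pg_current_wal_lsn(). The class name Lsn is hypothetical; pgJDBC has its own LogSequenceNumber class for the same purpose.

```java
import java.util.Locale;

/** Hypothetical helper for PostgreSQL's textual LSN notation "X/Y". */
final class Lsn {
    private static final long LOW_MASK = 0xFFFFFFFFL;

    private Lsn() {}

    /** Parses text such as "16/B374D848" into the 64-bit WAL position. */
    static long parse(String text) {
        int slash = text.indexOf('/');
        if (slash <= 0 || slash == text.length() - 1) {
            throw new IllegalArgumentException("Not an LSN: " + text);
        }
        long high = Long.parseLong(text.substring(0, slash), 16);
        long low = Long.parseLong(text.substring(slash + 1), 16);
        if (high < 0 || low < 0 || high > LOW_MASK || low > LOW_MASK) {
            throw new IllegalArgumentException("Each LSN half must fit in 32 bits: " + text);
        }
        return (high << 32) | low; // upper 32 bits before the slash, lower 32 after
    }

    /** Formats a 64-bit WAL position as upper/lower 32 bits in hexadecimal. */
    static String format(long position) {
        return String.format(Locale.ROOT, "%X/%X", position >>> 32, position & LOW_MASK);
    }

    /** WAL bytes from one position to another, e.g. how far a slot lags behind. */
    static long bytesBetween(String from, String to) {
        return parse(to) - parse(from);
    }
}
```

For instance, Lsn.bytesBetween(confirmedFlushLsn, currentWalLsn), with both strings taken from the queries mentioned above, gives the slot's lag in bytes.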
Now, let's develop a simple Java application to try out the concepts we have learned. What are we going to do?
- Configure the database.
- Develop a Java application that uses the PostgreSQL replication API to capture data changes.
- The Java application will use the "test_decoding" output plugin to print the changes to the console.
- We will also try the pgEasyReplication project to get the CDC events in JSON format.
Step 1. Configure the database. Set the following parameters in postgresql.conf:
max_wal_senders = 4          # max number of walsender processes
wal_keep_segments = 4        # in logfile segments, 16MB each; 0 disables (removed in PostgreSQL 13: use wal_keep_size = 64MB there)
wal_level = logical          # minimal, replica, or logical
max_replication_slots = 4    # max number of replication slots
Allow replication connections from localhost and set the replication privileges in the pg_hba.conf file as follows:
local   replication   all                  trust
host    replication   all   127.0.0.1/32   md5
host    replication   all   ::1/128        md5
Restart the database after making the changes.
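If you want to double-check the edited postgresql.conf, a small pre-flight check can catch a forgotten wal_level before the restart. The sketch below is hypothetical and not part of the original project: it reads simple "name = value  # comment" lines and reports whether wal_level is logical and whether max_replication_slots and max_wal_senders, when set, are positive. On a running server, SHOW wal_level; gives the authoritative answer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical pre-flight check for the Step 1 settings. Assumption: only simple
 * "name = value  # comment" lines are understood; include directives, '#' inside
 * quoted values and "name value" lines without '=' are not handled.
 */
final class LogicalConfCheck {
    private LogicalConfCheck() {}

    /** Collects name -> value pairs; as in postgresql.conf, a later line wins. */
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> settings = new HashMap<>();
        for (String raw : lines) {
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            int eq = line.indexOf('=');
            if (eq <= 0) {
                continue; // blank, comment-only or unsupported line
            }
            String name = line.substring(0, eq).trim().toLowerCase(Locale.ROOT);
            String value = line.substring(eq + 1).trim();
            if (value.length() >= 2 && value.startsWith("'") && value.endsWith("'")) {
                value = value.substring(1, value.length() - 1); // strip 'quotes'
            }
            settings.put(name, value);
        }
        return settings;
    }

    /** Returns the problems found; an empty list means these settings look right. */
    static List<String> problems(Map<String, String> settings) {
        List<String> problems = new ArrayList<>();
        if (!"logical".equalsIgnoreCase(settings.get("wal_level"))) {
            problems.add("wal_level must be logical");
        }
        // Absent values fall back to server defaults (10 for both since PostgreSQL 10).
        for (String name : List.of("max_replication_slots", "max_wal_senders")) {
            String value = settings.get(name);
            if (value != null && !isPositiveInt(value)) {
                problems.add(name + " must be at least 1");
            }
        }
        return problems;
    }

    private static boolean isPositiveInt(String value) {
        try {
            return Integer.parseInt(value) > 0;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
```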
Step 2. Create two tables using the following DDL script:
create table dept(
  deptno integer,
  dname  text,
  loc    text,
  constraint pk_dept primary key (deptno)
);

create table emp(
  empno    integer,
  ename    text,
  job      text,
  mgr      integer,
  hiredate date,
  sal      integer,
  comm     integer,
  deptno   integer,
  constraint pk_emp primary key (empno),
  constraint fk_deptno foreign key (deptno) references dept (deptno)
);

CREATE UNIQUE INDEX ename_idx ON emp (ename);
Step 3. Add a few rows to the tables.
insert into dept values(10, 'ACCOUNTING', 'NEW YORK');
insert into dept values(20, 'RESEARCH', 'DALLAS');
insert into dept values(30, 'SALES', 'CHICAGO');
insert into dept values(40, 'OPERATIONS', 'BOSTON');

insert into emp values(7839, 'KING', 'PRESIDENT', null, to_date('17-11-1981','dd-mm-yyyy'), 5000, null, 10);
insert into emp values(7698, 'BLAKE', 'MANAGER', 7839, to_date('1-5-1981','dd-mm-yyyy'), 2850, null, 30);
insert into emp values(7782, 'CLARK', 'MANAGER', 7839, to_date('9-6-1981','dd-mm-yyyy'), 2450, null, 10);
insert into emp values(7566, 'JONES', 'MANAGER', 7839, to_date('2-4-1981','dd-mm-yyyy'), 2975, null, 20);
-- PostgreSQL's to_date has no Oracle-style 'rr' year pattern; date - integer subtracts days
insert into emp values(7788, 'SCOTT', 'ANALYST', 7566, to_date('13-07-1987','dd-mm-yyyy') - 85, 3000, null, 20);
Step 4. Create a standalone Java application project. You can use your favourite IDE, or Maven/Gradle, to build, compile and run the application. The entire project is available on GitHub.
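First, we have to create a database connection in replication mode. This connection is meant for replication commands and only supports the simple query protocol, which is why PREFER_QUERY_MODE is set to simple below; use a separate, regular connection for ordinary SQL work.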
import java.io.FileInputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;
import org.postgresql.PGConnection;
import org.postgresql.PGProperty;

// read application.properties; try-with-resources closes the stream
Properties config = new Properties();
try (InputStream is = new FileInputStream(rootPath + "application.properties")) {
    config.load(is);
}

Properties props = new Properties();
PGProperty.USER.set(props, config.getProperty("db.user"));
PGProperty.PASSWORD.set(props, config.getProperty("db.password"));
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4"); // logical decoding needs 9.4+
PGProperty.REPLICATION.set(props, "database");           // logical replication mode
PGProperty.PREFER_QUERY_MODE.set(props, "simple");       // replication connections need the simple protocol

Connection con = DriverManager.getConnection(config.getProperty("db.url"), props);
PGConnection replConnection = con.unwrap(PGConnection.class);
In the code fragment above, we read the database connection properties from the application.properties file:
db.url=jdbc:postgresql://localhost:5432/postgres
db.user=postgres
db.password=postgres
repl.logical.slot=10
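The connection fragment in Step 4 reads these keys with getProperty, which returns null for a missing key and only fails later inside the driver. A hedged sketch of a stricter loader: ReplicationConfig is a hypothetical class, the keys are the ones from the file above, the reader is closed with try-with-resources, and a missing key fails fast with its name.

```java
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;

/** Hypothetical holder for the application.properties keys used in this post. */
final class ReplicationConfig {
    final String url;
    final String user;
    final String password;
    final String slotSuffix; // value of repl.logical.slot

    private ReplicationConfig(Properties p) {
        this.url = require(p, "db.url");
        this.user = require(p, "db.user");
        this.password = require(p, "db.password");
        this.slotSuffix = require(p, "repl.logical.slot");
    }

    /** Loads the properties and closes the reader, even if loading fails. */
    static ReplicationConfig load(Reader reader) throws IOException {
        Properties p = new Properties();
        try (Reader r = reader) {
            p.load(r);
        }
        return new ReplicationConfig(p);
    }

    private static String require(Properties p, String key) {
        String value = p.getProperty(key);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing required property: " + key);
        }
        return value.trim();
    }
}
```

In the application, ReplicationConfig.load(java.nio.file.Files.newBufferedReader(java.nio.file.Path.of(rootPath, "application.properties"))) would replace the manual FileInputStream handling.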
Step 5. Create a replication slot via the API:
// creates a logical slot decoded by test_decoding; fails if the slot already exists
replConnection.getReplicationAPI()
    .createReplicationSlot()
    .logical()
    .withSlotName("demo_logical_slot_" + config.getProperty("repl.logical.slot"))
    .withOutputPlugin("test_decoding")
    .make();
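The slot name is built from the repl.logical.slot property, so a typo in the properties file only surfaces when the server rejects the slot. PostgreSQL allows only lower-case letters, numbers and the underscore in replication slot names, and names are limited to 63 bytes in a default build (NAMEDATALEN - 1). A small hypothetical helper, SlotNames, can check this before make() is called:

```java
/** Hypothetical helper that builds and validates the slot name used in Step 5. */
final class SlotNames {
    /** Prefix used in Step 5. */
    static final String PREFIX = "demo_logical_slot_";
    /** NAMEDATALEN - 1 in a default PostgreSQL build. */
    static final int MAX_LENGTH = 63;

    private SlotNames() {}

    /** True if the name uses only a-z, 0-9 and '_' and fits in MAX_LENGTH. */
    static boolean isValid(String name) {
        if (name.isEmpty() || name.length() > MAX_LENGTH) {
            return false;
        }
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            boolean allowed = (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '_';
            if (!allowed) {
                return false;
            }
        }
        return true; // only ASCII passes, so the char count equals the byte count
    }

    /** Builds PREFIX + suffix and rejects it early if PostgreSQL would refuse it. */
    static String forSuffix(String suffix) {
        String name = PREFIX + suffix;
        if (!isValid(name)) {
            throw new IllegalArgumentException("Invalid replication slot name: " + name);
        }
        return name;
    }
}
```

With it, .withSlotName(SlotNames.forSuffix(config.getProperty("repl.logical.slot"))) fails before any replication command is sent.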
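Note that you can also create a replication slot with SQL, for example SELECT * FROM pg_create_logical_replication_slot('demo_logical_slot_10', 'test_decoding');. Publications, on the other hand, are created with CREATE PUBLICATION. They are consumed by the built-in pgoutput plugin (for example through CREATE SUBSCRIPTION), while test_decoding streams every table regardless of publications: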
-- Create a publication that publishes all changes in all tables:
CREATE PUBLICATION alltables FOR ALL TABLES;

-- Create a publication that publishes all changes for table dept, but replicates only
-- the columns deptno and dname (column lists require PostgreSQL 15 or later):
CREATE PUBLICATION dept_filtered FOR TABLE dept (deptno, dname);

Step 6. Create a replication stream programmatically:
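// additional imports: org.postgresql.replication.PGReplicationStream, java.util.concurrent.TimeUnit
PGReplicationStream stream = replConnection.getReplicationAPI()
    .replicationStream()
    .logical()
    .withSlotName("demo_logical_slot_" + config.getProperty("repl.logical.slot"))
    .withSlotOption("include-xids", true)       // print transaction IDs in BEGIN/COMMIT
    .withSlotOption("skip-empty-xacts", true)   // skip transactions without changes
    .withStatusInterval(20, TimeUnit.SECONDS)   // send a status update to the server every 20 s
    .start();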
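Then read the changes in a loop and report the processed position back to the server:
// additional imports: java.nio.ByteBuffer, java.nio.charset.StandardCharsets
while (true) {
    // non-blocking read: returns null if no message is pending
    ByteBuffer msg = stream.readPending();
    if (msg == null) {
        TimeUnit.MILLISECONDS.sleep(10L);
        continue;
    }
    // decode only this message's bytes (assumes a UTF-8 database)
    int offset = msg.arrayOffset() + msg.position();
    int length = msg.remaining();
    logger.info(new String(msg.array(), offset, length, StandardCharsets.UTF_8));

    // feedback by log sequence number (LSN): the change has been processed
    stream.setAppliedLSN(stream.getLastReceiveLSN());
    stream.setFlushedLSN(stream.getLastReceiveLSN());
}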
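Computing a message's length from the size of its backing array only works when the message happens to end exactly at the end of that array. The sketch below (the class MessageText is hypothetical) decodes just the bytes between position() and limit(), leaves the caller's buffer untouched, and also copes with buffers that are not backed by an accessible array. It assumes the database encoding is UTF-8.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that turns a readPending() buffer into text. */
final class MessageText {
    private MessageText() {}

    static String decode(ByteBuffer msg) {
        ByteBuffer view = msg.duplicate(); // independent position/limit, same bytes
        if (view.hasArray()) {
            // heap buffer: read in place, starting at this buffer's own position
            return new String(view.array(), view.arrayOffset() + view.position(),
                    view.remaining(), StandardCharsets.UTF_8);
        }
        // direct or read-only buffer: copy the remaining bytes first
        byte[] copy = new byte[view.remaining()];
        view.get(copy);
        return new String(copy, StandardCharsets.UTF_8);
    }
}
```

Inside the loop, logger.info(MessageText.decode(msg)) would replace the manual array arithmetic.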
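To test it, run the following statements in one transaction from another session, for example in psql:
BEGIN;
INSERT INTO public.dept (deptno, dname, loc) VALUES (144, 'RESEARCH1', 'DALLAS1');
update public.dept set dname = 'refresh' where deptno = 144;
delete from public.dept where deptno = 144;
commit;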
The application prints the decoded transaction:
BEGIN 779
table public.dept: INSERT: deptno[integer]:144 dname[text]:'RESEARCH1' loc[text]:'DALLAS1'
table public.dept: UPDATE: deptno[integer]:144 dname[text]:'refresh' loc[text]:'DALLAS1'
table public.dept: DELETE: deptno[integer]:144
COMMIT 779
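The test_decoding output is plain text, so a consumer that needs structured events has to parse it. A minimal sketch of such a parser, written only against the sample lines shown here; TestDecodingLine is a hypothetical name. It assumes simple column names, no array types and no "old-key" section (which can appear depending on the table's replica identity), and it maps the bare word null to a Java null.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for test_decoding row-change lines. */
final class TestDecodingLine {
    private static final Pattern ROW =
            Pattern.compile("^table ([^:]+): (INSERT|UPDATE|DELETE): (.*)$");
    // name[type]:value, where value is 'quoted text' (with '' escapes) or a bare token
    private static final Pattern COLUMN =
            Pattern.compile("([^\\s\\[]+)\\[([^\\]]+)\\]:('(?:[^']|'')*'|\\S+)");

    final String table;
    final String operation;
    final Map<String, String> columns; // column name -> text value, null for SQL NULL

    private TestDecodingLine(String table, String operation, Map<String, String> columns) {
        this.table = table;
        this.operation = operation;
        this.columns = columns;
    }

    /** Returns null for lines that are not row changes, such as BEGIN and COMMIT. */
    static TestDecodingLine parse(String line) {
        Matcher row = ROW.matcher(line);
        if (!row.matches()) {
            return null;
        }
        Map<String, String> columns = new LinkedHashMap<>();
        Matcher col = COLUMN.matcher(row.group(3));
        while (col.find()) {
            String raw = col.group(3);
            String value;
            if (raw.startsWith("'")) {
                value = raw.substring(1, raw.length() - 1).replace("''", "'"); // unquote
            } else if (raw.equals("null")) {
                value = null;
            } else {
                value = raw; // numbers and other unquoted literals
            }
            columns.put(col.group(1), value);
        }
        return new TestDecodingLine(row.group(1), row.group(2), columns);
    }
}
```

Because parse returns null for BEGIN and COMMIT lines, those can be used to group row changes per transaction. For anything beyond experiments, a plugin with a structured format such as JSON is more robust than parsing this text.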
The "test_decoding" output plugin is very useful as a starting point. You can then install the output plugin you prefer, for example one that produces JSON or Avro, and continue developing your application.
See the Resources section for the list of output plugins available for PostgreSQL. Moreover, I forked the pgEasyReplication project on GitHub, which returns the CDC events in JSON format. The library is very easy to use; see the App.java class for an example of how to use it.
Resources:
- PostgreSQL output plugin list: https://wiki.postgresql.org/wiki/Logical_Decoding_Plugins
- PostgreSQL logical decoding concepts.
- Physical and logical replication API (pgJDBC): https://access.crunchydata.com/documentation/pgjdbc/42.1.1/replication.html