Kafka JDBC connector and Oracle - direct reading and writing
Reading and writing data directly between Kafka and Oracle, with and without data types
Connector setup - all as String
Connector setup with default data types but no date and timestamp
Connector setup with specification including date and timestamp
Connector to insert all messages as new rows
Connector as source without schema
Kafka time zone vs Oracle time zone
Introduction
The main goal of this post is to create an open documentation resource for myself and anyone else who needs guidance on setting up a sample integration between Kafka and Oracle.
This guide will walk you through the process of setting up Kafka JDBC Source and Sink Connectors to integrate Kafka with Oracle Database. Using connectors available on Confluent Hub, I will demonstrate different configurations for reading and writing data, handling various data types, and ensuring data flows smoothly between Kafka and Oracle DB.
All of this has been tested on my personal Docker containers running Kafka, Kafka Connect and Oracle DB.
You can find the Kafka JDBC connector documentation here:
JDBC Source and Sink Connector for Confluent
Sample Docker setup
Here is my Docker Desktop setup, which might be helpful in understanding the code below.
Here I want to thank Timotius Pamungkas for introducing me to Kafka through his Udemy course: Kafka and Java Spring bootcamp.
Simple connector setup - all as String
Oracle source table
CREATE TABLE fin_invoices_kafka_src (
row_id NUMBER(15) GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
update_time DATE NOT NULL,
invoice_id NUMBER(15) NOT NULL,
invoice_amount NUMBER(27, 5) NULL,
invoice_number VARCHAR2(50 CHAR) NULL,
invoice_date DATE NULL
);
There are two technical columns that the connector uses to detect row updates:
row_id - unique incrementing id
update_time - update timestamp
invoice_id could be a business PK, so an index could be created on that column if necessary.
The setup above assumes that the loading procedure updates the existing row for every change.
Alternatively, there could be just one technical column (e.g. kafka_id) if I wanted to load every update as a new row, as sketched below.
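A minimal sketch of that alternative (with a hypothetical table name), where the connector would run in plain incrementing mode and every change is simply a new row:
-- hypothetical table name; every change is inserted as a new row tracked only by kafka_id
CREATE TABLE fin_invoices_kafka_src_ins (
kafka_id NUMBER(15) GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
invoice_id NUMBER(15) NOT NULL,
invoice_amount NUMBER(27, 5) NULL,
invoice_number VARCHAR2(50 CHAR) NULL,
invoice_date DATE NULL
);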
Kafka Source Connector setup
For reading updates I decided to use a combination of a unique row id and an update timestamp, which is described as the most robust and accurate mode.
More information about query modes is available here: Incremental query modes
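For context, in timestamp+incrementing mode the polling query has roughly the following shape (an illustrative sketch only, not the exact statement the connector generates):
-- illustrative only: approximate shape of the connector's polling query
SELECT *
FROM FIN_INVOICES_KAFKA_SRC
WHERE UPDATE_TIME > :last_timestamp
OR (UPDATE_TIME = :last_timestamp AND ROW_ID > :last_row_id)
ORDER BY UPDATE_TIME, ROW_ID;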
Important parameters:
table.whitelist needs to be in upper case
transforms.Cast.spec casts every column to string:
"ROW_ID:string,UPDATE_TIME:string,INVOICE_ID:string,INVOICE_AMOUNT:string,INVOICE_CURRENCY:string,INVOICE_NUMBER:string,INVOICE_DATE:string"
Send a POST request to http://localhost:8083/connectors
with the following body (for example from the Postman app):
{
"name": "source-jdbc-oracle-orclcdb-all-strings-fin_invoices_kafka_src",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"topic.prefix": "source-oracle-strings-",
"connection.url": "{{OracleConnectionUrl}}",
"connection.user": "{{OracleUser}}",
"connection.password": "{{OraclePassword}}",
"mode": "timestamp+incrementing",
"incrementing.column.name": "ROW_ID",
"timestamp.column.name": "UPDATE_TIME",
"table.whitelist": "FIN_INVOICES_KAFKA_SRC",
"numeric.mapping": "best_fit",
"poll.interval.ms": "10000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "ROW_ID:string,UPDATE_TIME:string,INVOICE_ID:string,INVOICE_AMOUNT:string,INVOICE_CURRENCY:string,INVOICE_NUMBER:string,INVOICE_DATE:string"
}
}
or from the command line:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "source-jdbc-oracle-orclcdb-all-strings-fin_invoices_kafka_src",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"topic.prefix": "source-oracle-strings-",
"connection.url": "{{OracleConnectionUrl}}",
"connection.user": "{{OracleUser}}",
"connection.password": "{{OraclePassword}}",
"mode": "timestamp+incrementing",
"incrementing.column.name": "ROW_ID",
"timestamp.column.name": "UPDATE_TIME",
"table.whitelist": "FIN_INVOICES_KAFKA_SRC",
"numeric.mapping": "best_fit",
"poll.interval.ms": "10000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "ROW_ID:string,UPDATE_TIME:string,INVOICE_ID:string,INVOICE_AMOUNT:string,INVOICE_CURRENCY:string,INVOICE_NUMBER:string,INVOICE_DATE:string"
}
}'
Connector status
curl -X GET http://localhost:8083/connectors
Result
[
"source-jdbc-oracle-orclcdb-all-strings-fin_invoices_kafka_src"
]
curl -X GET http://localhost:8083/connectors/source-jdbc-oracle-orclcdb-all-strings-fin_invoices_kafka_src/status
Result
{
"name": "source-jdbc-oracle-orclcdb-all-strings-fin_invoices_kafka_src",
"connector": {
"state": "RUNNING",
"worker_id": "kafka-connect:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "kafka-connect:8083"
}
],
"type": "source"
}
Test
Insert a row
insert into fin_invoices_kafka_src
(
update_time,
invoice_id,
invoice_amount,
invoice_number,
invoice_date
)
values (
sysdate,
45,
30,
'number-45',
trunc(sysdate)-1
);
COMMIT;
Read the topic
kafka-topics.sh --list --bootstrap-server localhost:9092
kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic source-oracle-strings-FIN_INVOICES_KAFKA_SRC
Result
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "ROW_ID"
},
{
"type": "string",
"optional": false,
"field": "UPDATE_TIME"
},
{
"type": "string",
"optional": false,
"field": "INVOICE_ID"
},
{
"type": "string",
"optional": true,
"field": "INVOICE_AMOUNT"
},
{
"type": "string",
"optional": true,
"field": "INVOICE_NUMBER"
},
{
"type": "string",
"optional": true,
"field": "INVOICE_DATE"
}
],
"optional": false,
"name": "FIN_INVOICES_KAFKA_SRC"
},
"payload": {
"ROW_ID": "1",
"UPDATE_TIME": "2024-07-27T15:48:27.000Z",
"INVOICE_ID": "45",
"INVOICE_AMOUNT": "30.00000",
"INVOICE_NUMBER": "number-45",
"INVOICE_DATE": "2024-07-26"
}
}
Update a row
Update in the source table
UPDATE fin_invoices_kafka_src
SET INVOICE_AMOUNT = 42.20,
update_time = sysdate
WHERE INVOICE_ID = 45;
COMMIT;
Result
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "ROW_ID"
},
{
"type": "string",
"optional": false,
"field": "UPDATE_TIME"
},
{
"type": "string",
"optional": false,
"field": "INVOICE_ID"
},
{
"type": "string",
"optional": true,
"field": "INVOICE_AMOUNT"
},
{
"type": "string",
"optional": true,
"field": "INVOICE_NUMBER"
},
{
"type": "string",
"optional": true,
"field": "INVOICE_DATE"
}
],
"optional": false,
"name": "FIN_INVOICES_KAFKA_SRC"
},
"payload": {
"ROW_ID": "1",
"UPDATE_TIME": "2024-07-27T15:50:21.000Z",
"INVOICE_ID": "45",
"INVOICE_AMOUNT": "42.20000",
"INVOICE_NUMBER": "number-45",
"INVOICE_DATE": "2024-07-26"
}
}
Oracle target table
The source topic already exists and contains records, which is why we first need to create the target table before creating the sink connector.
CREATE TABLE fin_invoices_kafka_sink_str (
row_id VARCHAR2(50 CHAR),
update_time VARCHAR2(50 CHAR),
invoice_id VARCHAR2(50 CHAR),
invoice_amount VARCHAR2(50 CHAR),
invoice_number VARCHAR2(50 CHAR),
invoice_date VARCHAR2(50 CHAR)
);
Kafka sink connector
Important note: table.name.format needs to be in upper case
{
"name": "sink-jdbc-oracle-orclcdb-fin_invoices_kafka_src",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "{{OracleConnectionUrl}}",
"connection.user": "{{OracleUser}}",
"connection.password": "{{OraclePassword}}",
"topics": "source-oracle-strings-FIN_INVOICES_KAFKA_SRC",
"auto.create": "false",
"auto.evolve": "false",
"insert.mode": "upsert",
"pk.fields": "ROW_ID",
"pk.mode": "record_value",
"table.name.format": "FIN_INVOICES_KAFKA_SINK_STR",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true"
}
}
Results
Source table
Target table
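To compare both sides, you can simply query the source and the sink tables (every column in the sink lands as a string):
SELECT * FROM fin_invoices_kafka_src;
SELECT * FROM fin_invoices_kafka_sink_str;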
Data types with default setup
Oracle source table
We will use exactly the same table as in the previous example. If it already exists and contains rows, you can skip this code. If you haven't created it yet, here is the code.
If you plan to read data types, it is important to specify the size of the field, e.g. NUMBER(15).
CREATE TABLE fin_invoices_kafka_src (
row_id NUMBER(15) GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
update_time DATE NOT NULL,
invoice_id NUMBER(15) NOT NULL,
invoice_amount NUMBER(27, 5) NULL,
invoice_number VARCHAR2(50 CHAR) NULL,
invoice_date DATE NULL
);
Kafka Source Connector with default data types
The source connector doesn't require any casting specification for int, date or timestamp columns; however, it does require a cast for decimal values.
{
"name": "source-jdbc-oracle-orclcdb-data-types-fin_invoices_kafka_src",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"topic.prefix": "source-oracle-data-types-",
"connection.url": "{{OracleConnectionUrl}}",
"connection.user": "{{OracleUser}}",
"connection.password": "{{OraclePassword}}",
"mode": "timestamp+incrementing",
"incrementing.column.name": "ROW_ID",
"timestamp.column.name": "UPDATE_TIME",
"table.whitelist": "FIN_INVOICES_KAFKA_SRC",
"numeric.mapping": "best_fit",
"poll.interval.ms": "10000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "INVOICE_AMOUNT:float64"
}
}
Test
If the table already exists and contains data, run this code:
UPDATE fin_invoices_kafka_src
SET update_time = sysdate;
COMMIT;
If the table does not exist yet, the records need to be inserted:
insert into fin_invoices_kafka_src
(
update_time,
invoice_id,
invoice_amount,
invoice_number,
invoice_date
)
values (
sysdate,
45,
42.20,
'number-45',
trunc(sysdate)-1
);
COMMIT;
Kafka topic
Take a look at the payload part - all data types are stored properly. Timestamp and date are stored as numbers (epoch milliseconds), but they can be formatted back into proper dates (see the conversion example after the message and the section below).
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"optional": false,
"field": "ROW_ID"
},
{
"type": "int64",
"optional": false,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "UPDATE_TIME"
},
{
"type": "int64",
"optional": false,
"field": "INVOICE_ID"
},
{
"type": "double",
"optional": true,
"field": "INVOICE_AMOUNT"
},
{
"type": "string",
"optional": true,
"field": "INVOICE_NUMBER"
},
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "INVOICE_DATE"
}
],
"optional": false,
"name": "FIN_INVOICES_KAFKA_SRC"
},
"payload": {
"ROW_ID": 1,
"UPDATE_TIME": 1722082018000,
"INVOICE_ID": 45,
"INVOICE_AMOUNT": 42.2,
"INVOICE_NUMBER": "number-45",
"INVOICE_DATE": 1721952000000
}
}
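As a quick sanity check, the epoch milliseconds above can be converted back into readable dates directly in Oracle, for example:
-- values taken from the payload above
SELECT DATE '1970-01-01' + 1722082018000 / 86400000 AS update_time,
DATE '1970-01-01' + 1721952000000 / 86400000 AS invoice_date
FROM dual;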
Oracle target table
To write the existing topic content to a table, the table has to exist first. In this case I will write records with all data types except timestamp and date, which land as strings.
When setting insert.mode=upsert it is good to define a PK on the table.
CREATE TABLE fin_invoices_kafka_sink_part (
row_id NUMBER(15) PRIMARY KEY,
update_time VARCHAR2(50 CHAR),
invoice_id NUMBER(15),
invoice_amount NUMBER(27, 5),
invoice_number VARCHAR2(50 CHAR),
invoice_date VARCHAR2(50 CHAR)
);
Kafka Sink Connector
Important note: table.name.format needs to be in upper case
Date and timestamp columns are cast to string when inserted into the table:
"transforms.Cast.spec": "UPDATE_TIME:string,INVOICE_AMOUNT:float64,INVOICE_DATE:string"
{
"name": "sink-jdbc-oracle-orclcdb-data-types-no-dates-fin_invoices_kafka_src",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "{{OracleConnectionUrl}}",
"connection.user": "{{OracleUser}}",
"connection.password": "{{OraclePassword}}",
"topics": "source-oracle-data-types-FIN_INVOICES_KAFKA_SRC",
"auto.create": "false",
"auto.evolve": "false",
"insert.mode": "upsert",
"pk.fields": "ROW_ID",
"pk.mode": "record_value",
"table.name.format": "FIN_INVOICES_KAFKA_SINK_PART",
"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "UPDATE_TIME:string,INVOICE_AMOUNT:float64,INVOICE_DATE:string",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true"
}
}
Results
Data types including timestamp and date - requires proper casting and configuration
Oracle source table
We will use exactly the same table as in the previous example. If it already exists and contains rows, you can skip this code. If you haven't created it yet, here is the code.
If you plan to read data types, it is important to specify the size of the field, e.g. NUMBER(15).
CREATE TABLE fin_invoices_kafka_src (
row_id NUMBER(15) GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
update_time DATE NOT NULL,
invoice_id NUMBER(15) NOT NULL,
invoice_amount NUMBER(27, 5) NULL,
invoice_number VARCHAR2(50 CHAR) NULL,
invoice_date DATE NULL
);
Kafka Source Connector with default data types
This is the same source connector as in the previous section: it doesn't require any casting specification for int, date or timestamp columns; however, it does require a cast for decimal values.
{
"name": "source-jdbc-oracle-orclcdb-data-types-fin_invoices_kafka_src",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"topic.prefix": "source-oracle-data-types-",
"connection.url": "{{OracleConnectionUrl}}",
"connection.user": "{{OracleUser}}",
"connection.password": "{{OraclePassword}}",
"mode": "timestamp+incrementing",
"incrementing.column.name": "ROW_ID",
"timestamp.column.name": "UPDATE_TIME",
"table.whitelist": "FIN_INVOICES_KAFKA_SRC",
"numeric.mapping": "best_fit",
"poll.interval.ms": "10000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "INVOICE_AMOUNT:float64"
}
}
Oracle target table with all data types
When setting insert.mode=upsert it is good to define a PK on the table.
CREATE TABLE FIN_INVOICES_KAFKA_SINK_DTYPE (
row_id NUMBER(15) PRIMARY KEY,
update_time DATE,
invoice_id NUMBER(15),
invoice_amount NUMBER(27, 5),
invoice_number VARCHAR2(50 CHAR),
invoice_date DATE
);
Kafka sink connector with all data types
Take a look at the transforms.* fields in the JSON, which contain all the casting details from the Kafka topic to the Oracle table.
My understanding is that date and timestamp are first read from the Kafka topic as strings and then converted into the proper data type format.
The UPDATE_TIME field is converted with target.type "Timestamp" to preserve the time part of the date; in Oracle it can be a DATE column.
{
"name": "sink-jdbc-oracle-orclcdb-all-data_types-fin_invoices_kafka_src",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "{{OracleConnectionUrl}}",
"connection.user": "{{OracleUser}}",
"connection.password": "{{OraclePassword}}",
"topics": "source-oracle-data-types-FIN_INVOICES_KAFKA_SRC",
"auto.create": "false",
"auto.evolve": "false",
"insert.mode": "upsert",
"pk.fields": "ROW_ID",
"pk.mode": "record_value",
"table.name.format": "FIN_INVOICES_KAFKA_SINK_DTYPE",
"transforms": "Cast,TimestampConverterUpdateTs,TimestampConverterInvoiceDate",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "ROW_ID:int32,UPDATE_TIME:string,INVOICE_ID:int32,INVOICE_AMOUNT:float64,INVOICE_CURRENCY:string,INVOICE_NUMBER:string,INVOICE_DATE:string",
"transforms.TimestampConverterUpdateTs.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverterUpdateTs.target.type": "Timestamp",
"transforms.TimestampConverterUpdateTs.field": "UPDATE_TIME",
"transforms.TimestampConverterUpdateTs.format": "yyyy-MM-dd'T'HH:mm:ss",
"transforms.TimestampConverterInvoiceDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverterInvoiceDate.target.type": "Date",
"transforms.TimestampConverterInvoiceDate.field": "INVOICE_DATE",
"transforms.TimestampConverterInvoiceDate.format": "yyyy-MM-dd",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true"
}
}
Results of writing message with all data types
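To confirm that the time part of UPDATE_TIME survived the conversion, the DATE columns can be formatted explicitly when querying the sink table:
SELECT row_id,
TO_CHAR(update_time, 'YYYY-MM-DD HH24:MI:SS') AS update_time,
invoice_id,
invoice_amount,
invoice_number,
TO_CHAR(invoice_date, 'YYYY-MM-DD') AS invoice_date
FROM fin_invoices_kafka_sink_dtype;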
Connector with insert setup
Oracle target table with all data types and no PK
CREATE TABLE FIN_INVOICES_KAFKA_SINK_ALL(
row_id NUMBER(15),
update_time DATE,
invoice_id NUMBER(15),
invoice_amount NUMBER(27, 5),
invoice_number VARCHAR2(50 CHAR),
invoice_date DATE
);
Kafka sink connector with all data types
The connector looks exactly the same, except:
insert.mode = "insert"
In this case all updates are inserted as new rows.
{
"name": "sink-jdbc-oracle-orclcdb-all-data_types-insert-fin_invoices_kafka_src",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "{{OracleConnectionUrl}}",
"connection.user": "{{OracleUser}}",
"connection.password": "{{OraclePassword}}",
"topics": "source-oracle-data-types-FIN_INVOICES_KAFKA_SRC",
"auto.create": "false",
"auto.evolve": "false",
"insert.mode": "insert",
"pk.fields": "ROW_ID",
"pk.mode": "record_value",
"table.name.format": "FIN_INVOICES_KAFKA_SINK_ALL",
"transforms": "Cast,TimestampConverterUpdateTs,TimestampConverterInvoiceDate",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "ROW_ID:int32,UPDATE_TIME:string,INVOICE_ID:int32,INVOICE_AMOUNT:float64,INVOICE_CURRENCY:string,INVOICE_NUMBER:string,INVOICE_DATE:string",
"transforms.TimestampConverterUpdateTs.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverterUpdateTs.target.type": "Timestamp",
"transforms.TimestampConverterUpdateTs.field": "UPDATE_TIME",
"transforms.TimestampConverterUpdateTs.format": "yyyy-MM-dd'T'HH:mm:ss",
"transforms.TimestampConverterInvoiceDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverterInvoiceDate.target.type": "Date",
"transforms.TimestampConverterInvoiceDate.field": "INVOICE_DATE",
"transforms.TimestampConverterInvoiceDate.format": "yyyy-MM-dd",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true"
}
}
Results of writing message with all data types
All updates for the same invoice_id = 45 are inserted as new records into the Oracle table; the only difference between them is the update_time column. A quick check is shown below.
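A simple query confirms that the full history of changes is kept, one row per update:
SELECT invoice_id, COUNT(*) AS versions, MAX(update_time) AS last_update
FROM fin_invoices_kafka_sink_all
GROUP BY invoice_id
ORDER BY invoice_id;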
Connector to read without schema
There is an option to read an Oracle table with all of its columns but without a schema definition in the message. However, this makes it more complex to later save the message back into Oracle (or probably any other DB engine). There is also an option to use a schema registry, but I have not tested it, so I cannot provide a manual for it.
Source connector definition
{
"name": "source-jdbc-oracle-orclcdb-no-schema-float-fin_invoices_kafka_src",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"topic.prefix": "source-oracle-no-schema-float-",
"connection.url": "{{OracleConnectionUrl}}",
"connection.user": "{{OracleUser}}",
"connection.password": "{{OraclePassword}}",
"mode": "timestamp+incrementing",
"incrementing.column.name": "ROW_ID",
"timestamp.column.name": "UPDATE_TIME",
"table.whitelist": "FIN_INVOICES_KAFKA_SRC",
"numeric.mapping": "best_fit",
"poll.interval.ms": "10000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false" ,
"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "INVOICE_AMOUNT:float64"
}
}
Sample messages
After checking the Kafka topic:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic source-oracle-no-schema-float-FIN_INVOICES_KAFKA_SRC
we can see the following messages:
{
"ROW_ID": 1,
"UPDATE_TIME": 1722164296000,
"INVOICE_ID": 45,
"INVOICE_AMOUNT": 52.2,
"INVOICE_NUMBER": "number-45",
"INVOICE_DATE": 1721952000000
}
{
"ROW_ID": 2,
"UPDATE_TIME": 1722164296000,
"INVOICE_ID": 46,
"INVOICE_AMOUNT": 34567.0,
"INVOICE_NUMBER": "number-46",
"INVOICE_DATE": 1721952000000
}
{
"ROW_ID": 3,
"UPDATE_TIME": 1722164296000,
"INVOICE_ID": 47,
"INVOICE_AMOUNT": 5000.0,
"INVOICE_NUMBER": "number-47",
"INVOICE_DATE": 1721952000000
}
Kafka time zone vs Oracle time zone
Pay attention to the time zone settings in Oracle and Kafka. Usually Kafka Connect uses UTC, while Oracle uses local time. In that case, the values in the monitored timestamp column (UPDATE_TIME) should be written in the time zone Kafka works in; otherwise a row is only picked up once its timestamp value is reached in UTC.
This means that if Oracle time is UTC+1 and an update writes a UTC+1 value into UPDATE_TIME, the record will be read by Kafka one hour later, when the UTC clock reaches the value stored in UPDATE_TIME. It can also happen that Kafka never reads the record, because its timestamp is smaller than the largest timestamp Kafka has already processed.
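A quick way to see how far the database clock and time zone are from UTC is to compare them directly in Oracle (the JDBC connector also exposes a db.timezone property that can be used to tell it which time zone the timestamp columns are in):
SELECT sysdate AS db_local_time,
sys_extract_utc(systimestamp) AS db_utc_time,
dbtimezone AS db_time_zone,
sessiontimezone AS session_time_zone
FROM dual;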
Example
ROW_ID = 1 was updated with an UPDATE_TIME value of UTC + 10 minutes
ROW_ID = 2 was inserted 3 minutes after ROW_ID = 1 was updated, and Kafka read that record immediately
ROW_ID = 3 was inserted with an UPDATE_TIME value of UTC - 5 minutes. Kafka will not read that record until it is updated with a timestamp greater than the most recent timestamp Kafka has read
Kafka messages
// Insert done 3 minutes after the update (in UTC terms), but read first by Kafka
{
"schema": {
...
},
"payload": {
"ROW_ID": "2",
"UPDATE_TIME": "2024-07-27T16:02:04.000Z",
"INVOICE_ID": "46",
"INVOICE_AMOUNT": "34567.00000",
"INVOICE_NUMBER": "number-46",
"INVOICE_DATE": "2024-07-26"
}
}
// Update with UPDATE_TIME at UTC + 10 minutes, read after the insert
{
"schema": {
...
},
"payload": {
"ROW_ID": "1",
"UPDATE_TIME": "2024-07-27T16:09:40.000Z",
"INVOICE_ID": "45",
"INVOICE_AMOUNT": "52.20000",
"INVOICE_NUMBER": "number-45",
"INVOICE_DATE": "2024-07-26"
}
}
Error handling
Check the list of all connectors:
curl -X GET http://localhost:8083/connectors
Get the status of a specific connector:
curl -X GET http://localhost:8083/connectors/CONNECTOR_NAME/status
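A few other standard Kafka Connect REST calls that are useful when a connector misbehaves (replace CONNECTOR_NAME accordingly):
# Restart a connector after fixing the underlying problem
curl -X POST http://localhost:8083/connectors/CONNECTOR_NAME/restart
# Check the status of an individual task (task id 0 here)
curl -X GET http://localhost:8083/connectors/CONNECTOR_NAME/tasks/0/status
# Delete a connector so it can be recreated with a corrected configuration
curl -X DELETE http://localhost:8083/connectors/CONNECTOR_NAME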
Thank you for reading!