溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

oracle數(shù)據(jù)如何通過goldengate實時同步到kafka消息隊列中

發(fā)布時間:2021-12-07 10:14:00 來源:億速云 閱讀:395 作者:小新 欄目:數(shù)據(jù)庫

這篇文章主要介紹oracle數(shù)據(jù)如何通過goldengate實時同步到kafka消息隊列中,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

環(huán)境介紹

組件版本

組件

版本

描述

源端oracle

oracle 11.2.0.4 for linux x64

源端oracle

源端ogg

oracle ogg 11.2.0.1.20 for oracle   linux x64

源端ogg,用于抽取源端oracle的數(shù)據(jù)變更,并將變更日志發(fā)送目標端

目標端kafka

kafka_2.11-0.11.0.2 for linux x64

消息隊列,接收目標端ogg推送過來的數(shù)據(jù)

目標端ogg


目標端ogg,接收源端發(fā)送的oracle事物變更日志,并將變更推送到kafka消息隊列中

 

整體架構(gòu)圖

 oracle數(shù)據(jù)如何通過goldengate實時同步到kafka消息隊列中

名詞解釋

1.OGG Manager

              OGG Manager用于配置和管理其它OGG組件,配置數(shù)據(jù)抽取、數(shù)據(jù)推送、數(shù)據(jù)復(fù)制,啟動和停止相關(guān)組件,查看相關(guān)組件的運行情況。

2.數(shù)據(jù)抽取(Extract

抽取源端數(shù)據(jù)庫的變更(DML, DDL)。數(shù)據(jù)抽取主要分如下幾種類型:本地抽取從本地數(shù)據(jù)庫捕獲增量變更數(shù)據(jù),寫入到本地Trail文件數(shù)據(jù)推送(Data Pump)從本地Trail文件讀取數(shù)據(jù),推送到目標端。初始數(shù)據(jù)抽取從數(shù)據(jù)庫表中導(dǎo)出全量數(shù)據(jù),用于初次數(shù)據(jù)加載

3.數(shù)據(jù)推送(Data Pump

              Data Pump是一種特殊的數(shù)據(jù)抽?。‥xtract)類型,從本地Trail文件中讀取數(shù)據(jù),并通過網(wǎng)絡(luò)將數(shù)據(jù)發(fā)送到目標端OGG

4.Trail文件

              數(shù)據(jù)抽取從源端數(shù)據(jù)庫抓取到的事物變更信息會寫入到Trail文件。

5.數(shù)據(jù)接收(Collector

數(shù)據(jù)接收程序運行在目標端機器,用于接收Data Pump發(fā)送過來的Trail日志,并將數(shù)據(jù)寫入到本地Trail文件。

6.數(shù)據(jù)復(fù)制(Replicat

              數(shù)據(jù)復(fù)制運行在目標端機器,從Trail文件讀取數(shù)據(jù)變更,并將變更數(shù)據(jù)應(yīng)用到目標端數(shù)據(jù)存儲系統(tǒng)。本案例中,數(shù)據(jù)復(fù)制將數(shù)據(jù)推送到kafka消息隊列。

7.檢查點(Checkpoint

              檢查點用于記錄數(shù)據(jù)庫事物變更。

操作步驟

源端Oracle數(shù)據(jù)庫配置

開啟源端歸檔

SQL> archive log list

Database log mode             Archive Mode

Automatic archival        Enabled

Archive destination              /u01/app/oracle/product/11.2.3/db_1/dbs/arch

Oldest online log sequence     12

Next log sequence to archive   17

Current log sequence          17

若為打開歸檔解決如下:

conn / as sysdba (以DBA身份連接數(shù)據(jù)庫)

shutdown immediate (立即關(guān)閉數(shù)據(jù)庫)

startup mount (啟動實例并加載數(shù)據(jù)庫,但不打開)

alter database archivelog; (更改數(shù)據(jù)庫為歸檔模式)

alter database open; (打開數(shù)據(jù)庫)

alter system archive log start; (啟用自動歸檔)

2)OGG基于輔助日志等進行實時傳輸,故需要打開相關(guān)日志確??色@取事務(wù)內(nèi)容,通過下面的命令查看該狀態(tài)

SQL> select force_logging, supplemental_log_data_min,supplemental_log_data_all from v$database;

FORCE_LOGG SUPPLEMENTAL_LOG_DATA_MI

---------- ------------------------

YES        YES

如果沒有開啟輔助日志,需要開啟

SQL> alter database force logging;

SQL> alter database add supplemental log data;

SQL>alter database add supplemental log data(all) columns;

3.開啟goldengate復(fù)制參數(shù)

SQL> alter system set enable_goldengate_replication = true;

4.創(chuàng)建源端Oracle賬號

 SQL> create tablespace tbs_ogg datafile '/oradata/dtstack/tbs_ogg.dbf' size 1024M autoextend on;

    SQL> create user ggsadmin identified by oracle default tablespace tbs_ogg;

    SQL> grant dba to ggsadmin;

5.創(chuàng)建測試表  (生產(chǎn)略) 

SQL> create table baiyang.ora_to_kfk as select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id < 500;

SQL> alter table baiyang.ora_to_kfk add constraint pk_kfk_obj primary key(object_id);

SQL> select count(*) from baiyang.ora_to_kfk;
    COUNT(*)

    ----------

        436

部署ogg

源端 (oracle源端)

 

1、解壓

先建立ogg目錄

 

mkdir -p /ogg

tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /ogg

chown -R oracle:oinstall /ogg (使oracle用戶有ogg的權(quán)限,后面有些需要在oracle用戶下執(zhí)行才能成功)

 

 

2配置ogg環(huán)境變量

為了簡單方便起見,建議在生產(chǎn)中配置oracle的環(huán)境變量文件/home/oracle/.bash_profile里配置

export JAVA_HOME=/usr/local/java1.8

export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH

export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar

export JAVA=$JAVA_HOME/bin/java

export OGG_HOME=/ogg

export PATH=$PATH:$OGG_HOME

export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so

生效環(huán)境變量

source /home/oracle/.bash_profile

3、OGG初始化

ggsci

create subdirs

 

ggsci

Oracle GoldenGate Command Interpreter for Oracle

Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO

Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21

Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.

GGSCI (ambari.master.com) 1> create subdirs

Creating subdirectories under current directory /root

Parameter files                /root/dirprm: created

Report files                   /root/dirrpt: created

Checkpoint files               /root/dirchk: created

Process status files           /root/dirpcs: created

SQL script files               /root/dirsql: created

Database definitions files     /root/dirdef: created

Extract data files             /root/dirdat: created

Temporary files                /root/dirtmp: created

Stdout files                   /root/dirout: created

 

4、配置源端Manager

GGSCI (dtproxy) 4> dblogin userid ggsadmin password oracle

GGSCI (dtproxy as ggsadmin@dtstack) 5> edit param ./globals

--添加

oggschema ggsadmin

GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr

 ----添加

     PORT 7810 --默認監(jiān)聽端口

        DYNAMICPORTLIST  7811-7820 --動態(tài)端口列表

        AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 --進程有問題,每3分鐘重啟一次,一共重啟五次

        PURGEOLDEXTRACTS ./dirdat/, USECHECKPOINTS, MINKEEPDAYS 7  --/

        LAGREPORTHOURS 1 --每隔一小時檢查一次傳輸延遲情況

        LAGINFOMINUTES 30 --傳輸延時超過30分鐘將寫入錯誤日志

        LAGCRITICALMINUTES 45 --傳輸延時超過45分鐘將寫入警告日志

        PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 --定期清理trail文件

        --ACCESSRULE, PROG , IPADDR 172..., ALLOW --設(shè)定172網(wǎng)段可連接

5、添加同步表級別日志

GGSCI (dtproxy as ggsadmin@dtstack) 9> add trandata baiyang.ora_to_kfk

GGSCI (dtproxy as ggsadmin@dtstack) 10> info trandata baiyang.ora_to_kfk

 

 

 

目標端 (kafka目標端)

 

1、 解壓

mkdir -p /ogg

unzip V839824-01.zip

tar xf ggs_Adapters_Linux_x64.tar  -C /ogg/

 

 

2配置ogg環(huán)境變量

為了簡單方便起見,建議在生產(chǎn)中配置oracle的環(huán)境變量文件/home/oracle/.bash_profile里配置

export JAVA_HOME=/usr/local/java1.8/jre

export PATH=$JAVA_HOME/bin:$PATH

export LD_LIBRARY_PATH=$JAVA_HOME/lib/amd64/server:$JAVA_HOME/lib/amd64:$LD_LIBRARY_PATH

export OGG_HOME=/ogg

export PATH=$PATH:$OGG_HOME

export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so

生效環(huán)境變量

source /home/oracle/.bash_profile

OGG初始化

ggsci

create subdirs

 

ggsci

Oracle GoldenGate Command Interpreter for Oracle

Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO

Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21

Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.

GGSCI (ambari.master.com) 1> create subdirs

Creating subdirectories under current directory /root

Parameter files                /root/dirprm: created

Report files                   /root/dirrpt: created

Checkpoint files               /root/dirchk: created

Process status files           /root/dirpcs: created

SQL script files               /root/dirsql: created

Database definitions files     /root/dirdef: created

Extract data files             /root/dirdat: created

Temporary files                /root/dirtmp: created

Stdout files                   /root/dirout: created

配置源端Manager

GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr

 ----添加

     PORT 7810 --默認監(jiān)聽端口

        DYNAMICPORTLIST  7811-7820 --動態(tài)端口列表

        AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 --進程有問題,每3分鐘重啟一次,一共重啟五次

        PURGEOLDEXTRACTS ./dirdat/, USECHECKPOINTS, MINKEEPDAYS 7  

        PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 --定期清理trail文件

        --ACCESSRULE, PROG , IPADDR 172..., ALLOW --設(shè)定172網(wǎng)段可連接

GGSCI (172-16-101-242) 4> edit  param  ./GLOBALS

--添加

CHECKPOINTTABLE ggsadmin.checkpoint

全量數(shù)據(jù)同步(oracle to kafka)

1.    配置源端數(shù)據(jù)初始化

 

1)  配置源端初始化進程

GGSCI (dtproxy as ggsadmin@dtstack) 15> add extract initkfk,sourceistable

2)  配置源端初始化參數(shù)

  GGSCI (dtproxy as ggsadmin@dtstack) 16> edit params initkfk

    EXTRACT initkfk
 SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
 USERID ggsadmin,PASSWORD oracle
 RMTHOST 172.16.101.242, MGRPORT 7810
 RMTFILE ./dirdat/ek,maxfiles 999, megabytes 500
  table baiyang.ora_to_kfk;

3)源端生成表結(jié)構(gòu)define文件

GGSCI (dtproxy as ggsadmin@dtstack) 17> edit param define_kfk

        -- 添加

    defsfile  /ogg/dirdef/define_kfk.txt

    userid ggsadmin,password oracle

    table baiyang.ora_to_kfk;

 4)獲取oracle全量數(shù)據(jù)

$cd /ogg    

$./defgen paramfile dirprm/define_kfk.prm

    -- Definitions generated for 1 table in /oradata/oggorcl/ogg/dirdef/define_kfk.txt

5) 將獲取全量數(shù)據(jù)記錄傳送到目標端

- 將此文件傳輸?shù)侥繕硕蝑irdef文件夾

    scp /ogg/dirdef/define_kfk.txt 172.16.101.242:/ogg/dirdef/define_kfk.txt

2、 配置目標端數(shù)據(jù)初始化進程

 

1) 配置目標端初始化進程

GGSCI (172-16-101-242) 3> ADD replicat initkfk,specialrun                         

2) 配置目標端初始化參數(shù)

 GGSCI (172-16-101-242) 6> edit params initkfk

     -- 添加

    SPECIALRUN

    end runtime

    setenv(NLS_LANG="AMERICAN_AMERICA.AL32UTF8")

    targetdb libfile libggjava.so set property=./dirprm/kafka.props

SOURCEDEFS ./dirdef/define_kfk.txt

REPLACEBADCHAR SKIP

SOURCECHARSET OVERRIDE ISO-8859-1

    EXTFILE ./dirdat/ek

    reportcount every 1 minutes, rate

    grouptransops 10000

map baiyang.ora_to_kfk,target baiyang.ora_to_kfk;

3) 配置ogg 針對kafka相關(guān)參數(shù)

vi ./dirprm/kafka.props

--添加

    gg.handlerlist=kafkahandler

gg.handler.kafkahandler.type=kafka

gg.handler.kafkahandler.format.includePrimaryKeys=true

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties

gg.handler.kafkahandler.topicName=test_ogg  --舊版參數(shù),本次使用舊版參數(shù)

    #gg.handler.kafkahandler.topicMappingTemplate=test_ogg –-新版本參數(shù)

    gg.handler.kafkahandler.format=json

    gg.handler.kafkahandler.mode=op

gg.classpath=dirprm/:/kafka/libs/*:/ogg/:/ogg/lib/*

                     kafka 安裝的位置     ogg安裝的位置

將./dirprm/kafka.props 文件復(fù)制到/ogg/AdapterExamples/big-data/kafka 目錄下

vi ./dirprm/custom_kafka_producer.properties

bootstrap.servers=172.16.101.242:9092  ---kafka地址

    acks=-1

    compression.type=gzip

    reconnect.backoff.ms=1000

    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

    batch.size=102400  --數(shù)據(jù)有堆積

    linger.ms=10000    --數(shù)據(jù)傳輸kafka有延時

將./dirprm/custom_kafka_producer.properties 文件復(fù)制到/ogg/AdapterExamples/big-data/kafka

3、 開啟抽取全量任務(wù)

 

源端:

GGSCI (dtproxy) 20>  start mgr

GGSCI (dtproxy) 21>  start initkfk

 

目標端全量數(shù)據(jù)應(yīng)用

GGSCI (172-16-101-242) 13> start mgr

cd /ogg

./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD

--查看應(yīng)用日志是否有錯誤

cd /opt/ogg/dirrpt
more init01.rpt

 

 

 

4、 驗證kafka全量數(shù)據(jù)

cd /kafka

bin/kafka-console-consumer.sh --bootstrap-server 172.16.101.242:9092 --topic test_ogg --from-beginning

{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}

{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:56.289000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"I_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":3,"DATA_OBJECT_ID":3,"OBJECT_TYPE":"INDEX"}}

 

全量數(shù)據(jù)已經(jīng)同步到目標kafka topic test_ogg

增量數(shù)據(jù)同步(oracle to kafka)

源端配置

 

1.     源端抽取進程配置

GGSCI (dtproxy) 9> edit param extkfk

-- 添加

extract extkfk

dynamicresolution

SETENV (ORACLE_SID = "orcl")

SETENV (NLS_LANG = "american_america.AL32UTF8")

userid ggsadmin,password oracle

FETCHOPTIONS NOUSESNAPSHOT

GETUPDATEBEFORES

NOCOMPRESSDELETES

NOCOMPRESSUPDATES

exttrail ./dirdat/to

table baiyang.ora_to_kfk;

2、添加extract進程

GGSCI (dtproxy) 10> add extract extkfk,tranlog,begin now

GGSCI (dtproxy) 11> add exttrail ./dirdat/to,extract extkfk

3、配置源端推送進程

GGSCI (dtproxy) 12> edit param pupkfk

-- 添加

extract pupkfk

passthru

dynamicresolution

userid ggsadmin,password oracle

rmthost 172.16.101.242 mgrport 7810

rmttrail ./dirdat/to

table baiyang.ora_to_kfk;

4、添加投遞進程

GGSCI (dtproxy) 13>  add extract pupkfk,exttrailsource ./dirdat/to

GGSCI (dtproxy) 14>  add rmttrail ./dirdat/to,extract pupkfk

目標端配置

1、 配置目標端恢復(fù)進程

edit param repkfk

-- 添加

REPLICAT repkfk

SOURCEDEFS ./dirdef/define_kfk.txt

targetdb libfile libggjava.so set property=./dirprm/kafka.props

REPORTCOUNT EVERY 1 MINUTES, RATE

GROUPTRANSOPS 10000

MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;

2、 添加trail文件到replicate進程

add replicat repkfk exttrail ./dirdat/to,checkpointtable ggsadmin.checkpoint

開啟增量實時數(shù)據(jù)抓取

 

源端:

./ggsci

GGSCI (dtproxy) 5> start extkfk

Sending START request to MANAGER ...

EXTRACT EXTKFK starting

GGSCI (dtproxy) 6> start pupkfk

Sending START request to MANAGER ...

EXTRACT PUPKFK starting

GGSCI (dtproxy) 7> status all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING

EXTRACT     RUNNING     EXTKFK      00:00:00      00:00:10

EXTRACT     RUNNING     PUPKFK      00:00:00      00:00:00

 

目標端:

 

/ggsci

GGSCI (172-16-101-242) 7> start replicat repkfk

Sending START request to MANAGER ...

REPLICAT REPKFK starting

GGSCI (172-16-101-242) 8> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING

REPLICAT    RUNNING     REPKFK      00:00:00      00:00:00

 

 

測試增量數(shù)據(jù)抓取

 

源端:

Oracle插入增量數(shù)據(jù)


SQL> insert into baiyang.ora_to_kfk  select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id >500 and  object_id < 1000;

SQL> commit;

SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)


    905

 

目標端:

查看Kafka消息隊列消費數(shù)據(jù)

cd /kafka

bin/kafka-console-consumer.sh --bootstrap-server 172.16.101.242:9092 --topic test_ogg

{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042000","pos":"00000000000000075298","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS","SUBOBJECT_NAME":null,"OBJECT_ID":998,"DATA_OBJECT_ID":998,"OBJECT_TYPE":"TABLE"}}

{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042001","pos":"00000000000000075459","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS_I","SUBOBJECT_NAME":null,"OBJECT_ID":999,"DATA_OBJECT_ID":999,"OBJECT_TYPE":"INDEX"}}

 

 

DDL操作

              如果ogg 源端,也就是oracle 端的表增加字段或者刪除字段,或者修改字段等等,只要是修改表結(jié)構(gòu)定義的,就算是DDL操作,在ogg for bigdata 12.2 穩(wěn)定版本中,目前是不支持同步ddl語句的,在12,3版本以后會進行ddl支持。

在12.2 ogg for bigdata 中,源端如果做ddl,需要在源端的定義表結(jié)構(gòu)文件中重新生成define_kfk.txt文件的定義,并將define_kfk.txt文件傳輸?shù)侥繕硕酥小?/p>

舉例說明:

源端:(oracle端)

1) 源表添加id字段

alter table ORA_TO_KFK add id number;

2) ogg 源端需要重新生成表定義文件

mv /ogg/dirdef/define_kfk.txt /ogg/dirdef/define_kfk.txt.bak1

cd /ogg

/defgen paramfile dirprm/define_kfk.prm

3) 將生成的表定義文件scp 到目標端

cd /ogg

scp ./dirdef/define_kfk.txt root@192.168.56.57:/ogg/dirdef/

4) 源端抽取進程需要重啟

  GGSCI (edsir1p9) 2> stop EXTKFK

Sending STOP request to EXTRACT EXTKFK ...

Request processed.

GGSCI (edsir1p9) 3> start EXTKFK

Sending START request to MANAGER ...

EXTRACT EXTKFK starting

GGSCI (edsir1p9) 4> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING                                          

EXTRACT     RUNNING     EXTKFK      00:00:00      00:00:08   

EXTRACT     RUNNING     PUPKFK      00:00:00      00:00:07 

目標端:(kafka端)

1)查看目標端的應(yīng)用進程發(fā)生了abend

GGSCI (node) 38> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING                                          

REPLICAT    ABENDED     REPKFK      00:10:27      00:05:29

2)啟動復(fù)制進程

GGSCI (node) 40> start REPKFK

Sending START request to MANAGER ...

REPLICAT REPKFK starting

GGSCI (node) 9> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING                                          

REPLICAT    RUNNING     REPKFK      00:00:00      00:00:04 

測試:

源端插入一條數(shù)據(jù)

SQL> insert into ORA_TO_KFK(OWNER,OBJECT_NAME,OBJECT_ID,ID) values ('gg','gg',876,9);

1 row created.

SQL> commit;

目標端:

cd /kafka

bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.57:9092 --topic ogg_test

數(shù)據(jù)已經(jīng)從源端oracle同步到目標端kafka中。至此oracle新添加一列,可以正常同步到kafka中。

以上是“oracle數(shù)據(jù)如何通過goldengate實時同步到kafka消息隊列中”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI