This article describes how to replicate Oracle data to a Kafka message queue in real time with Oracle GoldenGate (OGG). It walks through the full setup step by step.
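Component versions
Component | Version | Description |
Source Oracle | oracle 11.2.0.4 for linux x64 | Source Oracle database |
Source OGG | oracle ogg 11.2.0.1.20 for oracle linux x64 | Source-side OGG; captures data changes from the source Oracle database and sends the change logs to the target |
Target Kafka | kafka_2.11-0.11.0.2 for linux x64 | Message queue; receives the data pushed by the target-side OGG |
Target OGG | not stated | Target-side OGG; receives the Oracle transaction change logs sent from the source and pushes the changes to the Kafka message queue |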
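1. OGG Manager
OGG Manager configures and manages the other OGG components: it sets up data extraction, data pump, and replication, starts and stops the related processes, and reports their status.
2. Data extraction (Extract)
Extract captures changes (DML, DDL) from the source database. There are three main kinds of extraction:
- Local extract: captures incremental changes from the local database and writes them to a local trail file.
- Data pump: reads data from the local trail file and pushes it to the target.
- Initial-load extract: exports the full contents of database tables for the first data load.
3. Data pump (Data Pump)
Data Pump is a special type of Extract. It reads data from the local trail file and sends it over the network to the target OGG.
4. Trail files
The transaction changes that Extract captures from the source database are written to trail files.
5. Data collector (Collector)
The Collector runs on the target machine. It receives the trail data sent by the Data Pump and writes it to a local trail file.
6. Replication (Replicat)
Replicat runs on the target machine. It reads data changes from the trail file and applies them to the target data store. In this example, Replicat pushes the data to the Kafka message queue.
7. Checkpoint
A checkpoint records how far a process has read and written the database transaction changes, so that processing can resume from that position after a restart.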
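To make the roles of the trail file and the checkpoint concrete, here is a minimal Python sketch. It is a toy model and not OGG code: the `TrailReader` class, the in-memory list standing in for a trail file, and the integer checkpoint are all illustrative assumptions.

```python
class TrailReader:
    """Toy model of a process that reads a trail and checkpoints its position."""

    def __init__(self, trail, checkpoint=0):
        self.trail = trail            # list of change records (stand-in for a trail file)
        self.checkpoint = checkpoint  # index of the next record to read

    def read_batch(self, max_records):
        """Return up to max_records unread records and advance the checkpoint."""
        batch = self.trail[self.checkpoint:self.checkpoint + max_records]
        self.checkpoint += len(batch)
        return batch


trail = ["insert 1", "insert 2", "update 1", "delete 2"]
reader = TrailReader(trail)
first = reader.read_batch(3)
saved = reader.checkpoint  # position persisted before a simulated restart

# A restarted reader resumes from the saved checkpoint instead of re-reading.
restarted = TrailReader(trail, checkpoint=saved)
rest = restarted.read_batch(3)
print(first, rest)
```

The restarted reader only sees the record that had not yet been processed, which is the behavior checkpoints exist to provide.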
Source Oracle database configuration
1. Enable archive logging on the source
SQL> archive log list
Database log mode Archive Mode
Automatic archival Enabled
Archive destination /u01/app/oracle/product/11.2.3/db_1/dbs/arch
Oldest online log sequence 12
Next log sequence to archive 17
Current log sequence 17
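If archive logging is not enabled, enable it as follows:
conn / as sysdba (connect to the database as SYSDBA)
shutdown immediate (shut down the database immediately)
startup mount (start the instance and mount the database without opening it)
alter database archivelog; (switch the database to archive log mode)
alter database open; (open the database)
alter system archive log start; (enable automatic archiving)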
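If this check needs to be scripted, the text printed by `archive log list` can be inspected. The sketch below is an assumption about how one might do that: the function name is hypothetical, and it relies only on the `Database log mode` line shown in the output above.

```python
def is_archivelog_enabled(archive_log_list_output):
    """Return True if the 'Database log mode' line reports Archive Mode."""
    label = "database log mode"
    for line in archive_log_list_output.splitlines():
        stripped = line.strip()
        if stripped.lower().startswith(label):
            mode = stripped[len(label):].strip().lower()
            return mode == "archive mode"
    raise ValueError("no 'Database log mode' line found")


sample = """Database log mode Archive Mode
Automatic archival Enabled
Current log sequence 17"""
print(is_archivelog_enabled(sample))
```

A database that reports `No Archive Mode` on that line yields `False`, which is the case where the steps above apply.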
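2. OGG relies on supplemental logging for real-time replication, so the relevant logging must be enabled to make sure transaction contents can be captured. Check the current state with the following query: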
SQL> select force_logging, supplemental_log_data_min,supplemental_log_data_all from v$database;
FORCE_LOGG SUPPLEMENTAL_LOG_DATA_MI
---------- ------------------------
YES YES
If supplemental logging is not enabled, enable it:
SQL> alter database force logging;
SQL> alter database add supplemental log data;
SQL> alter database add supplemental log data (all) columns;
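The decision of which statements to run can be written down as a small function. This is a sketch under assumptions: the function name is made up, and the inputs are the three column values returned by the `v$database` query above.

```python
def logging_fixes(force_logging, supplemental_min, supplemental_all):
    """Map the v$database flag values to the ALTER statements still needed."""
    fixes = []
    if force_logging != "YES":
        fixes.append("alter database force logging;")
    # supplemental_log_data_min may be YES, IMPLICIT, or NO; only NO needs a fix.
    if supplemental_min == "NO":
        fixes.append("alter database add supplemental log data;")
    if supplemental_all != "YES":
        fixes.append("alter database add supplemental log data (all) columns;")
    return fixes


print(logging_fixes("YES", "YES", "NO"))
```

With the values `YES`, `YES`, `NO`, only the all-columns statement is returned.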
3. Enable the GoldenGate replication parameter
SQL> alter system set enable_goldengate_replication = true;
4. Create the source Oracle account
SQL> create tablespace tbs_ogg datafile '/oradata/dtstack/tbs_ogg.dbf' size 1024M autoextend on;
SQL> create user ggsadmin identified by oracle default tablespace tbs_ogg;
SQL> grant dba to ggsadmin;
5. Create a test table (skip in production)
SQL> create table baiyang.ora_to_kfk as select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id < 500;
SQL> alter table baiyang.ora_to_kfk add constraint pk_kfk_obj primary key(object_id);
SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)
----------
436
Deploying OGG
Source side (Oracle)
1. Unpack
Create the ogg directory first
mkdir -p /ogg
tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /ogg
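chown -R oracle:oinstall /ogg (gives the oracle user ownership of /ogg; some later steps only succeed when run as the oracle user)
2. Configure the OGG environment variables
For convenience, in production it is recommended to set these in the oracle user's environment file, /home/oracle/.bash_profile: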
export JAVA_HOME=/usr/local/java1.8
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar
export JAVA=$JAVA_HOME/bin/java
export OGG_HOME=/ogg
export PATH=$PATH:$OGG_HOME
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$LD_LIBRARY_PATH
Apply the environment variables:
source /home/oracle/.bash_profile
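Each entry in `LD_LIBRARY_PATH` is searched as a directory, so an entry that names a `.so` file itself contributes nothing. The following sketch flags such entries; the function name is hypothetical, and the check looks only at the last path component rather than touching the filesystem.

```python
def file_like_entries(ld_library_path):
    """Return LD_LIBRARY_PATH entries whose last component looks like a .so file."""
    suspicious = []
    for entry in ld_library_path.split(":"):
        name = entry.rstrip("/").rsplit("/", 1)[-1]
        if name.endswith(".so") or ".so." in name:
            suspicious.append(entry)
    return suspicious


path = ("/usr/local/java1.8/jre/lib/amd64:"
        "/usr/local/java1.8/jre/lib/amd64/server/libjvm.so")
print(file_like_entries(path))
```

Only the `libjvm.so` entry is reported; its parent directory is what belongs in the variable.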
3. Initialize OGG
ggsci
create subdirs
ggsci
Oracle GoldenGate Command Interpreter for Oracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21
Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
GGSCI (ambari.master.com) 1> create subdirs
Creating subdirectories under current directory /root
Parameter files /root/dirprm: created
Report files /root/dirrpt: created
Checkpoint files /root/dirchk: created
Process status files /root/dirpcs: created
SQL script files /root/dirsql: created
Database definitions files /root/dirdef: created
Extract data files /root/dirdat: created
Temporary files /root/dirtmp: created
Stdout files /root/dirout: created
4. Configure the source Manager
GGSCI (dtproxy) 4> dblogin userid ggsadmin password oracle
GGSCI (dtproxy as ggsadmin@dtstack) 5> edit param ./globals
-- add
oggschema ggsadmin
GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr
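-- add
PORT 7810 -- default listening port
DYNAMICPORTLIST 7811-7820 -- dynamic port list
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 -- if a process fails, retry the restart every 3 minutes, up to 5 times
PURGEOLDEXTRACTS ./dirdat/, USECHECKPOINTS, MINKEEPDAYS 7 -- purge trail files that checkpoints show as processed, keeping them at least 7 days
LAGREPORTHOURS 1 -- check replication lag every hour
LAGINFOMINUTES 30 -- lag above 30 minutes is reported as an informational message
LAGCRITICALMINUTES 45 -- lag above 45 minutes is reported as a critical message
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 -- periodically purge the DDL marker history
--ACCESSRULE, PROG , IPADDR 172..., ALLOW -- allow connections from the 172 network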
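The Manager port and the dynamic port range both have to be reachable from the other host, so it can help to list them from the parameter text. The sketch below is an assumption about how to do that: `manager_ports` is a hypothetical helper, and it handles only a single `low-high` range as used above.

```python
import re


def manager_ports(mgr_params):
    """Return (manager_port, dynamic_ports) parsed from Manager parameter text."""
    port = None
    dynamic = []
    for raw in mgr_params.splitlines():
        line = raw.split("--", 1)[0].strip()  # drop trailing -- comments
        m = re.match(r"(?i)PORT\s+(\d+)$", line)
        if m:
            port = int(m.group(1))
        m = re.match(r"(?i)DYNAMICPORTLIST\s+(\d+)\s*-\s*(\d+)$", line)
        if m:
            dynamic = list(range(int(m.group(1)), int(m.group(2)) + 1))
    return port, dynamic


params = """PORT 7810 -- default listening port
DYNAMICPORTLIST 7811-7820 -- dynamic port list
LAGREPORTHOURS 1"""
port, dynamic = manager_ports(params)
print(port, dynamic[0], dynamic[-1], len(dynamic))
```

For the parameters in this article, that gives port 7810 plus the ten ports 7811 through 7820.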
5. Add table-level supplemental logging for the replicated table
GGSCI (dtproxy as ggsadmin@dtstack) 9> add trandata baiyang.ora_to_kfk
GGSCI (dtproxy as ggsadmin@dtstack) 10> info trandata baiyang.ora_to_kfk
Target side (Kafka)
1. Unpack
mkdir -p /ogg
unzip V839824-01.zip
tar xf ggs_Adapters_Linux_x64.tar -C /ogg/
2. Configure the OGG environment variables
For convenience, in production it is recommended to set these in the oracle user's environment file, /home/oracle/.bash_profile:
export JAVA_HOME=/usr/local/java1.8/jre
export PATH=$JAVA_HOME/bin:$PATH
export LD_LIBRARY_PATH=$JAVA_HOME/lib/amd64/server:$JAVA_HOME/lib/amd64:$LD_LIBRARY_PATH
export OGG_HOME=/ogg
export PATH=$PATH:$OGG_HOME
Apply the environment variables:
source /home/oracle/.bash_profile
3. Initialize OGG
ggsci
create subdirs
ggsci
Oracle GoldenGate Command Interpreter for Oracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21
Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
GGSCI (ambari.master.com) 1> create subdirs
Creating subdirectories under current directory /root
Parameter files /root/dirprm: created
Report files /root/dirrpt: created
Checkpoint files /root/dirchk: created
Process status files /root/dirpcs: created
SQL script files /root/dirsql: created
Database definitions files /root/dirdef: created
Extract data files /root/dirdat: created
Temporary files /root/dirtmp: created
Stdout files /root/dirout: created
4. Configure the target Manager
GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr
-- add
PORT 7810 -- default listening port
DYNAMICPORTLIST 7811-7820 -- dynamic port list
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 -- if a process fails, retry the restart every 3 minutes, up to 5 times
PURGEOLDEXTRACTS ./dirdat/, USECHECKPOINTS, MINKEEPDAYS 7
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 -- periodically purge the DDL marker history
--ACCESSRULE, PROG , IPADDR 172..., ALLOW -- allow connections from the 172 network
GGSCI (172-16-101-242) 4> edit param ./GLOBALS
-- add
CHECKPOINTTABLE ggsadmin.checkpoint
Full data synchronization (Oracle to Kafka)
1. Configure the initial load on the source
1) Add the initial-load extract process on the source
GGSCI (dtproxy as ggsadmin@dtstack) 15> add extract initkfk,sourceistable
2) Configure the initial-load extract parameters on the source
GGSCI (dtproxy as ggsadmin@dtstack) 16> edit params initkfk
EXTRACT initkfk
SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
USERID ggsadmin,PASSWORD oracle
RMTHOST 172.16.101.242, MGRPORT 7810
RMTFILE ./dirdat/ek,maxfiles 999, megabytes 500
table baiyang.ora_to_kfk;
3) Create the parameter file for generating the table definition file on the source
GGSCI (dtproxy as ggsadmin@dtstack) 17> edit param define_kfk
-- add
defsfile /ogg/dirdef/define_kfk.txt
userid ggsadmin,password oracle
table baiyang.ora_to_kfk;
4) Generate the table definition file
$ cd /ogg
$ ./defgen paramfile dirprm/define_kfk.prm
-- Definitions generated for 1 table in /oradata/oggorcl/ogg/dirdef/define_kfk.txt
5) Copy the table definition file to the target
- Transfer the file to the dirdef directory on the target
scp /ogg/dirdef/define_kfk.txt 172.16.101.242:/ogg/dirdef/define_kfk.txt
2. Configure the initial load on the target
1) Add the initial-load replicat process on the target
GGSCI (172-16-101-242) 3> ADD replicat initkfk,specialrun
2) Configure the initial-load replicat parameters on the target
GGSCI (172-16-101-242) 6> edit params initkfk
-- add
SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
targetdb libfile libggjava.so set property=./dirprm/kafka.props
SOURCEDEFS ./dirdef/define_kfk.txt
REPLACEBADCHAR SKIP
SOURCECHARSET OVERRIDE ISO-8859-1
EXTFILE ./dirdat/ek
reportcount every 1 minutes, rate
grouptransops 10000
map baiyang.ora_to_kfk,target baiyang.ora_to_kfk;
3) Configure the OGG parameters for Kafka
vi ./dirprm/kafka.props
# add
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
# topicName is the older parameter; this setup uses it
gg.handler.kafkahandler.topicName=test_ogg
# topicMappingTemplate is the parameter in newer versions
#gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
# /kafka is the Kafka installation directory; /ogg is the OGG installation directory
gg.classpath=dirprm/:/kafka/libs/*:/ogg/:/ogg/lib/*
Copy ./dirprm/kafka.props to the /ogg/AdapterExamples/big-data/kafka directory.
vi ./dirprm/custom_kafka_producer.properties
# Kafka broker address
bootstrap.servers=172.16.101.242:9092
acks=-1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# a larger batch lets records accumulate before they are sent
batch.size=102400
# wait up to 10 seconds for a batch to fill, which delays delivery to Kafka
linger.ms=10000
Copy ./dirprm/custom_kafka_producer.properties to /ogg/AdapterExamples/big-data/kafka.
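Java properties files have no inline-comment syntax: a comment must be its own line starting with `#` or `!`, and anything after the value on the same line becomes part of the value. The sketch below flags values that look like they carry a trailing annotation. Both function names are hypothetical, and the parser is deliberately simplified (no line continuations or escapes).

```python
def parse_properties(text):
    """Parse simple key=value lines; whole lines starting with # or ! are comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or line.startswith("!"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def suspicious_values(props):
    """Return keys whose value seems to contain a trailing inline annotation."""
    return sorted(k for k, v in props.items() if " --" in v or " #" in v)


sample = """bootstrap.servers=172.16.101.242:9092 ---kafka address
acks=-1
# a real comment line
linger.ms=10000"""
props = parse_properties(sample)
print(suspicious_values(props))
```

Here `bootstrap.servers` is flagged because the annotation would be read as part of the broker address, while `acks=-1` is left alone.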
3. Start the full-load tasks
Source:
GGSCI (dtproxy) 20> start mgr
GGSCI (dtproxy) 21> start initkfk
Target: apply the full data
GGSCI (172-16-101-242) 13> start mgr
cd /ogg
./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD
-- check the report file for errors
cd /ogg/dirrpt
more init01.rpt
4. Verify the full data in Kafka
cd /kafka
bin/kafka-console-consumer.sh --bootstrap-server 172.16.101.242:9092 --topic test_ogg --from-beginning
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:56.289000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"I_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":3,"DATA_OBJECT_ID":3,"OBJECT_TYPE":"INDEX"}}
The full data has been synchronized to the target Kafka topic test_ogg.
Incremental data synchronization (Oracle to Kafka)
Source configuration
1. Configure the source extract process
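To go beyond eyeballing the consumer output, the JSON lines can be tallied and the insert count compared with the 436 rows counted on the source. A minimal sketch, assuming the consumer output has been captured as a list of lines; `count_operations` is a hypothetical helper, and the two sample records are abbreviated from the output above.

```python
import json
from collections import Counter


def count_operations(lines):
    """Count messages per (table, op_type) from JSON lines printed by the consumer."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)
        counts[(record["table"], record["op_type"])] += 1
    return counts


lines = [
    '{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","after":{"OBJECT_ID":2}}',
    '{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","after":{"OBJECT_ID":3}}',
]
counts = count_operations(lines)
print(counts[("BAIYANG.ORA_TO_KFK", "I")])
```

Run over the complete topic contents, the insert count for BAIYANG.ORA_TO_KFK should match the source row count if the load was complete.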
GGSCI (dtproxy) 9> edit param extkfk
-- add
extract extkfk
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ggsadmin,password oracle
FETCHOPTIONS NOUSESNAPSHOT
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
exttrail ./dirdat/to
table baiyang.ora_to_kfk;
2. Add the extract process
GGSCI (dtproxy) 10> add extract extkfk,tranlog,begin now
GGSCI (dtproxy) 11> add exttrail ./dirdat/to,extract extkfk
3. Configure the source data pump process
GGSCI (dtproxy) 12> edit param pupkfk
-- add
extract pupkfk
passthru
dynamicresolution
userid ggsadmin,password oracle
rmthost 172.16.101.242 mgrport 7810
rmttrail ./dirdat/to
table baiyang.ora_to_kfk;
4. Add the data pump process
GGSCI (dtproxy) 13> add extract pupkfk,exttrailsource ./dirdat/to
GGSCI (dtproxy) 14> add rmttrail ./dirdat/to,extract pupkfk
Target configuration
1. Configure the target replicat process
edit param repkfk
-- add
REPLICAT repkfk
SOURCEDEFS ./dirdef/define_kfk.txt
targetdb libfile libggjava.so set property=./dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;
2. Add the replicat process and attach the trail file
add replicat repkfk, exttrail ./dirdat/to, checkpointtable ggsadmin.checkpoint
Start real-time incremental capture
Source:
./ggsci
GGSCI (dtproxy) 5> start extkfk
Sending START request to MANAGER ...
EXTRACT EXTKFK starting
GGSCI (dtproxy) 6> start pupkfk
Sending START request to MANAGER ...
EXTRACT PUPKFK starting
GGSCI (dtproxy) 7> status all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING EXTKFK 00:00:00 00:00:10
EXTRACT RUNNING PUPKFK 00:00:00 00:00:00
Target:
./ggsci
GGSCI (172-16-101-242) 7> start replicat repkfk
Sending START request to MANAGER ...
REPLICAT REPKFK starting
GGSCI (172-16-101-242) 8> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT RUNNING REPKFK 00:00:00 00:00:00
Test incremental capture
Source:
Insert incremental data into Oracle
SQL> insert into baiyang.ora_to_kfk select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id >500 and object_id < 1000;
SQL> commit;
SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)
905
Target:
Consume the data from the Kafka message queue
cd /kafka
bin/kafka-console-consumer.sh --bootstrap-server 172.16.101.242:9092 --topic test_ogg
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042000","pos":"00000000000000075298","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS","SUBOBJECT_NAME":null,"OBJECT_ID":998,"DATA_OBJECT_ID":998,"OBJECT_TYPE":"TABLE"}}
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042001","pos":"00000000000000075459","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS_I","SUBOBJECT_NAME":null,"OBJECT_ID":999,"DATA_OBJECT_ID":999,"OBJECT_TYPE":"INDEX"}}
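Each message carries `op_ts` (when the operation happened on the source) and `current_ts` (when the handler formatted the message), so a rough end-to-end delay can be derived. The sketch below assumes both timestamps are in the same time zone, which may not hold in every setup; `delivery_delay_seconds` is a hypothetical helper.

```python
from datetime import datetime


def delivery_delay_seconds(record):
    """Seconds between the source operation time and the handler's current time."""
    op_ts = datetime.strptime(record["op_ts"], "%Y-%m-%d %H:%M:%S.%f")
    current_ts = datetime.strptime(record["current_ts"], "%Y-%m-%dT%H:%M:%S.%f")
    return (current_ts - op_ts).total_seconds()


record = {
    "op_ts": "2019-11-11 21:04:11.158786",
    "current_ts": "2019-11-11T21:10:54.042000",
}
delay = delivery_delay_seconds(record)
print(round(delay, 3))
```

For the record above, the difference is a little under seven minutes.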
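DDL operations
Any change to a table's structure on the OGG source (the Oracle side), such as adding, dropping, or modifying a column, counts as a DDL operation. The stable 12.2 release of OGG for Big Data does not support replicating DDL statements; DDL support is expected from version 12.3 onward.
With OGG for Big Data 12.2, after running DDL on the source you must regenerate the table definition file define_kfk.txt on the source and transfer it to the target.
Example:
Source (Oracle side):
1) Add an id column to the source table
alter table ORA_TO_KFK add id number;
2) Regenerate the table definition file on the OGG source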
mv /ogg/dirdef/define_kfk.txt /ogg/dirdef/define_kfk.txt.bak1
cd /ogg
./defgen paramfile dirprm/define_kfk.prm
3) scp the generated table definition file to the target
cd /ogg
scp ./dirdef/define_kfk.txt root@192.168.56.57:/ogg/dirdef/
4) Restart the source extract process
GGSCI (edsir1p9) 2> stop EXTKFK
Sending STOP request to EXTRACT EXTKFK ...
Request processed.
GGSCI (edsir1p9) 3> start EXTKFK
Sending START request to MANAGER ...
EXTRACT EXTKFK starting
GGSCI (edsir1p9) 4> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING EXTKFK 00:00:00 00:00:08
EXTRACT RUNNING PUPKFK 00:00:00 00:00:07
Target (Kafka side):
1) Check the target: the replicat process has abended
GGSCI (node) 38> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT ABENDED REPKFK 00:10:27 00:05:29
2) Start the replicat process
GGSCI (node) 40> start REPKFK
Sending START request to MANAGER ...
REPLICAT REPKFK starting
GGSCI (node) 9> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT RUNNING REPKFK 00:00:00 00:00:04
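Spotting an ABENDED process can be automated by scanning the `info all` output. A minimal sketch, assuming the output has been captured as text in the column layout shown above; `abended_groups` is a hypothetical helper.

```python
def abended_groups(info_all_output):
    """Return (program, group) pairs whose status is ABENDED in 'info all' output."""
    found = []
    for line in info_all_output.splitlines():
        fields = line.split()
        if len(fields) >= 3 and fields[1] == "ABENDED":
            found.append((fields[0], fields[2]))
    return found


output = """Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT ABENDED REPKFK 00:10:27 00:05:29"""
print(abended_groups(output))
```

An empty list means no process is in the ABENDED state.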
Test:
Insert a row on the source
SQL> insert into ORA_TO_KFK(OWNER,OBJECT_NAME,OBJECT_ID,ID) values ('gg','gg',876,9);
1 row created.
SQL> commit;
Target:
cd /kafka
bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.57:9092 --topic ogg_test
The data has been synchronized from the source Oracle database to the target Kafka. A column newly added in Oracle is now replicated to Kafka normally.
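A downstream consumer can confirm that the new column is arriving by comparing each message's `after` image with the columns it already knows. In the sketch below, the column list comes from the test table created earlier, the helper name is made up, and the message is a hypothetical example built from the insert statement above, not captured output.

```python
import json

# Columns of the original test table, before the DDL change.
KNOWN_COLUMNS = ["OWNER", "OBJECT_NAME", "SUBOBJECT_NAME",
                 "OBJECT_ID", "DATA_OBJECT_ID", "OBJECT_TYPE"]


def unexpected_columns(message, known_columns=KNOWN_COLUMNS):
    """Return columns in the message's 'after' image that are not yet known."""
    record = json.loads(message)
    after = record.get("after") or {}
    return sorted(set(after) - set(known_columns))


# Hypothetical message for the row inserted above; not captured output.
message = ('{"table":"BAIYANG.ORA_TO_KFK","op_type":"I",'
           '"after":{"OWNER":"gg","OBJECT_NAME":"gg","OBJECT_ID":876,"ID":9}}')
print(unexpected_columns(message))
```

Seeing `ID` reported indicates that the regenerated definition file is in effect on the target.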