溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何使用ogg將Oracle數(shù)據(jù)傳輸?shù)絝lume刷到kafka

發(fā)布時(shí)間:2021-09-01 14:35:14 來(lái)源:億速云 閱讀:190 作者:chen 欄目:網(wǎng)絡(luò)安全

本篇內(nèi)容主要講解“如何使用ogg將Oracle數(shù)據(jù)傳輸?shù)絝lume刷到kafka”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“如何使用ogg將Oracle數(shù)據(jù)傳輸?shù)絝lume刷到kafka”吧!

源端測(cè)試服務(wù)器

服務(wù)器環(huán)境部署:

命令步驟如下:

[root@test ~]# groupadd oinstall

[root@test ~]# groupadd dba

[root@test ~]# useradd -g oinstall -G dba oracle

[root@test ~]#

修改權(quán)限:

[root@test ~]#  chown -R oracle:oinstall /data

[root@test ~]# 

2. 設(shè)置全局java環(huán)境變量

[root@test ~]# cat /etc/redhat-release 

CentOS release 6.4 (Final)

[root@test ~]#

[oracle@test data]$ tar -zxvf jdk-8u60-linux-x64.tar.gz

在root下執(zhí)行配置:

設(shè)置java環(huán)境變量:

vi /etc/profile

###jdk

export JAVA_HOME=/data/jdk1.8.0_60

export JAVA_BIN=/data/jdk1.8.0_60/bin

export PATH=$PATH:$JAVA_HOME/bin

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export JAVA_HOME JAVA_BIN PATH CLASSPATH

export LD_LIBRARY_PATH=/data/jdk1.8.0_60/jre/lib/amd64/server:$LD_LIBRARY_PATH

切換Oracle用戶核對(duì):

[root@test ~]# su - oracle

[oracle@test ~]$ java -version

java version "1.8.0_60"

Java(TM) SE Runtime Environment (build 1.8.0_60-b27)

Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)

[oracle@test ~]$

如果不生效:

修改java環(huán)境變量:

alternatives --install /usr/bin/java java /data/jdk1.8.0_60/bin/java 100 

alternatives --install /usr/bin/jar jar /data/jdk1.8.0_60/bin/jar 100 

alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100 

update-alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100

#  /usr/sbin/alternatives --config java

[root@test1 data]#   /usr/sbin/alternatives --config java

There are 4 programs which provide 'java'.

  Selection    Command

-----------------------------------------------

   1           /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java

*+ 2           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java

   3           /usr/lib/jvm/jre-1.5.0-gcj/bin/java

   4           /data/jdk1.8.0_60/bin/java

Enter to keep the current selection[+], or type selection number: 4

[root@test1 data]# /usr/sbin/alternatives --config java

There are 4 programs which provide 'java'.

  Selection    Command

-----------------------------------------------

   1           /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java

*  2           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java

   3           /usr/lib/jvm/jre-1.5.0-gcj/bin/java

 + 4           /data/jdk1.8.0_60/bin/java

Enter to keep the current selection[+], or type selection number: 

[root@test1 data]# 

[root@test1 data]# java -version

java version "1.8.0_60"

Java(TM) SE Runtime Environment (build 1.8.0_60-b27)

Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)

[root@test1 data]# 

修改flume 參數(shù)配置:

[oracle@test1 conf]$ cat flume-conf.properties

# Licensed to the Apache Software Foundation (ASF) under one

# or more contributor license agreements.  See the NOTICE file

# distributed with this work for additional information

# regarding copyright ownership.  The ASF licenses this file

# to you under the Apache License, Version 2.0 (the

# "License"); you may not use this file except in compliance

# with the License.  You may obtain a copy of the License at

#

#  http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing,

# software distributed under the License is distributed on an

# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

# KIND, either express or implied.  See the License for the

# specific language governing permissions and limitations

# under the License.

# The configuration file needs to define the sources, 

# the channels and the sinks.

# Sources, channels and sinks are defined per agent, 

# in this case called 'agent'

agent.sources = r1

agent.channels = fileChannel

agent.sinks = kafkaSink

# For each one of the sources, the type is defined

agent.sources.seqGenSrc.type = seq

# The channel can be defined as follows.

agent.sources.seqGenSrc.channels = fileChannel

#

agent.sources.r1.type = avro

agent.sources.r1.port = 14141

agent.sources.r1.bind = 192.168.88.66

agent.sources.r1.channels = fileChannel

# Each sink's type must be defined

agent.sinks.loggerSink.type = logger

#Specify the channel the sink should use

agent.sinks.loggerSink.channel = memoryChannel

#kafka sink

agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink

agent.sinks.kafkaSink.topic = my_schema 

agent.sinks.kafkaSink.brokerList = 192.168.88.1:9092,192.168.88.2:9092,192.168.88.3:9092,192.168.88.4:9092

agent.sinks.kafkaSink.requiredAcks = 1

agent.sinks.kafkaSink.batchSize = 20

agent.sinks.kafkaSink.channel = fileChannel

# Each channel's type is defined.

agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)

# can be defined as well

# In this case, it specifies the capacity of the memory channel

agent.channels.memoryChannel.capacity = 100

#File Channel

agent.channels.fileChannel.type = file

agent.channels.fileChannel.transactionCapacity = 20000000

agent.channels.fileChannel.capacity = 50000000

agent.channels.fileChannel.maxFileSize = 2147483648

agent.channels.fileChannel.minimumRequiredSpace = 52428800

agent.channels.fileChannel.keep-alive = 3

agent.channels.fileChannel.checkpointInterval = 20000

agent.channels.fileChannel.checkpointDir = /data/apache-flume-1.6.0-bin/CheckpointDir

agent.channels.fileChannel.dataDirs = /data/apache-flume-1.6.0-bin/DataDir

[oracle@test1 conf]$ 

############配置OGG

主庫(kù)在

源庫(kù)創(chuàng)建新的抽取進(jìn)程:

dblogin userid goldengate, password goldengate

add extract EXTJMS,tranlog, threads 2,begin now

add exttrail /data/goldengate/dirdat/kf, extract EXTJMS megabytes 200

add schematrandata my_schema

add trandata my_schema.*

原抽取進(jìn)程:

extract EXTJMS

setenv (ORACLE_SID="testdb")

setenv (NLS_LANG="AMERICAN_AMERICA.AL32UTF8") 

userid goldengate, password goldengate

TRANLOGOPTIONS DBLOGREADER

exttrail /data/goldengate/dirdat/kf

discardfile  /data/goldengate/dirrpt/EXTJMS.dsc,append

THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000

numfiles 3000

CHECKPOINTSECS 20

DISCARDROLLOVER AT 05:30

dynamicresolution

GETUPDATEBEFORES

NOCOMPRESSUPDATES

NOCOMPRESSDELETES

RecoveryOptions OverwriteMode

ddl &

include mapped &

exclude objtype 'TRIGGER' &

exclude objtype 'PROCEDURE' &

exclude objtype 'FUNCTION' &

exclude objtype 'PACKAGE' &

exclude objtype 'PACKAGE BODY' &

exclude objtype 'TYPE' &

exclude objtype 'GRANT' &

exclude instr 'GRANT' &

exclude objtype 'DATABASE LINK' &

exclude objtype 'CONSTRAINT' &

exclude objtype 'JOB' &

exclude instr 'ALTER SESSION' &

exclude INSTR 'AS SELECT' &

exclude INSTR 'REPLACE SYNONYM' &

EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_CMP" &

EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_UNCMP"

FETCHOPTIONS  NOUSESNAPSHOT, USELATESTVERSION, MISSINGROW REPORT

TABLEEXCLUDE *.DBMS_TABCOMP_TEMP*;

--extract table user

TABLE my_schema.*;

SQL>  ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS;

Database altered.

SQL>  ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (all) COLUMNS;

Database altered.

SQL> select SUPPLEMENTAL_LOG_DATA_MIN,SUPPLEMENTAL_LOG_DATA_PK,SUPPLEMENTAL_LOG_DATA_UI ,FORCE_LOGGING from v$database;

SUPPLEME SUP SUP FOR

-------- --- --- ---

YES      YES YES YES

SQL> 

源端添加新的pump進(jìn)程:

在my_schema源庫(kù)測(cè)試添加pump進(jìn)程:

添加pump進(jìn)程:

添加新的pump:

add extract EDPKF,exttrailsource /data/goldengate/dirdat/kf, begin now

edit param EDPKF

EXTRACT EDPKF

setenv (NLS_LANG = AMERICAN_AMERICA.AL32UTF8)

PASSTHRU

GETUPDATEBEFORES

NOCOMPRESSUPDATES

NOCOMPRESSDELETES

RecoveryOptions OverwriteMode

RMTHOST 192.168.88.66, MGRPORT 7839

RMTTRAIL /data/ogg_for_bigdata/dirdat/kp

DISCARDFILE ./dirrpt/EDPKF.dsc,APPEND,MEGABYTES 5

TABLE my_schema.* ;

add rmttrail /data/ogg_for_bigdata/dirdat/kp, extract EDPKF megabytes 200

edit  param defgen

userid goldengate, password goldengate

defsfile dirdef/my_schema.def

TABLE my_schema.*;

傳遞定義文件:

./defgen paramfile ./dirprm/defgen.prm

目標(biāo)端直接端

mgr:

PORT 7839

DYNAMICPORTLIST 7840-7850

--AUTOSTART replicat *

--AUTORESTART replicat *,RETRIES 5,WAITMINUTES 2

AUTORESTART ER *, RETRIES 3, WAITMINUTES 5, RESETMINUTES 10

PURGEOLDEXTRACTS /data/ogg_for_bigdata/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2

LAGREPORTHOURS 1

LAGINFOMINUTES 30

LAGCRITICALMINUTES 45

添加 UE DATA PUMP:

 使用版本:

Version 12.1.2.1.4 20470586 OGGCORE_12.1.2.1.0OGGBP_PLATFORMS_150303.1209

ADD EXTRACT LOANFLM, EXTTRAILSOURCE /data/ogg_for_bigdata/dirdat/kp

edit param JMSFLM

GGSCI (localhost.localdomain) 18> view param JMSFLM

EXTRACT JMSFLM

SETENV (GGS_USEREXIT_CONF ="dirprm/JMSFLM.props")

GetEnv (JAVA_HOME)

GetEnv (PATH)

GetEnv (LD_LIBRARY_PATH)

SourceDefs dirdef/my_schema.def

CUserExit libggjava_ue.so CUSEREXIT PassThru IncludeUpdateBefores

GetUpdateBefores

NoCompressDeletes

NoCompressUpdates

NoTcpSourceTimer

TABLEEXCLUDE my_schema.MV*;

TABLE my_schema.*;

--alter prodjms extseqno 736, extrba 0

注釋: 在目標(biāo)端完全可以不安裝Oracle數(shù)據(jù)庫(kù),可以和flume環(huán)境放在一起,最終刷數(shù)據(jù)到kafka的服務(wù)器接收消息。

本案例是 通過(guò)flume中轉(zhuǎn)實(shí)現(xiàn)的,完全沒(méi)有問(wèn)題。

當(dāng)然也可以直接將數(shù)據(jù)傳輸?shù)絢afka處理消息,原理都是一樣的。

未來(lái)更多的大數(shù)據(jù)融合也是一個(gè)不錯(cuò)的方案,無(wú)論是mysqlmongodb,hdfs等都可以完美結(jié)合。

 參數(shù)文件:

$ cat JMSFLM.props

gg.handlerlist=flumehandler

gg.handler.flumehandler.type=com.goldengate.delivery.handler.flume.FlumeHandler

gg.handler.flumehandler.host=192.168.88.66

gg.handler.flumehandler.port=14141

gg.handler.flumehandler.rpcType=avro

gg.handler.flumehandler.delimiter=\u0001

gg.handler.flumehandler.mode=op

gg.handler.flumehandler.includeOpType=true

# Indicates if the operation timestamp should be included as part of output in the delimited separated values

# true - Operation timestamp will be included in the output

# false - Operation timestamp will not be included in the output

# Default :- true

#gg.handler.flumehandler.includeOpTimestamp=true

#gg.handler.name.deleteOpKey=D

#gg.handler.name.updateOpKey=U

#gg.handler.name.insertOpKey=I

#gg.handler.name.pKUpdateOpKey=P

#gg.handler.name.includeOpType=true

# Optional properties to use the transaction grouping functionality

#gg.handler.flumehandler.maxGroupSize=1000

#gg.handler.flumehandler.minGroupSize=1000

### native library config ###

goldengate.userexit.nochkpt=TRUE

goldengate.userexit.timestamp=utc

goldengate.log.logname=cuserexit

goldengate.log.level=DEBUG

goldengate.log.tofile=true

goldengate.userexit.writers=javawriter

goldengate.log.level.JAVAUSEREXIT=DEBUG

#gg.brokentrail=true

gg.report.time=30sec

gg.classpath=/data/ogg_for_bigdata/dirprm/flumejar/*:/data/apache-flume-1.6.0-bin/lib/*

javawriter.stats.full=TRUE

javawriter.stats.display=TRUE

javawriter.bootoptions=-Xmx81920m -Xms20480m -Djava.class.path=/data/ogg_for_bigdata/ggjava/ggjava.jar -Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties

到此,相信大家對(duì)“如何使用ogg將Oracle數(shù)據(jù)傳輸?shù)絝lume刷到kafka”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI