溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么使用DataX同步MaxCompute數(shù)據(jù)到TableStore

發(fā)布時間:2021-11-03 16:48:26 來源:億速云 閱讀:146 作者:柒染 欄目:建站服務(wù)器

這篇文章給大家介紹怎么使用DataX同步MaxCompute數(shù)據(jù)到TableStore,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

概述

現(xiàn)在越來越多的技術(shù)架構(gòu)下會組合使用MaxCompute和TableStore,用MaxCompute作大數(shù)據(jù)分析,計(jì)算的結(jié)果會導(dǎo)出到TableStore提供在線訪問。MaxCompute提供海量數(shù)據(jù)計(jì)算的能力,而TableStore提供海量數(shù)據(jù)高并發(fā)低延遲讀寫的能力。

將  MaxCompute 內(nèi)數(shù)據(jù)導(dǎo)出至TableStore,目前可選的幾種主要途徑包括:

自己編寫工具:使用MaxCompute SDK通過Tunnel讀取表數(shù)據(jù),再通過TableStore SDK再寫入數(shù)據(jù)。
DataX :自己在服務(wù)器上托管執(zhí)行DataX任務(wù)。
使用數(shù)據(jù)集成服務(wù):其系統(tǒng)底層也是DataX,額外提供了服務(wù)化以及分布式的能力。
其中第二種是我們最常推薦給用戶做臨時的數(shù)據(jù)導(dǎo)出使用的,如果沒有需要對數(shù)據(jù)做特殊處理的需求,我們一般不推薦第一種途徑。

DataX在阿里集團(tuán)內(nèi)部已經(jīng)應(yīng)用了很多年,經(jīng)歷了多次雙十一的考驗(yàn),是一個穩(wěn)定、易用、高效的工具。隨著MaxCompute上結(jié)果數(shù)據(jù)越來越龐大,數(shù)據(jù)導(dǎo)出的速率越來越被看重,海量的數(shù)據(jù)需要在基線內(nèi)完成導(dǎo)出。本篇文章,主要會介紹幾種優(yōu)化手段,以提高使用DataX來進(jìn)行MaxCompute向TableStore數(shù)據(jù)導(dǎo)出的吞吐量。

優(yōu)化過程

我們會以實(shí)際的場景,來演示如何通過一步步的優(yōu)化,提升數(shù)據(jù)導(dǎo)出的速度。在數(shù)據(jù)導(dǎo)出的整個鏈路上,主要有三個環(huán)節(jié),一是MaxCompute數(shù)據(jù)通道的讀,二是DataX的數(shù)據(jù)交換,三是TableStore的在線寫,這三個環(huán)節(jié)任意一個成為瓶頸,都會影響導(dǎo)出的速度。

MaxCompute數(shù)據(jù)通道的讀的性能比較高,一般不會成為瓶頸,本文主要是針對后兩個環(huán)節(jié)來優(yōu)化。優(yōu)化的核心指導(dǎo)方針就是:1. 提高并發(fā),2. 降低寫入延遲。接下來列舉的幾種優(yōu)化手段,也是圍繞這兩點(diǎn),來不斷進(jìn)行優(yōu)化。

實(shí)驗(yàn)選擇使用TableStore的測試環(huán)境,在MaxCompute上,我們會創(chuàng)建一張表并準(zhǔn)備1億行數(shù)據(jù)。TableStore的測試環(huán)境規(guī)模以及DataX Job宿主機(jī)的規(guī)格都較小,所以整個實(shí)驗(yàn)最終達(dá)到的速率是比較小的,主要為了演示速率如何提升。而在真實(shí)的TableStore生產(chǎn)環(huán)境上,規(guī)模足夠的情況下,我們幫助過應(yīng)用優(yōu)化到每秒上百M(fèi)甚至上G的速度,優(yōu)化手段相同。

數(shù)據(jù)準(zhǔn)備
首先在MaxCompute內(nèi)創(chuàng)建如下表:

md5 string,
  userid string,
  name string,
  comments string,
  attr0 string,
  attr1 string,
  attr2 string,
  attr3 string,
  create_time string,
  udpate_time string
);

其次在表內(nèi)倒入1億行數(shù)據(jù),每行數(shù)據(jù)約200個字節(jié),其中userid列采用隨機(jī)值,計(jì)算出的md5值取4個字節(jié)作為md5列,數(shù)據(jù)樣例如下:

怎么使用DataX同步MaxCompute數(shù)據(jù)到TableStore

測試數(shù)據(jù)導(dǎo)入使用的是MaxCompute Tunnel,速度還是比較可觀的。

數(shù)據(jù)準(zhǔn)備完畢后,在TableStore上創(chuàng)建一張表,使用md5和userid作為主鍵列:

TableMeta tableMeta = new TableMeta("DataTable");
  tableMeta.addPrimaryKeyColumn("md5", PrimaryKeyType.STRING);
  tableMeta.addPrimaryKeyColumn("userid", PrimaryKeyType.STRING);
  CapacityUnit capacityUnit = new CapacityUnit(0, 0);
  CreateTableRequest request = new CreateTableRequest();
  request.setTableMeta(tableMeta);
  request.setReservedThroughput(capacityUnit);
  ots.createTable(request);

表和數(shù)據(jù)均準(zhǔn)備完畢后,使用如下DataX Job配置類進(jìn)行一次數(shù)據(jù)導(dǎo)出:

"job": {
        "setting": {
            "speed": {
                "channel": "1"
            }
        },
        "content": [
            {
                "reader": {
                    "name": "odpsreader",
                    "parameter": {
                        "accessId": "accessid",
                        "accessKey": "accesskey",
                        "project": "aliyun_ots_dev",
                        "table": "data_for_ots",
                        "partition": [],
                        "column": ["md5","userid","name","comments","attr0","attr1","attr2","attr3","create_time","udpate_time"],
                        "packageAuthorizedProject": "",
                        "splitMode": "record",
                        "odpsServer": "****",
                        "tunnelServer": "****"
                    }
                },
                "writer": {
                    "name": "otswriter",
                    "parameter": {
                        "endpoint":"http://data-import-test.cn-hangzhou.ots.aliyuncs.com",
                        "accessId":"accessid",
                        "accessKey":"accesskey",
                        "instanceName":"data-import-test",
                        "table":"DataTable",
                        "primaryKey":[
                            {"name":"md5", "type":"string"},
                            {"name":"userid", "type":"string"}
                        ],
                        "column":[
                            {"name":"name","type":"string"},
                            {"name":"comments","type":"string"},
                            {"name":"attr0","type":"string"},
                            {"name":"attr1","type":"string"},
                            {"name":"attr2","type":"string"},
                            {"name":"attr3","type":"string"},
                            {"name":"create_time","type":"string"},
                            {"name":"update_time","type":"string"}
                        ],
                        "writeMode":"UpdateRow"
                    }
                }
            }
        ]
    }
}

啟動DataX任務(wù),從標(biāo)準(zhǔn)輸出中可以看到當(dāng)前數(shù)據(jù)導(dǎo)出的速度:

2017-02-07 08:41:49.285 [job-0] INFO  StandAloneJobContainerCommunicator - Total 271520 records, 55194052 bytes | Speed 1.05MB/s, 5404 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 4.501s |  All Task WaitReaderTime 47.815s | Percentage 0.00%
2017-02-07 08:41:59.286 [job-0] INFO  StandAloneJobContainerCommunicator - Total 324640 records, 65992457 bytes | Speed 1.03MB/s, 5312 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 5.474s |  All Task WaitReaderTime 55.068s | Percentage 0.00%
2017-02-07 08:42:09.288 [job-0] INFO  StandAloneJobContainerCommunicator - Total 377600 records, 76758462 bytes | Speed 1.03MB/s, 5296 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 6.479s |  All Task WaitReaderTime 62.297s | Percentage 0.00%
2017-02-07 08:42:19.289 [job-0] INFO  StandAloneJobContainerCommunicator - Total 431072 records, 87628377 bytes | Speed 1.04MB/s, 5347 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 7.469s |  All Task WaitReaderTime 69.559s | Percentage 0.00%
2017-02-07 08:42:29.290 [job-0] INFO  StandAloneJobContainerCommunicator - Total 484672 records, 98524462 bytes | Speed 1.04MB/s, 5360 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 8.421s |  All Task WaitReaderTime 76.892s | Percentage 0.00%
2017-02-07 08:42:39.292 [job-0] INFO  StandAloneJobContainerCommunicator - Total 538144 records, 109394175 bytes | Speed 1.04MB/s, 5347 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 9.428s |  All Task WaitReaderTime 83.889s | Percentage 0.00%

可以看到,當(dāng)前的速度大約是1MB/s,接下來會演示如何進(jìn)行優(yōu)化,一步一步將速度給提升上去。

一:配置合理的DataX基礎(chǔ)參數(shù)
第一步是對DataX的幾個基礎(chǔ)參數(shù)進(jìn)行調(diào)優(yōu),先大致了解下一個DataX Job內(nèi)部,任務(wù)的運(yùn)行結(jié)構(gòu):

怎么使用DataX同步MaxCompute數(shù)據(jù)到TableStore

一個DataX Job會切分成多個Task,每個Task會按TaskGroup進(jìn)行分組,一個Task內(nèi)部會有一組Reader->Channel->Writer。Channel是連接Reader和Writer的數(shù)據(jù)交換通道,所有的數(shù)據(jù)都會經(jīng)由Channel進(jìn)行傳輸。

在DataX內(nèi)部對每個Channel會有嚴(yán)格的速度控制,默認(rèn)的速度限制是1MB/s,這也是為何我們使用默認(rèn)配置,速度為1MB/s的原因。所以第一個需要優(yōu)化的基礎(chǔ)參數(shù)就是單個Channel的速度限制,更改配置如下:

"core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 5242880
                }
            }
        }
    },
    "job": {
        ...
    }
}

我們把單個Channel的速度上限配置為5MB。這個值需要針對不同的場景進(jìn)行不同的配置,例如對于MaxCompute,單個Channel的速度可以達(dá)到幾十MB,對于TableStore,在列較小較多的場景下,單個Channel的速度是幾MB,而在列較大的場景下,可能速度就會上到幾十MB。

我們當(dāng)前默認(rèn)配置中配置啟動的Job內(nèi)Channel數(shù)為1,要提高速度,并發(fā)必須提高,這個是第二步要做的優(yōu)化。但是在做第二個優(yōu)化之前,還需要調(diào)整一個基礎(chǔ)參數(shù),那就是DataX Job啟動的JVM的內(nèi)存大小配置。

目前DataX啟動的JVM默認(rèn)的配置是"-Xms1g -Xmx1g",當(dāng)一個Job內(nèi)Channel數(shù)變多后,內(nèi)存的占用會顯著增加,因?yàn)镈ataX作為數(shù)據(jù)交換通道,在內(nèi)存中會緩存較多的數(shù)據(jù),例如Channel中會有一個Buffer,作為臨時的數(shù)據(jù)交換的緩沖區(qū),而在部分Reader和Writer的中,也會存在一些Buffer。

調(diào)整JVM參數(shù)的方式有兩種,一種是直接更改datax.py,另一種是在啟動的時候,加上對應(yīng)的參數(shù),如下:

python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" ots.json

通常我們建議將內(nèi)存設(shè)置為4G或者8G,這個也可以根據(jù)實(shí)際情況來調(diào)整。

在優(yōu)化完單Channel的限速和JVM的內(nèi)存參數(shù)之后,我們重新跑一下任務(wù):

2017-02-07 08:44:53.188 [job-0] INFO  StandAloneJobContainerCommunicator - Total 153920 records, 31289079 bytes | Speed 1.67MB/s, 8608 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 2.873s |  All Task WaitReaderTime 12.098s | Percentage 0.00%
2017-02-07 08:45:03.189 [job-0] INFO  StandAloneJobContainerCommunicator - Total 256064 records, 52051995 bytes | Speed 1.98MB/s, 10214 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 4.892s |  All Task WaitReaderTime 17.194s | Percentage 0.00%
2017-02-07 08:45:13.191 [job-0] INFO  StandAloneJobContainerCommunicator - Total 360864 records, 73356370 bytes | Speed 2.03MB/s, 10480 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 9.221s |  All Task WaitReaderTime 19.192s | Percentage 0.00%
2017-02-07 08:45:23.192 [job-0] INFO  StandAloneJobContainerCommunicator - Total 464384 records, 94400221 bytes | Speed 2.01MB/s, 10352 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 11.754s |  All Task WaitReaderTime 22.278s | Percentage 0.00%
2017-02-07 08:45:33.194 [job-0] INFO  StandAloneJobContainerCommunicator - Total 570176 records, 115905214 bytes | Speed 2.05MB/s, 10579 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 14.827s |  All Task WaitReaderTime 25.367s | Percentage 0.00%
2017-02-07 08:45:43.195 [job-0] INFO  StandAloneJobContainerCommunicator - Total 675328 records, 137281049 bytes | Speed 2.04MB/s, 10515 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 18.515s |  All Task WaitReaderTime 27.810s | Percentage 0.00%
2017-02-07 08:45:53.197 [job-0] INFO  StandAloneJobContainerCommunicator - Total 778752 records, 158304152 bytes | Speed 2.00MB/s, 10342 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 20.403s |  All Task WaitReaderTime 32.418s | Percentage 0.00%

當(dāng)前數(shù)據(jù)導(dǎo)出的速度已經(jīng)從1MB提升到2MB。

二:提升DataX Job內(nèi)Channel并發(fā)
在上一點(diǎn)中指出,當(dāng)前Job內(nèi)部,只有單個Channel在執(zhí)行導(dǎo)出任務(wù),而要提升速率,要做的就是提升Channel的并發(fā)數(shù)。

DataX內(nèi)部對每個Channel會做限速,可以限制每秒byte數(shù),也可以限制每秒record數(shù)。除了對每個Channel限速,在全局還會有一個速度限制的配置,默認(rèn)是不限。

提升Channel并發(fā)數(shù)有三種途徑:

1, 配置全局Byte限速以及單Channel Byte限速,Channel個數(shù) = 全局Byte限速 / 單Channel Byte限速。(下面示例中最終Channel個數(shù)為10)

"core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 1048576
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "byte" : 10485760
            }
        },
        ...
    }
}

2,配置全局Record限速以及單Channel Record限速,Channel個數(shù) = 全局Record限速 / 單Channel Record限速。(下面示例中最終Channel個數(shù)為3)

"core": {
        "transport": {
            "channel": {
                "speed": {
                    "record": 100
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "record" : 300
            }
        },
        ...
    }
}

3, 全局不限速,直接配置Channel個數(shù)。(下面示例中最終Channel個數(shù)為5)

"core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 1048576
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "channel" : 5
            }
        },
        ...
    }
}

第三種方式最簡單直接,但是這樣就缺少了全局的限速。在選擇Channel個數(shù)時,同樣需要注意,Channel個數(shù)并不是越多越好。Channel個數(shù)的增加,帶來的是更多的CPU消耗以及內(nèi)存消耗。如果Channel并發(fā)配置過高導(dǎo)致JVM內(nèi)存不夠用,會出現(xiàn)的情況是發(fā)生頻繁的Full GC,導(dǎo)出速度會驟降,適得其反。

可以在DataX的輸出日志中,找到本次任務(wù)的Channel的數(shù):

2017-02-07 13:27:45.016 [job-0] INFO  JobContainer - DataX Reader.Job [odpsreader] splits to [15] tasks.
2017-02-07 13:27:45.017 [job-0] INFO  OtsWriterMasterProxy - Begin split and MandatoryNumber : 15
2017-02-07 13:27:45.025 [job-0] INFO  OtsWriterMasterProxy - End split.
2017-02-07 13:27:45.025 [job-0] INFO  JobContainer - DataX Writer.Job [otswriter] splits to [15] tasks.

在我們這次實(shí)驗(yàn)中,我們把Channel數(shù)直接配置為10,再進(jìn)行一次導(dǎo)出:

2017-02-07 08:58:24.366 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2465984 records, 501286700 bytes | Speed 9.19MB/s, 47414 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 362.875s |  All Task WaitReaderTime 378.978s | Percentage 0.00%
2017-02-07 08:58:34.368 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2941792 records, 598009404 bytes | Speed 9.22MB/s, 47580 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 459.910s |  All Task WaitReaderTime 379.002s | Percentage 0.00%
2017-02-07 08:58:44.369 [job-0] INFO  StandAloneJobContainerCommunicator - Total 3436064 records, 698484741 bytes | Speed 9.58MB/s, 49427 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 556.324s |  All Task WaitReaderTime 379.026s | Percentage 0.00%
2017-02-07 08:58:54.371 [job-0] INFO  StandAloneJobContainerCommunicator - Total 3905856 records, 793982836 bytes | Speed 9.11MB/s, 46979 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 652.749s |  All Task WaitReaderTime 379.050s | Percentage 0.00%
2017-02-07 08:59:04.372 [job-0] INFO  StandAloneJobContainerCommunicator - Total 4384512 records, 891284760 bytes | Speed 9.28MB/s, 47865 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 749.464s |  All Task WaitReaderTime 379.074s | Percentage 0.00%
2017-02-07 08:59:14.373 [job-0] INFO  StandAloneJobContainerCommunicator - Total 4875136 records, 991017582 bytes | Speed 9.51MB/s, 49062 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 846.522s |  All Task WaitReaderTime 379.098s | Percentage 0.00%

可以看到在Channel數(shù)從1提升到10之后,速度從2MB/s提升到了9MB/s。此時若再提高Channel數(shù)到15,速度已經(jīng)不見漲,而從服務(wù)端監(jiān)控看,每批次導(dǎo)入的寫入延遲確在漲,說明當(dāng)前瓶頸在TableStore寫入端。

三:對TableStore表進(jìn)行預(yù)分區(qū),并進(jìn)一步提升DataX Channel并發(fā)

在上面幾個優(yōu)化做完后,DataX數(shù)據(jù)交換這一環(huán)節(jié)已經(jīng)不是瓶頸,當(dāng)前瓶頸在TableStore端的寫入能力上。TableStore是分布式的存儲,一張大表會被切分成很多的分區(qū),分區(qū)會分散到后端的各個物理機(jī)上提供服務(wù)。一張新創(chuàng)建的表,默認(rèn)分區(qū)數(shù)為1,當(dāng)這張表越來越大,TableStore會將其分裂,此時分裂是自動完成的。分區(qū)的個數(shù),一定程度上與能提供的服務(wù)能力相關(guān)。某些業(yè)務(wù)場景,新建表后,就需要對表進(jìn)行大規(guī)模的數(shù)據(jù)導(dǎo)入,此時默認(rèn)的單個分區(qū)肯定是不夠用的,當(dāng)然可以等數(shù)據(jù)量慢慢漲上來后等表自動分裂,但是這個周期會比較長。此時,我們推薦的做法是在創(chuàng)建表的時候進(jìn)行預(yù)分區(qū)。

不過目前我們還沒有對外開放通過SDK來進(jìn)行預(yù)分區(qū)的功能,所以如果需要對表進(jìn)行預(yù)分區(qū),可以先通過工單來聯(lián)系我們幫助進(jìn)行預(yù)分區(qū)。

我們新建一張表,并將表預(yù)分4個分區(qū),partition key為md5列,采用md5列的主要原因是在其上數(shù)據(jù)的分區(qū)基本是均勻的。如果數(shù)據(jù)在partition key分布不均勻,則即使做了預(yù)分區(qū),導(dǎo)入性能也不會得到明顯的提升。以相同的Job配置,再跑一下導(dǎo)出任務(wù):

2017-02-08 13:48:18.692 [job-0] INFO  StandAloneJobContainerCommunicator - Total 11395424 records, 2316456451 bytes | Speed 18.79MB/s, 96940 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 666.003s |  All Task WaitReaderTime 336.048s | Percentage 0.00%
2017-02-08 13:48:28.693 [job-0] INFO  StandAloneJobContainerCommunicator - Total 12340192 records, 2508508780 bytes | Speed 18.32MB/s, 94476 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 716.743s |  All Task WaitReaderTime 349.424s | Percentage 0.00%
2017-02-08 13:48:38.694 [job-0] INFO  StandAloneJobContainerCommunicator - Total 13197472 records, 2682776109 bytes | Speed 16.62MB/s, 85728 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 776.487s |  All Task WaitReaderTime 359.132s | Percentage 0.00%
2017-02-08 13:48:48.695 [job-0] INFO  StandAloneJobContainerCommunicator - Total 14085856 records, 2863367678 bytes | Speed 17.22MB/s, 88838 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 826.191s |  All Task WaitReaderTime 378.034s | Percentage 0.00%
2017-02-08 13:48:58.696 [job-0] INFO  StandAloneJobContainerCommunicator - Total 15063328 records, 3062065378 bytes | Speed 18.95MB/s, 97747 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 867.363s |  All Task WaitReaderTime 401.640s | Percentage 0.00%
2017-02-08 13:49:08.697 [job-0] INFO  StandAloneJobContainerCommunicator - Total 15908736 records, 3233917750 bytes | Speed 16.39MB/s, 84540 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 921.193s |  All Task WaitReaderTime 418.862s | Percentage 0.00%

此時速度從9MB/s提升到18MB/s左右,在TableStore服務(wù)端能夠提高更多的服務(wù)能力后,我們嘗試再將Channel的并發(fā)從10提高到15:

2017-02-08 13:51:54.546 [job-0] INFO  StandAloneJobContainerCommunicator - Total 8194848 records, 1665844036 bytes | Speed 20.97MB/s, 108160 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 884.016s |  All Task WaitReaderTime 263.742s | Percentage 0.00%
2017-02-08 13:52:04.547 [job-0] INFO  StandAloneJobContainerCommunicator - Total 9351040 records, 1900875263 bytes | Speed 22.41MB/s, 115619 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 1,007.206s |  All Task WaitReaderTime 263.789s | Percentage 0.00%
2017-02-08 13:52:14.548 [job-0] INFO  StandAloneJobContainerCommunicator - Total 10460064 records, 2126318844 bytes | Speed 21.50MB/s, 110902 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 1,140.113s |  All Task WaitReaderTime 263.824s | Percentage 0.00%
2017-02-08 13:52:24.549 [job-0] INFO  StandAloneJobContainerCommunicator - Total 11662112 records, 2370669233 bytes | Speed 23.30MB/s, 120204 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 1,269.070s |  All Task WaitReaderTime 263.863s | Percentage 0.00%
2017-02-08 13:52:34.550 [job-0] INFO  StandAloneJobContainerCommunicator - Total 12874240 records, 2617069638 bytes | Speed 23.50MB/s, 121212 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 1,396.991s |  All Task WaitReaderTime 263.913s | Percentage 0.00%

此時速度又進(jìn)一步提升,從18MB/s提升到22MB/s左右。

四:提高每次批量寫行數(shù)

我們構(gòu)建的場景,每行大約是200字節(jié)左右大小。DataX的OTSWriter寫入插件底層是使用的TableStore SDK提供的BatchWrite接口進(jìn)行數(shù)據(jù)寫入,默認(rèn)一次請求寫入100行數(shù)據(jù),也就是說一次請求只會導(dǎo)入約20KB大小的數(shù)據(jù)。每次寫過來的數(shù)據(jù)包都比較小,非常的不經(jīng)濟(jì)。

當(dāng)前TableStore的BatchWrite的限制比較不靈活,會限制行數(shù)和數(shù)據(jù)大小,其中行數(shù)默認(rèn)上限是200行。在每行都比較小的場景下,200行一次批量寫入是非常不經(jīng)濟(jì)的,在我們的這次實(shí)驗(yàn)中,我們將上限改為1000行,并將DataX TableStore寫入插件內(nèi)部一次批量寫入的行數(shù)也改為1000行,來驗(yàn)證將每次寫入的包變大后,對寫入效率的提升。任務(wù)配置更改如下(配置項(xiàng)為job.content.writer.parameter.batchWriteCount):

"job": {
        "content": [
            {
                "reader": {
                    ...
                },
                "writer": {
                    "name": "otswriter",
                    "parameter": {
                        "batchWriteCount":1000,
                        ...
                    }
                }
            }
        ]
    }
}

再次執(zhí)行任務(wù),速度如下:

2017-02-08 13:55:16.924 [job-0] INFO  StandAloneJobContainerCommunicator - Total 11413216 records, 2320073926 bytes | Speed 29.44MB/s, 151849 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 72.662s |  All Task WaitReaderTime 1,030.787s | Percentage 0.00%
2017-02-08 13:55:36.925 [job-0] INFO  StandAloneJobContainerCommunicator - Total 14462240 records, 2939879188 bytes | Speed 29.55MB/s, 152451 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 85.228s |  All Task WaitReaderTime 1,297.655s | Percentage 0.00%
2017-02-08 13:55:46.927 [job-0] INFO  StandAloneJobContainerCommunicator - Total 15979552 records, 3248317815 bytes | Speed 29.41MB/s, 151731 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 89.841s |  All Task WaitReaderTime 1,432.022s | Percentage 0.00%
2017-02-08 13:55:56.928 [job-0] INFO  StandAloneJobContainerCommunicator - Total 17488864 records, 3555129299 bytes | Speed 29.26MB/s, 150931 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 100.300s |  All Task WaitReaderTime 1,558.120s | Percentage 0.00%
2017-02-08 13:56:06.929 [job-0] INFO  StandAloneJobContainerCommunicator - Total 19018240 records, 3866017412 bytes | Speed 29.65MB/s, 152937 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 106.391s |  All Task WaitReaderTime 1,691.072s | Percentage 0.00%

速度再次提升,從22MB/s提升到29MB/s。TableStore后續(xù)會優(yōu)化對BatchWrite的行數(shù)限制,對于行比較小的場景采用一個比較友好的策略。

五:MaxCompute表分區(qū),提高DataX Job并發(fā)
以上優(yōu)化策略都是在單個DataX Job的場景下進(jìn)行的優(yōu)化,單個DataX Job只能夠運(yùn)行在單臺服務(wù)器上,沒有辦法分布式的執(zhí)行。D2上的托管服務(wù)器,一般是千兆網(wǎng)卡,也就是說最多提供100MB/s的速度。若想要進(jìn)一步的速度提升,則必須采用多個DataX Job分布在多臺服務(wù)器上執(zhí)行才行。

DataX內(nèi)的ODPSReader,可以通過配置一次導(dǎo)出整張表或者表的某個Partition。我們可以利用Partition,來將一張表拆分成多個Job分散導(dǎo)出,但是要求表必須是多分區(qū)的。
在我們的實(shí)驗(yàn)中,創(chuàng)建的MaxCompute表并不是多分區(qū)的,我們重新創(chuàng)建一張多分區(qū)的表:

md5 string,
    userid string,
    name string,
    comments string,
    attr0 string,
    attr1 string,
    attr2 string,
    attr3 string,
    create_time string,
    udpate_time string
)
PARTITIONED BY (
    partid string
)

增加一列為partid,作為分區(qū),我們通過一個SQL將原表的數(shù)據(jù)導(dǎo)入到新表,并自動均勻的分散到partid:

attr0, attr1, attr2, attr3, create_time, udpate_time, SUBSTR(md5, 1, 1) from data_for_ots;

以上SQL會將partid的值取自md5列的第一個字符,md5是一個十六進(jìn)制的值,字符的取值范圍是:0-f,這樣我們就將原表切成了一個帶16個分區(qū)的表。我們希望在每個分區(qū)內(nèi),數(shù)據(jù)都是均勻的,為了避免長尾,這也是為什么要設(shè)計(jì)一個md5列的原因。

在將一張表拆成多個分區(qū)后,我們就可以選擇在不同的服務(wù)器上,為每個分區(qū)啟動一個任務(wù),配置如下(job.content.reader.parameter.partition):

"job": {
          "content": [
              {
                  "reader": {
                      "name": "odpsreader",
                      "parameter": {
                          ...
                          "partition": ["partid=0"],
                          ...
                      }
                  },
                  "writer": {
                      ...
                  }
              }
          ]
      }
  }

由于測試集群規(guī)模的原因,我們不演示多個Job并發(fā)后的速度提升。在TableStore服務(wù)端能力不是瓶頸的情況下,通過擴(kuò)展DataX Job的并發(fā),速度是能線性提升的。

總結(jié)下上面的幾個優(yōu)化點(diǎn):

對DataX的幾個基本參數(shù)進(jìn)行調(diào)整,包括:Channel數(shù)、單個Channel的限速以及JVM的內(nèi)存參數(shù)。
創(chuàng)建TableStore表的時候盡量采取預(yù)分區(qū),在設(shè)計(jì)partition key的時候盡量保證在每個partition key上導(dǎo)入數(shù)據(jù)的分布均勻。
如果導(dǎo)入TableStore的數(shù)據(jù)行都比較小,則需要考慮提高單批次的導(dǎo)入行數(shù)。
若單個DataX Job已成瓶頸,則需要考慮將任務(wù)拆成多個DataX Job并行執(zhí)行。

關(guān)于怎么使用DataX同步MaxCompute數(shù)據(jù)到TableStore就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI