您好,登錄后才能下訂單哦!
這篇文章主要介紹“SpringBoot怎么整合dataworks”的相關(guān)知識(shí),小編通過(guò)實(shí)際案例向大家展示操作過(guò)程,操作方法簡(jiǎn)單快捷,實(shí)用性強(qiáng),希望這篇“SpringBoot怎么整合dataworks”文章能幫助大家解決問(wèn)題。
這里測(cè)試主要是調(diào)用拉取dataworks上拉取的腳本,并存儲(chǔ)到本地。
腳本包含兩部分
1、開(kāi)發(fā)的odps腳本(通過(guò)OpenApi獲取)2、建表語(yǔ)句腳本(通過(guò)dataworks信息去連接maxCompute獲取建立語(yǔ)句)
阿里云Dataworks的openApi分頁(yè)查詢(xún)限制,一次最多查詢(xún)100條。我們拉取腳本需要分多頁(yè)查詢(xún)
該項(xiàng)目使用到了MaxCompute的SDK/JDBC方式連接,SpringBoot操作MaxCompute SDK/JDBC連接
實(shí)現(xiàn)主要是編寫(xiě)工具類(lèi),如果需要?jiǎng)t可以配置成SpringBean,注入容器即可使用
<properties> <java.version>1.8</java.version> <!--maxCompute-sdk-版本號(hào)--> <max-compute-sdk.version>0.40.8-public</max-compute-sdk.version> <!--maxCompute-jdbc-版本號(hào)--> <max-compute-jdbc.version>3.0.1</max-compute-jdbc.version> <!--dataworks版本號(hào)--> <dataworks-sdk.version>3.4.2</dataworks-sdk.version> <aliyun-java-sdk.version>4.5.20</aliyun-java-sdk.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!--max compute sdk--> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-sdk-core</artifactId> <version>${max-compute-sdk.version}</version> </dependency> <!--max compute jdbc--> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-jdbc</artifactId> <version>${max-compute-jdbc.version}</version> <classifier>jar-with-dependencies</classifier> </dependency> <!--dataworks需要引入aliyun-sdk和dataworks本身--> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>${aliyun-java-sdk.version}</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-dataworks-public</artifactId> <version>${dataworks-sdk.version}</version> </dependency> </dependencies>
/** * @Description * @Author itdl * @Date 2022/08/09 15:12 */ @Data public class DataWorksOpenApiConnParam { /** * 區(qū)域 eg. cn-shanghai */ private String region; /** * 訪(fǎng)問(wèn)keyId */ private String aliyunAccessId; /** * 密鑰 */ private String aliyunAccessKey; /** * 訪(fǎng)問(wèn)端點(diǎn) 就是API的URL前綴 */ private String endPoint; /** * 數(shù)據(jù)庫(kù)類(lèi)型 如odps */ private String datasourceType; /** * 所屬項(xiàng)目 */ private String project; /** * 項(xiàng)目環(huán)境 dev prod */ private String projectEnv; }
基礎(chǔ)類(lèi)準(zhǔn)備,拉取腳本之后的回調(diào)函數(shù)
為什么需要回調(diào)函數(shù),因?yàn)槔〉氖撬心_本,如果合并每次分頁(yè)結(jié)果的話(huà),會(huì)導(dǎo)致內(nèi)存溢出,而使用回調(diào)函數(shù)只是每次循環(huán)增加處理函數(shù)
/** * @Description * @Author itdl * @Date 2022/08/09 15:12 */ @Data public class DataWorksOpenApiConnParam { /** * 區(qū)域 eg. cn-shanghai */ private String region; /** * 訪(fǎng)問(wèn)keyId */ private String aliyunAccessId; /** * 密鑰 */ private String aliyunAccessKey; /** * 訪(fǎng)問(wèn)端點(diǎn) 就是API的URL前綴 */ private String endPoint; /** * 數(shù)據(jù)庫(kù)類(lèi)型 如odps */ private String datasourceType; /** * 所屬項(xiàng)目 */ private String project; /** * 項(xiàng)目環(huán)境 dev prod */ private String projectEnv; }
主要是實(shí)例化dataworks openApi接口的客戶(hù)端信息,maxCompute連接的工具類(lèi)初始化(包括JDBC,SDK方式)
private static final String MAX_COMPUTE_JDBC_URL_FORMAT = "http://service.%s.maxcompute.aliyun.com/api"; /**默認(rèn)的odps接口地址 在Odps中也可以看到該變量*/ private static final String defaultEndpoint = "http://service.odps.aliyun.com/api"; /** * dataworks連接參數(shù) * */ private final DataWorksOpenApiConnParam connParam; /** * 可以使用dataworks去連接maxCompute 如果連接的引擎是maxCompute的話(huà) */ private final MaxComputeJdbcUtil maxComputeJdbcUtil; private final MaxComputeSdkUtil maxComputeSdkUtil; private final boolean odpsSdk; /** * 客戶(hù)端 */ private final IAcsClient client; public DataWorksOpenApiUtil(DataWorksOpenApiConnParam connParam, boolean odpsSdk) { this.connParam = connParam; this.client = buildClient(); this.odpsSdk = odpsSdk; if (odpsSdk){ this.maxComputeJdbcUtil = null; this.maxComputeSdkUtil = buildMaxComputeSdkUtil(); }else { this.maxComputeJdbcUtil = buildMaxComputeJdbcUtil(); this.maxComputeSdkUtil = null; } } private MaxComputeSdkUtil buildMaxComputeSdkUtil() { final MaxComputeSdkConnParam param = new MaxComputeSdkConnParam(); // 設(shè)置賬號(hào)密碼 param.setAliyunAccessId(connParam.getAliyunAccessId()); param.setAliyunAccessKey(connParam.getAliyunAccessKey()); // 設(shè)置endpoint param.setMaxComputeEndpoint(defaultEndpoint); // 目前只處理odps的引擎 final String datasourceType = connParam.getDatasourceType(); if (!"odps".equals(datasourceType)){ throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR); } // 獲取項(xiàng)目環(huán)境,根據(jù)項(xiàng)目環(huán)境連接不同的maxCompute final String projectEnv = connParam.getProjectEnv(); if ("dev".equals(projectEnv)){ // 開(kāi)發(fā)環(huán)境dataworks + _dev就是maxCompute的項(xiàng)目名 param.setProjectName(String.join("_", connParam.getProject(), projectEnv)); }else { // 生產(chǎn)環(huán)境dataworks的項(xiàng)目名和maxCompute一致 param.setProjectName(connParam.getProject()); } return new MaxComputeSdkUtil(param); } private MaxComputeJdbcUtil buildMaxComputeJdbcUtil() { final MaxComputeJdbcConnParam param = new MaxComputeJdbcConnParam(); // 設(shè)置賬號(hào)密碼 param.setAliyunAccessId(connParam.getAliyunAccessId()); param.setAliyunAccessKey(connParam.getAliyunAccessKey()); // 設(shè)置endpoint param.setEndpoint(String.format(MAX_COMPUTE_JDBC_URL_FORMAT, connParam.getRegion())); // 目前只處理odps的引擎 final String datasourceType = connParam.getDatasourceType(); if (!"odps".equals(datasourceType)){ throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR); } // 獲取項(xiàng)目環(huán)境,根據(jù)項(xiàng)目環(huán)境連接不同的maxCompute final String projectEnv = connParam.getProjectEnv(); if ("dev".equals(projectEnv)){ // 開(kāi)發(fā)環(huán)境dataworks + _dev就是maxCompute的項(xiàng)目名 param.setProjectName(String.join("_", connParam.getProject(), projectEnv)); }else { // 生產(chǎn)環(huán)境dataworks的項(xiàng)目名和maxCompute一致 param.setProjectName(connParam.getProject()); } return new MaxComputeJdbcUtil(param); }
調(diào)用OpenApi拉取所有腳本
/** * 根據(jù)文件夾路徑分頁(yè)查詢(xún)?cè)撀窂较碌奈募_本) * @param pageSize 每頁(yè)查詢(xún)多少數(shù)據(jù) * @param folderPath 文件所在目錄 * @param userType 文件所屬功能模塊 可不傳 * @param fileTypes 設(shè)置文件代碼類(lèi)型 逗號(hào)分割 可不傳 */ public void listAllFiles(Integer pageSize, String folderPath, String userType, String fileTypes, CallBack.FileCallBack callBack) throws ClientException { pageSize = setPageSize(pageSize); // 創(chuàng)建請(qǐng)求 final ListFilesRequest request = new ListFilesRequest(); // 設(shè)置分頁(yè)參數(shù) request.setPageNumber(1); request.setPageSize(pageSize); // 設(shè)置上級(jí)文件夾 request.setFileFolderPath(folderPath); // 設(shè)置區(qū)域和項(xiàng)目名稱(chēng) request.setSysRegionId(connParam.getRegion()); request.setProjectIdentifier(connParam.getProject()); // 設(shè)置文件所屬功能模塊 if (!ObjectUtils.isEmpty(userType)){ request.setUseType(userType); } // 設(shè)置文件代碼類(lèi)型 if (!ObjectUtils.isEmpty(fileTypes)){ request.setFileTypes(fileTypes); } // 發(fā)起請(qǐng)求 ListFilesResponse res = client.getAcsResponse(request); // 獲取分頁(yè)總數(shù) final Integer totalCount = res.getData().getTotalCount(); // 返回結(jié)果 final List<ListFilesResponse.Data.File> resultList = res.getData().getFiles(); // 計(jì)算能分幾頁(yè) long pages = totalCount % pageSize == 0 ? (totalCount / pageSize) : (totalCount / pageSize) + 1; // 只有1頁(yè) 直接返回 if (pages <= 1){ callBack.handle(resultList); return; } // 第一頁(yè)執(zhí)行回調(diào) callBack.handle(resultList); // 分頁(yè)數(shù)據(jù) 從第二頁(yè)開(kāi)始查詢(xún) 同步拉取,可以?xún)?yōu)化為多線(xiàn)程拉取 for (int i = 2; i <= pages; i++) { //第1頁(yè) request.setPageNumber(i); //每頁(yè)大小 request.setPageSize(pageSize); // 發(fā)起請(qǐng)求 res = client.getAcsResponse(request); final List<ListFilesResponse.Data.File> tableEntityList = res.getData().getFiles(); if (!ObjectUtils.isEmpty(tableEntityList)){ // 執(zhí)行回調(diào)函數(shù) callBack.handle(tableEntityList); } } }
內(nèi)部連接MaxCompute拉取所有DDL腳本內(nèi)容
DataWorks工具類(lèi)代碼,通過(guò)回調(diào)函數(shù)處理
/** * 獲取所有的DDL腳本 * @param callBack 回調(diào)處理函數(shù) */ public void listAllDdl(CallBack.DdlCallBack callBack){ if (odpsSdk){ final List<TableMetaInfo> tableInfos = maxComputeSdkUtil.getTableInfos(); for (TableMetaInfo tableInfo : tableInfos) { final String tableName = tableInfo.getTableName(); final String sqlCreateDesc = maxComputeSdkUtil.getSqlCreateDesc(tableName); callBack.handle(tableName, sqlCreateDesc); } } }
MaxCompute工具類(lèi)代碼,根據(jù)表名獲取建表語(yǔ)句, 以SDK為例, JDBC直接執(zhí)行show create table即可拿到建表語(yǔ)句
/** * 根據(jù)表名獲取建表語(yǔ)句 * @param tableName 表名 * @return */ public String getSqlCreateDesc(String tableName) { final Table table = odps.tables().get(tableName); // 建表語(yǔ)句 StringBuilder mssqlDDL = new StringBuilder(); // 獲取表結(jié)構(gòu) TableSchema tableSchema = table.getSchema(); // 獲取表名表注釋 String tableComment = table.getComment(); //獲取列名列注釋 List<Column> columns = tableSchema.getColumns(); /*組裝成mssql的DDL*/ // 表名 mssqlDDL.append("CREATE TABLE IF NOT EXISTS "); mssqlDDL.append(tableName).append("\n"); mssqlDDL.append(" (\n"); //列字段 int index = 1; for (Column column : columns) { mssqlDDL.append(" ").append(column.getName()).append("\t\t").append(column.getTypeInfo().getTypeName()); if (!ObjectUtils.isEmpty(column.getComment())) { mssqlDDL.append(" COMMENT '").append(column.getComment()).append("'"); } if (index == columns.size()) { mssqlDDL.append("\n"); } else { mssqlDDL.append(",\n"); } index++; } mssqlDDL.append(" )\n"); //獲取分區(qū) List<Column> partitionColumns = tableSchema.getPartitionColumns(); int partitionIndex = 1; if (!ObjectUtils.isEmpty(partitionColumns)) { mssqlDDL.append("PARTITIONED BY ("); } for (Column partitionColumn : partitionColumns) { final String format = String.format("%s %s COMMENT '%s'", partitionColumn.getName(), partitionColumn.getTypeInfo().getTypeName(), partitionColumn.getComment()); mssqlDDL.append(format); if (partitionIndex == partitionColumns.size()) { mssqlDDL.append("\n"); } else { mssqlDDL.append(",\n"); } partitionIndex++; } if (!ObjectUtils.isEmpty(partitionColumns)) { mssqlDDL.append(")\n"); } // mssqlDDL.append("STORED AS ALIORC \n"); // mssqlDDL.append("TBLPROPERTIES ('comment'='").append(tableComment).append("');"); mssqlDDL.append(";"); return mssqlDDL.toString(); }
public static void main(String[] args) throws ClientException { final DataWorksOpenApiConnParam connParam = new DataWorksOpenApiConnParam(); connParam.setAliyunAccessId("您的阿里云賬號(hào)accessId"); connParam.setAliyunAccessKey("您的阿里云賬號(hào)accessKey"); // dataworks所在區(qū)域 connParam.setRegion("cn-chengdu"); // dataworks所屬項(xiàng)目 connParam.setProject("dataworks所屬項(xiàng)目"); // dataworks所屬項(xiàng)目環(huán)境 如果不分環(huán)境的話(huà)設(shè)置為生產(chǎn)即可 connParam.setProjectEnv("dev"); // 數(shù)據(jù)引擎類(lèi)型 odps connParam.setDatasourceType("odps"); // ddataworks接口地址 connParam.setEndPoint("dataworks.cn-chengdu.aliyuncs.com"); final DataWorksOpenApiUtil dataWorksOpenApiUtil = new DataWorksOpenApiUtil(connParam, true); // 拉取所有ODPS腳本 dataWorksOpenApiUtil.listAllFiles(100, "", "", "10", files -> { // 處理文件 for (ListFilesResponse.Data.File file : files) { final String fileName = file.getFileName(); System.out.println(fileName); } }); // 拉取所有表的建表語(yǔ)句 dataWorksOpenApiUtil.listAllDdl((tableName, tableDdlContent) -> { System.out.println("======================================="); System.out.println("表名:" + tableName + "內(nèi)容如下:\n"); System.out.println(tableDdlContent); System.out.println("======================================="); }); }
test_001腳本
test_002腳本
test_003腳本
test_004腳本
test_005腳本
=======================================
表名:test_abc_info內(nèi)容如下:CREATE TABLE IF NOT EXISTS test_abc_info
(
test_abc1 STRING COMMENT '字段1',
test_abc2 STRING COMMENT '字段2',
test_abc3 STRING COMMENT '字段3',
test_abc4 STRING COMMENT '字段4',
test_abc5 STRING COMMENT '字段5',
test_abc6 STRING COMMENT '字段6',
test_abc7 STRING COMMENT '字段7',
test_abc8 STRING COMMENT '字段8'
)
PARTITIONED BY (p_date STRING COMMENT '數(shù)據(jù)日期'
)
;
=======================================
Disconnected from the target VM, address: '127.0.0.1:59509', transport: 'socket'
關(guān)于“SpringBoot怎么整合dataworks”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí),可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會(huì)為大家更新不同的知識(shí)點(diǎn)。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。