溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

SpringBoot怎么整合dataworks

發(fā)布時(shí)間:2022-08-12 14:21:55 來(lái)源:億速云 閱讀:177 作者:iii 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要介紹“SpringBoot怎么整合dataworks”的相關(guān)知識(shí),小編通過(guò)實(shí)際案例向大家展示操作過(guò)程,操作方法簡(jiǎn)單快捷,實(shí)用性強(qiáng),希望這篇“SpringBoot怎么整合dataworks”文章能幫助大家解決問(wèn)題。

    注意事項(xiàng)

    這里測(cè)試主要是調(diào)用拉取dataworks上拉取的腳本,并存儲(chǔ)到本地。
    腳本包含兩部分

    1、開(kāi)發(fā)的odps腳本(通過(guò)OpenApi獲取)2、建表語(yǔ)句腳本(通過(guò)dataworks信息去連接maxCompute獲取建立語(yǔ)句)

    阿里云Dataworks的openApi分頁(yè)查詢(xún)限制,一次最多查詢(xún)100條。我們拉取腳本需要分多頁(yè)查詢(xún)

    該項(xiàng)目使用到了MaxCompute的SDK/JDBC方式連接,SpringBoot操作MaxCompute SDK/JDBC連接

    整合實(shí)現(xiàn)

    實(shí)現(xiàn)主要是編寫(xiě)工具類(lèi),如果需要?jiǎng)t可以配置成SpringBean,注入容器即可使用

    依賴(lài)引入

    <properties>
        <java.version>1.8</java.version>
        <!--maxCompute-sdk-版本號(hào)-->
        <max-compute-sdk.version>0.40.8-public</max-compute-sdk.version>
        <!--maxCompute-jdbc-版本號(hào)-->
        <max-compute-jdbc.version>3.0.1</max-compute-jdbc.version>
        <!--dataworks版本號(hào)-->
        <dataworks-sdk.version>3.4.2</dataworks-sdk.version>
        <aliyun-java-sdk.version>4.5.20</aliyun-java-sdk.version>
    </properties>
    <dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!--max compute sdk-->
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-sdk-core</artifactId>
        <version>${max-compute-sdk.version}</version>
    </dependency>
    <!--max compute jdbc-->
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-jdbc</artifactId>
        <version>${max-compute-jdbc.version}</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>
    <!--dataworks需要引入aliyun-sdk和dataworks本身-->
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>${aliyun-java-sdk.version}</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
        <version>${dataworks-sdk.version}</version>
    </dependency>
    </dependencies>

    請(qǐng)求參數(shù)類(lèi)編寫(xiě)

    /**
     * @Description
     * @Author itdl
     * @Date 2022/08/09 15:12
     */
    @Data
    public class DataWorksOpenApiConnParam {
        /**
         * 區(qū)域 eg. cn-shanghai
         */
        private String region;
    
        /**
         * 訪(fǎng)問(wèn)keyId
         */
        private String aliyunAccessId;
        /**
         * 密鑰
         */
        private String aliyunAccessKey;
    
        /**
         * 訪(fǎng)問(wèn)端點(diǎn)  就是API的URL前綴
         */
        private String endPoint;
    
        /**
         * 數(shù)據(jù)庫(kù)類(lèi)型 如odps
         */
        private String datasourceType;
    
        /**
         * 所屬項(xiàng)目
         */
        private String project;
    
        /**
         * 項(xiàng)目環(huán)境 dev  prod
         */
        private String projectEnv;
    }

    工具類(lèi)編寫(xiě)

    基礎(chǔ)類(lèi)準(zhǔn)備,拉取腳本之后的回調(diào)函數(shù)

    為什么需要回調(diào)函數(shù),因?yàn)槔〉氖撬心_本,如果合并每次分頁(yè)結(jié)果的話(huà),會(huì)導(dǎo)致內(nèi)存溢出,而使用回調(diào)函數(shù)只是每次循環(huán)增加處理函數(shù)

    /**
     * @Description
     * @Author itdl
     * @Date 2022/08/09 15:12
     */
    @Data
    public class DataWorksOpenApiConnParam {
        /**
         * 區(qū)域 eg. cn-shanghai
         */
        private String region;
    
        /**
         * 訪(fǎng)問(wèn)keyId
         */
        private String aliyunAccessId;
        /**
         * 密鑰
         */
        private String aliyunAccessKey;
    
        /**
         * 訪(fǎng)問(wèn)端點(diǎn)  就是API的URL前綴
         */
        private String endPoint;
    
        /**
         * 數(shù)據(jù)庫(kù)類(lèi)型 如odps
         */
        private String datasourceType;
    
        /**
         * 所屬項(xiàng)目
         */
        private String project;
    
        /**
         * 項(xiàng)目環(huán)境 dev  prod
         */
        private String projectEnv;
    }

    初始化操作

    主要是實(shí)例化dataworks openApi接口的客戶(hù)端信息,maxCompute連接的工具類(lèi)初始化(包括JDBC,SDK方式)

    private static final String MAX_COMPUTE_JDBC_URL_FORMAT = "http://service.%s.maxcompute.aliyun.com/api";
    /**默認(rèn)的odps接口地址 在Odps中也可以看到該變量*/
    private static final String defaultEndpoint = "http://service.odps.aliyun.com/api";
    /**
     * dataworks連接參數(shù)
     *
     */
    private final DataWorksOpenApiConnParam connParam;
    
    /**
     * 可以使用dataworks去連接maxCompute 如果連接的引擎是maxCompute的話(huà)
     */
    private final MaxComputeJdbcUtil maxComputeJdbcUtil;
    
    private final MaxComputeSdkUtil maxComputeSdkUtil;
    
    private final boolean odpsSdk;
    
    
    /**
     * 客戶(hù)端
     */
    private final IAcsClient client;
    
    public DataWorksOpenApiUtil(DataWorksOpenApiConnParam connParam, boolean odpsSdk) {
        this.connParam = connParam;
        this.client = buildClient();
        this.odpsSdk = odpsSdk;
        if (odpsSdk){
            this.maxComputeJdbcUtil = null;
            this.maxComputeSdkUtil = buildMaxComputeSdkUtil();
        }else {
            this.maxComputeJdbcUtil = buildMaxComputeJdbcUtil();
            this.maxComputeSdkUtil = null;
        }
    }
    
    private MaxComputeSdkUtil buildMaxComputeSdkUtil() {
        final MaxComputeSdkConnParam param = new MaxComputeSdkConnParam();
    
        // 設(shè)置賬號(hào)密碼
        param.setAliyunAccessId(connParam.getAliyunAccessId());
        param.setAliyunAccessKey(connParam.getAliyunAccessKey());
    
        // 設(shè)置endpoint
        param.setMaxComputeEndpoint(defaultEndpoint);
    
        // 目前只處理odps的引擎
        final String datasourceType = connParam.getDatasourceType();
        if (!"odps".equals(datasourceType)){
            throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
        }
    
        // 獲取項(xiàng)目環(huán)境,根據(jù)項(xiàng)目環(huán)境連接不同的maxCompute
        final String projectEnv = connParam.getProjectEnv();
    
        if ("dev".equals(projectEnv)){
            // 開(kāi)發(fā)環(huán)境dataworks + _dev就是maxCompute的項(xiàng)目名
            param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
        }else {
            // 生產(chǎn)環(huán)境dataworks的項(xiàng)目名和maxCompute一致
            param.setProjectName(connParam.getProject());
        }
    
        return new MaxComputeSdkUtil(param);
    }
    
    private MaxComputeJdbcUtil buildMaxComputeJdbcUtil() {
        final MaxComputeJdbcConnParam param = new MaxComputeJdbcConnParam();
    
        // 設(shè)置賬號(hào)密碼
        param.setAliyunAccessId(connParam.getAliyunAccessId());
        param.setAliyunAccessKey(connParam.getAliyunAccessKey());
    
        // 設(shè)置endpoint
        param.setEndpoint(String.format(MAX_COMPUTE_JDBC_URL_FORMAT, connParam.getRegion()));
    
        // 目前只處理odps的引擎
        final String datasourceType = connParam.getDatasourceType();
        if (!"odps".equals(datasourceType)){
            throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
        }
    
        // 獲取項(xiàng)目環(huán)境,根據(jù)項(xiàng)目環(huán)境連接不同的maxCompute
        final String projectEnv = connParam.getProjectEnv();
    
        if ("dev".equals(projectEnv)){
            // 開(kāi)發(fā)環(huán)境dataworks + _dev就是maxCompute的項(xiàng)目名
            param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
        }else {
            // 生產(chǎn)環(huán)境dataworks的項(xiàng)目名和maxCompute一致
            param.setProjectName(connParam.getProject());
        }
    
        return new MaxComputeJdbcUtil(param);
    }

    調(diào)用OpenApi拉取所有腳本

    /**
     * 根據(jù)文件夾路徑分頁(yè)查詢(xún)?cè)撀窂较碌奈募_本)
     * @param pageSize 每頁(yè)查詢(xún)多少數(shù)據(jù)
     * @param folderPath 文件所在目錄
     * @param userType 文件所屬功能模塊 可不傳
     * @param fileTypes 設(shè)置文件代碼類(lèi)型 逗號(hào)分割 可不傳
     */
    public void listAllFiles(Integer pageSize, String folderPath, String userType, String fileTypes, CallBack.FileCallBack callBack) throws ClientException {
        pageSize = setPageSize(pageSize);
        // 創(chuàng)建請(qǐng)求
        final ListFilesRequest request = new ListFilesRequest();
    
        // 設(shè)置分頁(yè)參數(shù)
        request.setPageNumber(1);
        request.setPageSize(pageSize);
    
        // 設(shè)置上級(jí)文件夾
        request.setFileFolderPath(folderPath);
    
        // 設(shè)置區(qū)域和項(xiàng)目名稱(chēng)
        request.setSysRegionId(connParam.getRegion());
        request.setProjectIdentifier(connParam.getProject());
    
        // 設(shè)置文件所屬功能模塊
        if (!ObjectUtils.isEmpty(userType)){
            request.setUseType(userType);
        }
        // 設(shè)置文件代碼類(lèi)型
        if (!ObjectUtils.isEmpty(fileTypes)){
            request.setFileTypes(fileTypes);
        }
    
        // 發(fā)起請(qǐng)求
        ListFilesResponse res = client.getAcsResponse(request);
    
        // 獲取分頁(yè)總數(shù)
        final Integer totalCount = res.getData().getTotalCount();
        // 返回結(jié)果
        final List<ListFilesResponse.Data.File> resultList = res.getData().getFiles();
        // 計(jì)算能分幾頁(yè)
        long pages = totalCount % pageSize == 0 ? (totalCount / pageSize) : (totalCount / pageSize) + 1;
        // 只有1頁(yè) 直接返回
        if (pages <= 1){
            callBack.handle(resultList);
            return;
        }
    
        // 第一頁(yè)執(zhí)行回調(diào)
        callBack.handle(resultList);
    
        // 分頁(yè)數(shù)據(jù) 從第二頁(yè)開(kāi)始查詢(xún) 同步拉取,可以?xún)?yōu)化為多線(xiàn)程拉取
        for (int i = 2; i <= pages; i++) {
            //第1頁(yè)
            request.setPageNumber(i);
            //每頁(yè)大小
            request.setPageSize(pageSize);
            // 發(fā)起請(qǐng)求
            res = client.getAcsResponse(request);
            final List<ListFilesResponse.Data.File> tableEntityList = res.getData().getFiles();
            if (!ObjectUtils.isEmpty(tableEntityList)){
                // 執(zhí)行回調(diào)函數(shù)
                callBack.handle(tableEntityList);
            }
        }
    }

    內(nèi)部連接MaxCompute拉取所有DDL腳本內(nèi)容

    DataWorks工具類(lèi)代碼,通過(guò)回調(diào)函數(shù)處理

        /**
         * 獲取所有的DDL腳本
         * @param callBack 回調(diào)處理函數(shù)
         */
        public void listAllDdl(CallBack.DdlCallBack callBack){
            if (odpsSdk){
                final List<TableMetaInfo> tableInfos = maxComputeSdkUtil.getTableInfos();
                for (TableMetaInfo tableInfo : tableInfos) {
                    final String tableName = tableInfo.getTableName();
                    final String sqlCreateDesc = maxComputeSdkUtil.getSqlCreateDesc(tableName);
                    callBack.handle(tableName, sqlCreateDesc);
                }
            }
        }

    MaxCompute工具類(lèi)代碼,根據(jù)表名獲取建表語(yǔ)句, 以SDK為例, JDBC直接執(zhí)行show create table即可拿到建表語(yǔ)句

    /**
     * 根據(jù)表名獲取建表語(yǔ)句
     * @param tableName 表名
     * @return
     */
    public String getSqlCreateDesc(String tableName) {
        final Table table = odps.tables().get(tableName);
        // 建表語(yǔ)句
        StringBuilder mssqlDDL = new StringBuilder();
    
        // 獲取表結(jié)構(gòu)
        TableSchema tableSchema = table.getSchema();
        // 獲取表名表注釋
        String tableComment = table.getComment();
    
        //獲取列名列注釋
        List<Column> columns = tableSchema.getColumns();
        /*組裝成mssql的DDL*/
        // 表名
        mssqlDDL.append("CREATE TABLE IF NOT EXISTS ");
        mssqlDDL.append(tableName).append("\n");
        mssqlDDL.append(" (\n");
        //列字段
        int index = 1;
        for (Column column : columns) {
            mssqlDDL.append("  ").append(column.getName()).append("\t\t").append(column.getTypeInfo().getTypeName());
            if (!ObjectUtils.isEmpty(column.getComment())) {
                mssqlDDL.append(" COMMENT '").append(column.getComment()).append("'");
            }
            if (index == columns.size()) {
                mssqlDDL.append("\n");
            } else {
                mssqlDDL.append(",\n");
            }
            index++;
        }
        mssqlDDL.append(" )\n");
        //獲取分區(qū)
        List<Column> partitionColumns = tableSchema.getPartitionColumns();
        int partitionIndex = 1;
        if (!ObjectUtils.isEmpty(partitionColumns)) {
            mssqlDDL.append("PARTITIONED BY (");
        }
        for (Column partitionColumn : partitionColumns) {
            final String format = String.format("%s %s COMMENT '%s'", partitionColumn.getName(), partitionColumn.getTypeInfo().getTypeName(), partitionColumn.getComment());
            mssqlDDL.append(format);
            if (partitionIndex == partitionColumns.size()) {
                mssqlDDL.append("\n");
            } else {
                mssqlDDL.append(",\n");
            }
            partitionIndex++;
        }
    
        if (!ObjectUtils.isEmpty(partitionColumns)) {
            mssqlDDL.append(")\n");
        }
    //        mssqlDDL.append("STORED AS ALIORC  \n");
    //        mssqlDDL.append("TBLPROPERTIES ('comment'='").append(tableComment).append("');");
        mssqlDDL.append(";");
        return mssqlDDL.toString();
    }

    測(cè)試代碼

    public static void main(String[] args) throws ClientException {
        final DataWorksOpenApiConnParam connParam = new DataWorksOpenApiConnParam();
        connParam.setAliyunAccessId("您的阿里云賬號(hào)accessId");
        connParam.setAliyunAccessKey("您的阿里云賬號(hào)accessKey");
        // dataworks所在區(qū)域
        connParam.setRegion("cn-chengdu");
        // dataworks所屬項(xiàng)目
        connParam.setProject("dataworks所屬項(xiàng)目");
        // dataworks所屬項(xiàng)目環(huán)境 如果不分環(huán)境的話(huà)設(shè)置為生產(chǎn)即可
        connParam.setProjectEnv("dev");
        // 數(shù)據(jù)引擎類(lèi)型 odps
        connParam.setDatasourceType("odps");
        // ddataworks接口地址
        connParam.setEndPoint("dataworks.cn-chengdu.aliyuncs.com");
        final DataWorksOpenApiUtil dataWorksOpenApiUtil = new DataWorksOpenApiUtil(connParam, true);
    
        // 拉取所有ODPS腳本
        dataWorksOpenApiUtil.listAllFiles(100, "", "", "10", files -> {
            // 處理文件
            for (ListFilesResponse.Data.File file : files) {
                final String fileName = file.getFileName();
                System.out.println(fileName);
            }
        });
    
        // 拉取所有表的建表語(yǔ)句
        dataWorksOpenApiUtil.listAllDdl((tableName, tableDdlContent) -> {
            System.out.println("=======================================");
            System.out.println("表名:" + tableName + "內(nèi)容如下:\n");
            System.out.println(tableDdlContent);
            System.out.println("=======================================");
        });
    }

    測(cè)試結(jié)果

    test_001腳本
    test_002腳本
    test_003腳本
    test_004腳本
    test_005腳本
    =======================================
    表名:test_abc_info內(nèi)容如下:

    CREATE TABLE IF NOT EXISTS test_abc_info
     (
        test_abc1        STRING COMMENT '字段1',
        test_abc2        STRING COMMENT '字段2',
        test_abc3        STRING COMMENT '字段3',
        test_abc4        STRING COMMENT '字段4',
        test_abc5        STRING COMMENT '字段5',
        test_abc6        STRING COMMENT '字段6',
        test_abc7        STRING COMMENT '字段7',
        test_abc8        STRING COMMENT '字段8'
     )
    PARTITIONED BY (p_date STRING COMMENT '數(shù)據(jù)日期'
    )
    ;
    =======================================
    Disconnected from the target VM, address: '127.0.0.1:59509', transport: 'socket'

    關(guān)于“SpringBoot怎么整合dataworks”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí),可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會(huì)為大家更新不同的知識(shí)點(diǎn)。

    向AI問(wèn)一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI