Real-Time Data Aggregation with TiDB + Flink

Published: 2021-12-22 11:47:54  Author: iii  Category: Big Data

This article walks through how to do real-time data aggregation with TiDB + Flink: TiCDC streams change data from TiDB into Kafka, a small Spring Boot service converts it into canal-json, and a Flink Table job aggregates the result back into TiDB. The steps are simple and practical, so let's go through them one by one.

1. The TiCDC changefeed configuration is as follows:

# Whether schema and table names in this configuration file are case-sensitive.
# This setting affects both the filter and sink configuration. Default: true.
case-sensitive = true

# Whether to output the old value. Supported since v4.0.5.
enable-old-value = true

[filter]
# Ignore transactions with these start_ts values.
ignore-txn-start-ts = [1, 2]

# Filter rules.
# Filter syntax: https://docs.pingcap.com/zh/tidb/stable/table-filter
# Here only my sales order header table is replicated.
rules = ['dspdev.sales_order_header']

[mounter]
# Number of mounter threads used to decode the data output by TiKV.
worker-num = 16

[sink]
# For MQ sinks, the event dispatcher can be configured via dispatchers.
# Four dispatchers are supported: default, ts, rowid, table. The rules are:
# - default: dispatch in table mode when there are multiple unique indexes (including the primary key);
#   dispatch in rowid mode when there is exactly one unique index (or primary key);
#   dispatch in table mode when the old value feature is enabled.
# - ts: hash on the commitTs of the row change and dispatch events accordingly.
# - rowid: hash on the selected HandleKey column name and value and dispatch events accordingly.
# - table: hash on the schema name and table name and dispatch events accordingly.
# The matcher syntax is the same as the filter rule syntax.
dispatchers = [
    {matcher = ['dspdev.*'], dispatcher = "ts"}
]
# For MQ sinks, the message protocol can be specified.
# Currently default, canal, avro and maxwell are supported. default is the TiCDC Open Protocol.
protocol = "canal"

[cyclic-replication]
# Whether to enable cyclic replication.
enable = false
# Replica ID of the current TiCDC instance.
replica-id = 1
# Replica IDs to filter out.
filter-replica-ids = [2,3]
# Whether to replicate DDL statements.
sync-ddl = true

2. Configure the CDC sink with Kafka as the downstream:

--sink-uri="kafka://127.0.0.1:9092/cdc-test?kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"

This sends the TiDB change data to Kafka as protobuf-encoded Canal messages, so all we need to do downstream is decode them. For an explanation of each sink URI parameter, see the TiCDC sink configuration documentation.
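With the configuration file from step 1 saved as, say, changefeed.toml (the file name, PD address, and changefeed ID below are placeholders rather than values from the original setup), the changefeed that ties the config file and the sink URI together can be created with the TiCDC CLI:

cdc cli changefeed create \
  --pd=http://127.0.0.1:2379 \
  --changefeed-id="sales-order-to-kafka" \
  --config=changefeed.toml \
  --sink-uri="kafka://127.0.0.1:9092/cdc-test?kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"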

3. Create a Spring Boot project and add the canal-client, Kafka, and related dependencies.

The pom.xml is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.konka.dsp</groupId>
    <artifactId>kafka-parse</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-parse</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
        <fastjson.version>1.2.70</fastjson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>org.springframework.cloud</groupId>-->
<!--            <artifactId>spring-cloud-starter</artifactId>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.springframework.cloud</groupId>-->
<!--            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>-->
<!--        </dependency>-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

The application.properties is as follows:

###########  Kafka cluster  ###########
spring.kafka.bootstrap-servers=192.168.8.71:9092
###########  Producer configuration  ###########
# Number of retries
spring.kafka.producer.retries=0
# Acknowledgement level: how many partition replicas must confirm before the producer gets an ack (0, 1, all/-1)
spring.kafka.producer.acks=1
# Batch size
spring.kafka.producer.batch-size=16384
# Linger time
spring.kafka.producer.properties.linger.ms=0
# The producer sends a batch once it has accumulated batch-size bytes or linger.ms has elapsed.
# With linger.ms=0 every record is sent immediately, so batch-size effectively has no impact.

# Producer buffer size
spring.kafka.producer.buffer-memory = 33554432
# Serializers provided by Kafka
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Custom partitioner
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
###########  Consumer configuration  ###########
# Default consumer group ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# Whether to auto-commit offsets
spring.kafka.consumer.enable-auto-commit=true
# Auto-commit interval (how long after receiving a message the offset is committed)
spring.kafka.consumer.auto.commit.interval.ms=1000
# What to do when Kafka has no initial offset or the offset is out of range:
# earliest: reset to the smallest offset in the partition
# latest: reset to the latest offset (consume only newly produced data)
# none: throw an exception if any partition has no committed offset
spring.kafka.consumer.auto-offset-reset=latest
# Session timeout (a rebalance is triggered if the consumer sends no heartbeat within this time)
spring.kafka.consumer.properties.session.timeout.ms=120000
# Consumer request timeout
spring.kafka.consumer.properties.request.timeout.ms=180000
# Deserializers: String keys, Canal Message values
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.alibaba.otter.canal.client.kafka.MessageDeserializer
# Do not fail on startup if a listened topic does not exist yet
spring.kafka.listener.missing-topics-fatal=false
# Tables and columns to keep
table.data = {"sales_order_header":"id,customer_name,total_amount,created_date"}
# Enable batch consumption
# spring.kafka.listener.type=batch
# Maximum number of records per batch poll
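Both Kafka topics need to exist on the broker: cdc-test, which TiCDC writes to, and canal-data, which the converter below republishes to. If topic auto-creation is disabled, they can be created manually; the partition counts and replication factor here are illustrative, not taken from the original cluster:

bin/kafka-topics.sh --bootstrap-server 192.168.8.71:9092 --create --topic cdc-test --partitions 6 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server 192.168.8.71:9092 --create --topic canal-data --partitions 1 --replication-factor 1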

The Spring Boot Kafka consumer code is as follows:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.konka.dsp.kafkaparse.CanalKafkaClientExample;
import com.konka.dsp.kafkaparse.tidb.KafkaMessage;
import com.konka.dsp.kafkaparse.tidb.TicdcEventData;
import com.konka.dsp.kafkaparse.tidb.TicdcEventDecoder;
import com.konka.dsp.kafkaparse.tidb.TicdcEventFilter;
import com.konka.dsp.kafkaparse.tidb.value.TicdcEventDDL;
import com.konka.dsp.kafkaparse.tidb.value.TicdcEventResolve;
import com.konka.dsp.kafkaparse.tidb.value.TicdcEventRowChange;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@Component
public class kafkaConsumer {
    protected final static Logger logger  = LoggerFactory.getLogger(CanalKafkaClientExample.class);
    // Consumer listener
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("#{${table.data}}")
    private Map<String,String> map;

    @KafkaListener(topics = {"cdc-test"})
    public void onMessage1(ConsumerRecord<String, Message> consumerRecord) throws UnsupportedEncodingException {
        Message message = consumerRecord.value();
        long batchId = message.getId();
        FlatMessage fm = new FlatMessage();
        List<CanalEntry.Entry> entrys = message.getEntries();
        for (CanalEntry.Entry entry : entrys) {

            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            fm.setId(entry.getHeader().getExecuteTime());
            fm.setDatabase(entry.getHeader().getSchemaName());
            fm.setEs(entry.getHeader().getExecuteTime());
            fm.setTs(entry.getHeader().getExecuteTime());
            fm.setTable(entry.getHeader().getTableName());
            fm.setType(rowChange.getEventType().name());
            CanalEntry.EventType eventType = rowChange.getEventType();
            fm.setIsDdl(rowChange.getIsDdl());
            fm.setSql(rowChange.getSql());
            Map<String,String> mysqlTypes = new HashMap<>();
            Map<String,Integer> sqlType = new HashMap<>();
            List<String> pkNames = new ArrayList<>();
            logger.info(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            String[] filtercolumn = map.get(entry.getHeader().getTableName()).split(",");
            logger.info(" filter --> column {}",filtercolumn);
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    fm.setData(saveRowData(rowData.getBeforeColumnsList(),pkNames,filtercolumn));
                    fm.setMysqlType(setMysqlTypes(rowData.getBeforeColumnsList(),filtercolumn));
                    fm.setSqlType(setSqlTypes(rowData.getBeforeColumnsList(),filtercolumn));
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    fm.setData(saveRowData(rowData.getAfterColumnsList(),pkNames,filtercolumn));
                    fm.setMysqlType(setMysqlTypes(rowData.getAfterColumnsList(),filtercolumn));
                    fm.setSqlType(setSqlTypes(rowData.getAfterColumnsList(),filtercolumn));
                } else {

                    logger.info("-------&gt; before->{}",rowData.getBeforeColumnsList().size());
                    fm.setOld(saveRowData(rowData.getBeforeColumnsList(),pkNames,filtercolumn));
                    logger.info("-------&gt; after");
                    fm.setData(saveRowData(rowData.getAfterColumnsList(),pkNames,filtercolumn));
                    fm.setMysqlType(setMysqlTypes(rowData.getAfterColumnsList(),filtercolumn));
                    fm.setSqlType(setSqlTypes(rowData.getAfterColumnsList(),filtercolumn));
                    if(rowData.getBeforeColumnsList().size()==0&&rowData.getAfterColumnsList().size()>0){
                        fm.setType("INSERT");
                    }
                }
            }
            HashSet h = new HashSet(pkNames);
            pkNames.clear();
            pkNames.addAll(h);
            fm.setPkNames(pkNames);

        }

        logger.info("json解析:{}",JSON.toJSONString(fm, SerializerFeature.WriteMapNullValue));
        kafkaTemplate.send("canal-data",JSON.toJSONString(fm, SerializerFeature.WriteMapNullValue));
//
//        FlatMessage flatMessage = (FlatMessage)JSON.parseObject(flatMessageJson, FlatMessage.class);
        // Which topic/partition the message came from; print the message content
//         KafkaMessage kafkaMessage = new KafkaMessage();
//         kafkaMessage.setKey(consumerRecord.key());
//         kafkaMessage.setValue(consumerRecord.value());
//         kafkaMessage.setOffset(consumerRecord.offset());
//         kafkaMessage.setPartition(consumerRecord.partition());
//         kafkaMessage.setTimestamp(consumerRecord.timestamp());
//         TicdcEventFilter filter = new TicdcEventFilter();
//            TicdcEventDecoder ticdcEventDecoder = new TicdcEventDecoder(kafkaMessage);
//            while (ticdcEventDecoder.hasNext()) {
//                TicdcEventData data = ticdcEventDecoder.next();
//                if (data.getTicdcEventValue() instanceof TicdcEventRowChange) {
//                    boolean ok = filter.check(data.getTicdcEventKey().getTbl(), data.getTicdcEventValue().getKafkaPartition(), data.getTicdcEventKey().getTs());
//                    if (ok) {
//                        // deal with row change event
//                    } else {
//                        // ignore duplicated messages
//                    }
//                } else if (data.getTicdcEventValue() instanceof TicdcEventDDL) {
//                    // deal with ddl event
//                } else if (data.getTicdcEventValue() instanceof TicdcEventResolve) {
//                    filter.resolveEvent(data.getTicdcEventValue().getKafkaPartition(), data.getTicdcEventKey().getTs());
//                    // deal with resolve event
//                }
//                System.out.println(JSON.toJSONString(data, true));
//            }

    }
    private List<Map<String,String>> saveRowData(List<CanalEntry.Column> columns,List<String> pkNames,String[] filter) {
        Map<String,String> map = new HashMap<>();
        List<Map<String,String>> rowdata = new ArrayList<>();
        columns.forEach(column -> {
            if(column.hasIsKey()){
                pkNames.add(column.getName());
            }
            if(Arrays.asList(filter).contains(column.getName())){
                // Map empty strings to "NULL" so that Flink does not fail when it receives ""
                map.put(column.getName(),column.getValue().equals("")?"NULL":column.getValue());
            }
        });
        rowdata.add(map);
        return rowdata;
//        rabbitTemplate.convertAndSend(tableEventType.toUpperCase(),JSON.toJSONString(map));
    }

    private Map<String,String> setMysqlTypes(List<CanalEntry.Column> columns,String[] filter){
        Map<String,String> map = new HashMap<>();
        columns.forEach(column -> {
            if(Arrays.asList(filter).contains(column.getName())){
                map.put(column.getName(),column.getMysqlType());
            }

        });
        return map;
    }

    private Map<String,Integer> setSqlTypes(List<CanalEntry.Column> columns,String[] filter){
        Map<String,Integer> map = new HashMap<>();
        columns.forEach(column -> {
            if(Arrays.asList(filter).contains(column.getName())){
                map.put(column.getName(),column.getSqlType());
            }

        });
        return map;
    }


    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}

At this point the TiDB change data has been converted into canal-json and republished to Kafka so that Flink can consume it. One caveat: for some reason both inserts and updates coming out of TiDB arrive with event type UPDATE, so the code treats a row change with no before image (no old columns) as an INSERT.
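For reference, a converted message on the canal-data topic looks roughly like the sketch below (all values are made up for illustration); the data, old, and type fields are what Flink's canal-json format relies on:

{
  "id": 1640141274000,
  "database": "dspdev",
  "table": "sales_order_header",
  "type": "INSERT",
  "isDdl": false,
  "es": 1640141274000,
  "ts": 1640141274000,
  "sql": "",
  "pkNames": ["id"],
  "mysqlType": {"id": "bigint(20)", "customer_name": "varchar(255)", "total_amount": "decimal(16,2)", "created_date": "datetime"},
  "sqlType": {"id": -5, "customer_name": 12, "total_amount": 3, "created_date": 93},
  "data": [{"id": "1", "customer_name": "ACME", "total_amount": "199.00", "created_date": "2021-12-22 11:47:54"}],
  "old": null
}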

4. Flink local development. It is recommended to download Flink and set up the environment following the Flink Table configuration guide.

See the official Flink documentation for the Table setup and copy the Table-related jars into Flink's lib directory. This step also uses TiBigData, an open-source project from Zhihu: https://github.com/pingcap-incubator/TiBigData/ . After building the project, copy its Flink connector jars into Flink's lib directory as well.
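Building TiBigData and installing its Flink connector could look like the following; the module path and jar name vary between TiBigData versions, so treat them as assumptions:

git clone https://github.com/pingcap-incubator/TiBigData.git
cd TiBigData
mvn clean package -DskipTests
cp flink/target/flink-tidb-connector-*.jar $FLINK_HOME/lib/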

5. Finally, wire up the tables for our business database. Here is the code:

import org.apache.flink.table.api.*;

import static org.apache.flink.table.api.Expressions.*;


public class SalesOrderStream {
    public static Table report(Table transactions) {
        return transactions.select(
                $("customer_name"),
                $("created_date"),
                $("total_amount"))
                .groupBy($("customer_name"), $("created_date"))
                .select(
                        $("customer_name"),
                        $("total_amount").sum().as("total_amount"),
                        $("created_date")
                        );

    }

    public static void main(String[] args) throws Exception {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

//        tEnv.executeSql("CREATE TABLE sales_order_header_stream (\n" +
////                "   id  BIGINT not null,\n" +
//                "    customer_name STRING,\n"+
////                "    dsp_org_name STRING,\n"+
//                "    total_amount      DECIMAL(38,2),\n" +
////                "    total_discount      DECIMAL(16,2),\n" +
////                "    pay_amount      DECIMAL(16,2),\n" +
////                "    total_amount      DECIMAL(16,2),\n" +
//                "    created_date TIMESTAMP(3)\n" +
//                ") WITH (\n" +
//                " 'connector' = 'mysql-cdc',\n" +
//                " 'hostname' = '192.168.8.73',\n" +
//                " 'port' = '4000',\n"+
//                " 'username' = 'flink',\n"+
//                " 'password' = 'flink',\n"+
//                " 'database-name' = 'dspdev',\n"+
//                " 'table-name' = 'sales_order_header'\n"+
//                ")");
        tEnv.executeSql("CREATE TABLE sales_order_header_stream (\n" +
                        " `id` BIGINT,\n"+
                        " `total_amount` DECIMAL(16,2) ,\n"+
                        " `customer_name` STRING,\n"+
                        " `created_date` TIMESTAMP(3) ,\n"+
                        " PRIMARY KEY (`id`) NOT ENFORCED "+
                ") WITH (\n" +
                "'connector' = 'kafka',\n"+
                "'topic' = 'canal-data',\n"+
                "'properties.bootstrap.servers' = '192.168.8.71:9092',\n"+
                "'properties.group.id' = 'test',\n"+
                "'scan.startup.mode' = 'earliest-offset',\n"+
                "'format' = 'canal-json'\n"+
                ")");

        tEnv.executeSql("CREATE TABLE spend_report (\n" +
                "    customer_name STRING,\n" +
//                "    total_amount    DECIMAL(16,2),\n" +
//                "    total_discount  DECIMAL(16,2),\n" +
//                "    pay_amount      DECIMAL(16,2),\n" +
                "    total_amount    DECIMAL(16,2),\n" +
                "    created_date TIMESTAMP(3),\n" +
                "    PRIMARY KEY (customer_name,created_date) NOT ENFORCED" +
                ") WITH (\n" +
                        "  'connector' = 'tidb',\n" +
                        "  'tidb.database.url' = 'jdbc:mysql://192.168.8.73:4000/dspdev',\n" +
                        "  'tidb.username' = 'flink',\n"+
                        "  'tidb.password' = 'flink',\n"+
                        "  'tidb.database.name' = 'dspdev',\n"+
                        "  'tidb.table.name' = 'spend_report'\n"+
                ")");

        Table transactions = tEnv.from("sales_order_header_stream");

        report(transactions).executeInsert("spend_report");
    }
}
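The spend_report sink table has to exist in TiDB before the job starts writing to it. A DDL matching the Flink schema above might look like this (column lengths and types are assumptions derived from the Flink definition):

CREATE TABLE dspdev.spend_report (
    customer_name VARCHAR(255) NOT NULL,
    total_amount  DECIMAL(16,2),
    created_date  DATETIME(3) NOT NULL,
    PRIMARY KEY (customer_name, created_date)
);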

With this in place, the current sales totals are aggregated in real time and written back to the database. (Screenshot of the resulting spend_report table.)
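To spot-check the running totals, the table can be queried directly in TiDB, for example:

SELECT customer_name, created_date, total_amount
FROM dspdev.spend_report
ORDER BY created_date DESC;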

By now you should have a good understanding of how to do real-time data aggregation with TiDB + Flink. The best way to make it stick is to try it out yourself!
