溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

storm中trident是什么

發(fā)布時(shí)間:2021-12-10 13:46:27 來源:億速云 閱讀:118 作者:小新 欄目:云計(jì)算

這篇文章主要介紹storm中trident是什么,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

簡介

Storm是一個(gè)實(shí)時(shí)流計(jì)算框架,Trident是對storm的一個(gè)更高層次的抽象,Trident最大的特點(diǎn)以batch的形式處理stream。

     一些最基本的操作函數(shù)有Filter、Function,F(xiàn)ilter可以過濾掉tuple,F(xiàn)unction可以修改tuple內(nèi)容,輸出0或多個(gè)tuple,并能把新增的字段追加到tuple后面。

     聚合有partitionAggregate和Aggregator接口。partitionAggregate對當(dāng)前partition中的tuple進(jìn)行聚合,它不是重定向操作。Aggregator有三個(gè)接口:CombinerAggregator, ReducerAggregator,Aggregator,它們屬于重定向操作,它們會把stream重定向到一個(gè)partition中進(jìn)行聚合操作。

     重定向操作會改變數(shù)據(jù)流向,但不會改變數(shù)據(jù)內(nèi)容,重定向操會產(chǎn)生網(wǎng)絡(luò)傳輸,可能影響一部分效率。而Filter、Function、partitionAggregate則屬于本地操作,不會產(chǎn)生網(wǎng)絡(luò)傳輸。

     GroupBy會根據(jù)指定字段,把整個(gè)stream切分成一個(gè)個(gè)grouped stream,如果在grouped stream上做聚合操作,那么聚合就會發(fā)生在這些grouped stream上而不是整個(gè)batch。如果groupBy后面跟的是aggregator,則是聚合操作,如果跟的是partitionAggregate,則不是聚合操作。

Trident主要有5類操作:

1、作用在本地的操作,不產(chǎn)生網(wǎng)絡(luò)傳輸。

2、對數(shù)據(jù)流的重分布,不改變流的內(nèi)容,但是產(chǎn)生網(wǎng)絡(luò)傳輸。

3、聚合操作,有可能產(chǎn)生網(wǎng)絡(luò)傳輸。

4、作用在分組流(grouped streams)上的操作。

5、Merge和join

partition

概念

partition中文意思是分區(qū),有人將partition理解為Storm里面的task,即并發(fā)的基本執(zhí)行單位。我理解應(yīng)該是像數(shù)據(jù)庫里面的分區(qū),是將一個(gè)batch的數(shù)據(jù)分區(qū),分成多個(gè)partition,或者可以理解為多個(gè)子batch,然后多個(gè)partition可以并發(fā)處理。這里關(guān)鍵的區(qū)別是:partition是數(shù)據(jù),不是執(zhí)行的代碼。你把數(shù)據(jù)(tuple)分區(qū)以后,如果你沒有多個(gè)task(并發(fā)度)來處理這些分區(qū)后的數(shù)據(jù),那分區(qū)也是沒有作用的。所以這里的關(guān)系是這樣的:先有batch,因?yàn)門rident內(nèi)部是基于batch來實(shí)現(xiàn)的;然后有partition;分區(qū)后再分配并發(fā)度,然后才能進(jìn)行并發(fā)處理。并發(fā)度的分配是利用parallelismHint來實(shí)現(xiàn)的。

操作

既然有partition的概念,那么也就有partition的操作。Trident提供的分區(qū)操作,類似于Storm里面講的grouping。分區(qū)操作有:

重分區(qū)操作通過運(yùn)行一個(gè)函數(shù)改變元組在任務(wù)之間的分布,也可以調(diào)整分區(qū)的數(shù)量(比如重分區(qū)之后將并行度調(diào)大),重分區(qū)需要網(wǎng)絡(luò)傳輸?shù)膮⑴c。重分區(qū)函數(shù)包含以下這幾個(gè):

  1. shuffle:使用隨機(jī)輪詢算法在所有目標(biāo)分區(qū)間均勻分配元組;

  2. broadcast:每個(gè)元組復(fù)制到所有的目標(biāo)分區(qū)。這在DRPC中非常有用,例如,需要對每個(gè)分區(qū)的數(shù)據(jù)做一個(gè)stateQuery操作;

  3. partitionBy:接收一些輸入字段,根據(jù)這些字段輸入字段進(jìn)行語義分區(qū)。通過對字段取hash值或者取模來選擇目標(biāo)分區(qū)。partitionBy保證相同的字段一定被分配到相同的目標(biāo)分區(qū);

  4. global:所有的元組分配到相同的分區(qū),該分區(qū)是流種所有batch決定的;

  5. batchGlobal:同一個(gè)batch中的元組被分配到相同的目標(biāo)分區(qū),不同batch的元組有可能被分配到不同的目標(biāo)分區(qū);

  6. partition:接收一個(gè)自定義的分區(qū)函數(shù),自定義分區(qū)函數(shù)需要實(shí)現(xiàn)backtype.storm.grouping.CustomStreamGrouping接口。

注意,除了這里明確提出來的分區(qū)操作,Trident里面還有aggregate()函數(shù)隱含有分區(qū)的操作,它用的是global()操作,這個(gè)在后面接收聚合操作的時(shí)候還會再介紹。

API

each() 方法

     作用:操作batch中的每一個(gè)tuple內(nèi)容,一般與Filter或者Function函數(shù)配合使用。

     下面通過一個(gè)例子來介紹each()方法,假設(shè)我們有一個(gè)FakeTweetsBatchSpout,它會模擬一個(gè)Stream,隨機(jī)產(chǎn)生一個(gè)個(gè)消息。我們可以通過設(shè)置這個(gè)Spout類的構(gòu)造參數(shù)來改變這個(gè)Spout的batch Size的大小。

    1.Filter類:過濾tuple

     一個(gè)通過actor字段過濾消息的Filter:

public static class PerActorTweetsFilter extends BaseFilter {
  String actor;

  public PerActorTweetsFilter(String actor) {
    this.actor = actor;
  }
  @Override
  public boolean isKeep(TridentTuple tuple) {
    return tuple.getString(0).equals(actor);
  }
}

Topology:

topology.newStream("spout", spout)
  .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
  .each(new Fields("actor", "text"), new Utils.PrintFilter());

從上面例子看到,each()方法有一些構(gòu)造參數(shù)

  • 第一個(gè)構(gòu)造參數(shù):作為Field Selector,一個(gè)tuple可能有很多字段,通過設(shè)置Field,我們可以隱藏其它字段,僅僅接收指定的字段(其它字段實(shí)際還在)。

  • 第二個(gè)是一個(gè)Filter:用來過濾掉除actor名叫"dave"外的其它消息。

     2.Function類:加工處理tuple內(nèi)容

     一個(gè)能把tuple中text內(nèi)容變成大寫的Function:

public static class UppercaseFunction extends BaseFunction {
  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
    collector.emit(new Values(tuple.getString(0).toUpperCase()));
  }
}

Topology:

topology.newStream("spout", spout)
  .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
  .each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text"))
  .each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());

首先,UppercaseFunction函數(shù)的輸入是Fields("text", "actor"),其作用是把其中的"text"字段內(nèi)容都變成大寫。

  其次,它比Filter多出一個(gè)輸出字段,作用是每個(gè)tuple在經(jīng)過這個(gè)Function函數(shù)處理后,輸出字段都會被追加到tuple后面,在本例中,執(zhí)行完Function之后的tuple內(nèi)容多了一個(gè)"uppercased_text",并且這個(gè)字段排在最后面。

    3. Field Selector與project

   我們需要注意的是,上面每個(gè)each()方法的第一個(gè)Field字段僅僅是隱藏掉沒有指定的字段內(nèi)容,實(shí)際上被隱藏的字段依然還在tuple中,如果想要徹底丟掉它們,我們就需要用到project()方法。

   投影操作作用是僅保留Stream指定字段的數(shù)據(jù),比如有一個(gè)Stream包含如下字段: [“a”, “b”, “c”, “d”],運(yùn)行如下代碼:

mystream.project(new Fields("b", "d"))

則輸出的流僅包含 [“b”, “d”]字段。

aggregation的介紹

首先聚合操作分兩種:partitionAggregate(),以及aggregate()。

    1.partitionAggregate

partitionAggregate()的操作是在partition上,一個(gè)batch的tuple被分成多個(gè)partition后,每個(gè)partition都會單獨(dú)運(yùn)行partitionAggregate中指定的聚合操作。分區(qū)聚合在一批tuple的每一個(gè)分區(qū)上運(yùn)行一個(gè)函數(shù)。與函數(shù)不同的是,分區(qū)聚合的輸出元組會覆蓋掉輸入元組。請看如下示例:

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

假設(shè)你有一個(gè)包含a,b兩個(gè)字段的輸入流,元組的分區(qū)情況如下:

Partition 0:
["a", 1]
["b", 2]

Partition 1:
["a", 3]
["c", 8]

Partition 2:
["e", 1]
["d", 9]
["d", 10]

運(yùn)行上面的那一行代碼將會輸出如下的元組,這些元組只包含一個(gè)sum字段: 

Partition 0:
[3]

Partition 1:
[11]

Partition 2:
[20]

    2.aggregate

aggregate()隱含了一個(gè)global分區(qū)操作,也就是它做的是全局聚合操作。它針對的是整個(gè)batch的聚合計(jì)算。

這兩種聚合操作,都可以傳入不同的aggregator實(shí)現(xiàn)具體的聚合任務(wù)。Trident中有三種aggregator接口,分別為:ReducerAggregator,CombinerAggregator,Aggregator。

下面是CombinerAggregator接口的定義:

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}

CombinerAggregator返回只有一個(gè)字段的一個(gè)元組。CombinerAggregator在每個(gè)輸入元組上運(yùn)行init函數(shù),然后通過combine函數(shù)聚合結(jié)果值直到只剩下一個(gè)元組。如果分區(qū)中沒有任何元組,CombinerAggregator將返回zero函數(shù)中定義的元組。比如,下面是Count聚合器的實(shí)現(xiàn):

public class Count implements CombinerAggregator<Long> {
    public Long init(TridentTuple tuple) {
        return 1L;
    }

    public Long combine(Long val1, Long val2) {
        return val1 + val2;
    }

    public Long zero() {
        return 0L;
    }
}

ReducerAggregator接口的定義如下:

public interface ReducerAggregator<T> extends Serializable {
    T init();
    T reduce(T curr, TridentTuple tuple);
}

ReducerAggregator通過init函數(shù)得到一個(gè)初始的值,然后對每個(gè)輸入元組調(diào)用reduce方法計(jì)算值,產(chǎn)生一個(gè)元組作為輸出。比如Count的ReducerAggregator實(shí)現(xiàn)如下:

public class Count implements ReducerAggregator<Long> {
    public Long init() {
        return 0L;
    }

    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + 1;
    }
}

最常用的聚合器的接口是Aggregator,它的定義如下:

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T state, TridentTuple tuple, TridentCollector collector);
    void complete(T state, TridentCollector collector);
}

Aggregator能夠發(fā)射任意數(shù)量,任意字段的元組。并且可以在執(zhí)行期間的任何時(shí)候發(fā)射元組,它的執(zhí)行流程如下:

  1. 處理batch之前調(diào)用init方法,init函數(shù)的返回值是一個(gè)表示聚合狀態(tài)的對象,該對象會傳遞到aggregate和complete函數(shù);

  2. 每個(gè)在batch分區(qū)中的元組都會調(diào)用aggregate方法,該方法能夠更新聚合狀態(tài)并且發(fā)射元組;

  3. 當(dāng)batch分區(qū)中的所有元組都被aggregate函數(shù)處理完時(shí)調(diào)用complete函數(shù)。

下面是使用Aggregator接口實(shí)現(xiàn)的Count聚合器:

public class CountAgg extends BaseAggregator<CountState> {
    static class CountState {
        long count = 0;
    }

    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count+=1;
    }

    public void complete(CountState state, TridentCollector collector) {
        collector.emit(new Values(state.count));
    }
}

有些時(shí)候,我們需要通知執(zhí)行很多個(gè)聚合器,則可以使用如下的鏈?zhǔn)秸{(diào)用執(zhí)行:

mystream.chainedAgg()
        .partitionAggregate(new Count(), new Fields("count"))
        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
        .chainEnd()

上面的代碼將會在每一個(gè)分區(qū)執(zhí)行Count和Sum聚合器,輸出結(jié)果是包含count和sum兩個(gè)字段的元組。

最重要的區(qū)別是CombinerAggregator,它是先在partition上做partial aggregate,然后再將這些部分聚合結(jié)果通過global分區(qū)到一個(gè)總的分區(qū),在這個(gè)總的分區(qū)上對結(jié)果進(jìn)行匯總。

groupBy()分組操作

首先它包含兩個(gè)操作,一個(gè)是分區(qū)操作,一個(gè)是分組操作。

如果后面是partitionAggregate()的話,就只有分組操作:在每個(gè)partition上分組,分完組后,在每個(gè)分組上進(jìn)行聚合;

如果后面是aggregate()的話,先根據(jù)partitionBy分區(qū),在每個(gè)partition上分組,,分完組后,在每個(gè)分組上進(jìn)行聚合。

parallelismHint并發(fā)度的介紹

它設(shè)置它前面所有操作的并發(fā)度,直到遇到某個(gè)repartition操作為止。

topology.newStream("spout", spout)
      .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
      .parallelismHint(5)
      .each(new Fields("actor", "text"), new Utils.PrintFilter());

意味著:parallelismHit之前的spout,each都是5個(gè)相同的操作一起并發(fā),對,一共有5個(gè)spout同時(shí)發(fā)射數(shù)據(jù),其實(shí)parallelismHint后面的each操作,也是5個(gè)并發(fā)。分區(qū)操作是作為Bolt劃分的分界點(diǎn)的。

如果想單獨(dú)設(shè)置Spout怎么辦?要在Spout之后,Bolt之前增加一個(gè)ParallelismHint,并且還要增加一個(gè)分區(qū)操作:

topology.newStream("spout", spout)
	  .parallelismHint(2)
	  .shuffle()
	  .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
	  .parallelismHint(5)
	  .each(new Fields("actor", "text"), new Utils.PrintFilter());

很多人只是設(shè)置了Spout的并發(fā)度,而沒有調(diào)用分區(qū)操作,這樣是達(dá)不到效果的,因?yàn)門rident是不會自動進(jìn)行分區(qū)操作的。像我之前介紹的,先分區(qū),再設(shè)置并發(fā)度。如果Spout不設(shè)置并發(fā)度,只設(shè)置shuffle,默認(rèn)是1個(gè)并發(fā)度,這樣后面設(shè)置5個(gè)并發(fā)度不會影響到Spout,因?yàn)椴l(fā)度的影響到shuffle分區(qū)操作就停止了。

例子

groupBy+aggregate+parallelismHint

package com.demo;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseAggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;



public class MyAgg extends BaseAggregator<Map<String, Integer>> {
	

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	
	/**
	 * 屬于哪個(gè)分區(qū)
	 */
	private int partitionId;

	/**
	 * 分區(qū)數(shù)量
	 */
	private int numPartitions;
	private String batchId;
	
	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map conf, TridentOperationContext context) {
		partitionId = context.getPartitionIndex();
		numPartitions = context.numPartitions();
		
	}

	public void aggregate(Map<String, Integer> val, TridentTuple tuple,
			TridentCollector collector) {
		String word = tuple.getString(0);
		Integer value = val.get(word);
		if (value == null) {
			value = 0;
		}
		value++;
		// 把數(shù)據(jù)保存到一個(gè)map對象中
		val.put(word, value);
		System.err.println("I am partition [" + partitionId
				+ "] and I have kept a tweet by: " + numPartitions + " " + word + " " +batchId);
	}

	public void complete(Map<String, Integer> val, TridentCollector collector) {
		collector.emit(new Values(val));
	}

	public Map<String, Integer> init(Object arg0, TridentCollector arg1) {
		this.batchId = arg0.toString();
		return new HashMap<String, Integer>();
	}

}
		FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
				new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
		spout.setCycle(false);
		TridentTopology tridentTopology = new TridentTopology();
		tridentTopology
				.newStream("spout", spout)
				.shuffle()
				.groupBy(new Fields("sentence"))
				.aggregate(new Fields("sentence"), new MyAgg(),
						new Fields("Map"))
			    .parallelismHint(2)
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 2:0
I am partition [1] and I have kept a tweet by: 2 d 2:0
I am partition [0] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0

groupBy+partitionAggregate+parallelismHint

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
				new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
		spout.setCycle(false);
		TridentTopology tridentTopology = new TridentTopology();
		tridentTopology
				.newStream("spout", spout)
				.shuffle()
				.groupBy(new Fields("sentence"))
				.partitionAggregate(new Fields("sentence"), new MyAgg(),
						new Fields("Map")))
				.toStream()
			    .parallelismHint(2)
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 2:0
I am partition [1] and I have kept a tweet by: 2 d 2:0
I am partition [0] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0

由于shuffle已經(jīng)把tuple平均分配給5個(gè)partition了,用groupBy+partitionAggregate來聚合又沒有partitionBy分區(qū)的作用,所以,直接在5個(gè)分區(qū)上進(jìn)行聚合,結(jié)果就是每個(gè)分區(qū)各有一個(gè)tuple。

而用groupBy+aggregate,雖然也是shuffle,但是由于具有partitiononBy分區(qū)的作用,值相同的tuple都分配到同一個(gè)分區(qū),結(jié)果就是每個(gè)分區(qū)根據(jù)不同的值來做匯聚。

aggregate+parallelismHint(沒有g(shù)roupBy)

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
				new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
		spout.setCycle(false);
		TridentTopology tridentTopology = new TridentTopology();
		tridentTopology
				.newStream("spout", spout)
				.shuffle()
				.aggregate(new Fields("sentence"), new MyAgg(),
						new Fields("Map"))
			    .parallelismHint(2)
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 2:0
I am partition [0] and I have kept a tweet by: 2 d 2:0
I am partition [1] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0

partitionAggregate+parallelismHint(沒有g(shù)roupBy操作)

		FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
				new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
		spout.setCycle(false);
		TridentTopology tridentTopology = new TridentTopology();
		tridentTopology
				.newStream("spout", spout)
				.shuffle()
				.partitionAggregate(new Fields("sentence"), new MyAgg(),
						new Fields("Map"))
				.toStream()
			    .parallelismHint(2)
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [1] and I have kept a tweet by: 2 a 2:0
I am partition [0] and I have kept a tweet by: 2 d 2:0
I am partition [0] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0

我們可以發(fā)現(xiàn),partitionAggregate加上groupBy,或者不加上groupBy,對結(jié)果都一樣:groupBy對于partitionAggregate沒有影響。但是對于aggregate來說,加上groupBy,就不是做全局聚合了,而是對分組做聚合;不加上groupBy,就是做全局聚合。

如果spout設(shè)置并行度,但是沒有加shuffle,不會起作用,分區(qū)默認(rèn)為1,;如果不設(shè)置并行度并且沒有加shuffle,分區(qū)默認(rèn)為1。

Merge和Joins

api的最后一部分便是如何把各種流匯聚到一起。最簡單的方式就是把這些流匯聚成一個(gè)流。我們可以這么做:   

topology.merge(stream1, stream2, stream3);

Trident指定新的合并之后的流中的字段為stream1中的字段。
另一種合并流的方式就是join。一個(gè)標(biāo)準(zhǔn)的join就像是一個(gè)sql,必須有標(biāo)準(zhǔn)的輸入,因此,join只針對符合條件的Stream。join應(yīng)用在來自Spout的每一個(gè)小Batch中。

下面的例子中,stream1流包含key,val1,val2三個(gè)字段,stream2流包含x,val1兩個(gè)字段:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

stream1流的key字段與stream2流的x字段組join操作,另外,Trident要求所有新流的輸出字段被重命名,因?yàn)檩斎肓骺赡馨嗤淖侄蚊Q。連接流發(fā)射的元組將會包含:

  1. 連接字段的列表。在上面的例子中,字段key對應(yīng)stream1的key,stream2的x;

  2. 來自所有流的所有非連接字段的列表,按照傳遞到連接方法的順序排序。在上面的例子中,字段a與字段b對應(yīng)stream1的val1和val2,c對應(yīng)于stream2的val1.

     當(dāng)join的是來源于不同Spout的stream時(shí),這些Spout在發(fā)射數(shù)據(jù)時(shí)需要同步,一個(gè)Batch所包含的tuple會來自各個(gè)Spout。   

以上是“storm中trident是什么”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI