溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

使用RxJava中遇到的一些”坑“

發(fā)布時(shí)間:2020-09-24 21:42:10 來源:腳本之家 閱讀:192 作者:lshw4320814 欄目:移動(dòng)開發(fā)

前言

大家越用RxJava,越覺得它好用,所以不知不覺地發(fā)現(xiàn)代碼里到處都是RxJava的身影。然而,RxJava也不是銀彈,其中仍然有很多問題需要解決。這里,我簡單地總結(jié)一下自己遇到的一些“坑”,內(nèi)容上可能會(huì)比較松散。

一、考慮主線程的切換

RxJava中一個(gè)常用的使用方法是——在其他線程中做處理,然后切換到UI線程中去更新頁面。其中,線程切換就是使用了observeOn()。后臺(tái)下載文件,前臺(tái)顯示下載進(jìn)度就可以使用這種方式完成。然而,實(shí)踐發(fā)現(xiàn)這其中有坑。如果文件比較大,而下載包的粒度又比較小,這將導(dǎo)致很多通知積壓下來,最終導(dǎo)致錯(cuò)誤。

這種錯(cuò)誤其實(shí)也是可以理解的,畢竟MainLooper是根據(jù)Message來工作的,Message過多必然會(huì)導(dǎo)致一些問題。當(dāng)然,這還是比較想當(dāng)然的想法,最終還是需要到源碼中一探究竟。ObserveOn的原理在前面關(guān)于RxJava的文章已經(jīng)有過分析,這里還是簡單列一下代碼。其中的重點(diǎn)還是OperatorObserveOn的內(nèi)部類——ObserveOnSubscriber。其重要代碼片段如下:

 /** Observe through individual queue per observer. */
 static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
  final Subscriber<? super T> child;
  final Scheduler.Worker recursiveScheduler;
  final NotificationLite<T> on;
  final boolean delayError;
  final Queue<Object> queue;
  /** The emission threshold that should trigger a replenishing request. */
  final int limit;

  // the status of the current stream
  volatile boolean finished;

  final AtomicLong requested = new AtomicLong();

  final AtomicLong counter = new AtomicLong();

  /**
   * The single exception if not null, should be written before setting finished (release) and read after
   * reading finished (acquire).
   */
  Throwable error;

  /** Remembers how many elements have been emitted before the requests run out. */
  long emitted;

  // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
  // not prevent anything downstream from consuming, which will happen if the Subscription is chained
  public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
   this.child = child;
   this.recursiveScheduler = scheduler.createWorker();
   this.delayError = delayError;
   this.on = NotificationLite.instance();
   int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
   // this formula calculates the 75% of the bufferSize, rounded up to the next integer
   this.limit = calculatedSize - (calculatedSize >> 2);
   if (UnsafeAccess.isUnsafeAvailable()) {
    queue = new SpscArrayQueue<Object>(calculatedSize);
   } else {
    queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
   }
   // signal that this is an async operator capable of receiving this many
   request(calculatedSize);
  }

  void init() {
   // don't want this code in the constructor because `this` can escape through the
   // setProducer call
   Subscriber<? super T> localChild = child;

   localChild.setProducer(new Producer() {

    @Override
    public void request(long n) {
     if (n > 0L) {
      BackpressureUtils.getAndAddRequest(requested, n);
      schedule();
     }
    }

   });
   localChild.add(recursiveScheduler);
   localChild.add(this);
  }

  @Override
  public void onNext(final T t) {
   if (isUnsubscribed() || finished) {
    return;
   }
   if (!queue.offer(on.next(t))) {
    onError(new MissingBackpressureException());
    return;
   }
   schedule();
  }

  @Override
  public void onCompleted() {
   if (isUnsubscribed() || finished) {
    return;
   }
   finished = true;
   schedule();
  }

  @Override
  public void onError(final Throwable e) {
   if (isUnsubscribed() || finished) {
    RxJavaHooks.onError(e);
    return;
   }
   error = e;
   finished = true;
   schedule();
  }

  protected void schedule() {
   if (counter.getAndIncrement() == 0) {
    recursiveScheduler.schedule(this);
   }
  }
 }

關(guān)鍵點(diǎn)就在于這個(gè)queue成員,這個(gè)隊(duì)列存放了需要進(jìn)行發(fā)送給下行線程的消息。對于主線程來說,符合其實(shí)是比較重的,從消息的生產(chǎn)者和消費(fèi)者的模式講,過多過快的消息會(huì)導(dǎo)致消息阻塞。甚至,都到不了阻塞的情況,因?yàn)閝ueue的大小會(huì)有上限,在onNext()方法中的queue.offer()可能會(huì)產(chǎn)生異常,這取決于queue的實(shí)現(xiàn)方式。但無論如何都不可能無限大,所以無法保證絕對不出異常。

解決這個(gè)問題的方法其實(shí)也很簡單,可以在生產(chǎn)者降低消息的產(chǎn)生頻率。也可以在消息處理的時(shí)候先不進(jìn)行線程切換,而是通過判斷,在必要的時(shí)候進(jìn)行線程切換,比如使用runOnUIThread() 。

二、RxJava避免內(nèi)存泄漏

RxJava的響應(yīng)式機(jī)制本質(zhì)上還是回調(diào)實(shí)現(xiàn)的,因此內(nèi)存泄漏也是會(huì)出現(xiàn)的。倘若不對Subscription進(jìn)行管理,內(nèi)存泄漏會(huì)非常嚴(yán)重。對于Subscription,其實(shí)有幾個(gè)比較廣泛使用的方法,比如RxLifecycle,以及簡單的CompositeSubscription。至于它們的使用方法,其實(shí)都非常簡單,這里就不贅述了。

說到內(nèi)存泄漏,就談點(diǎn)題外話,動(dòng)畫也可能導(dǎo)致內(nèi)存泄漏。其原因仍然是一些回調(diào)函數(shù),這些回調(diào)函數(shù)實(shí)現(xiàn)的View變化的功能,但是在被撤銷以后,回調(diào)函數(shù)沒有取消掉,同時(shí)View可能持有Context信息,從而導(dǎo)致內(nèi)存泄漏。最近才發(fā)現(xiàn),LoadToastView這個(gè)開源庫一直存在內(nèi)存泄漏,其原因正如上文所說。

總結(jié)

以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作能帶來一定的幫助,如果有疑問大家可以留言交流,謝謝大家對億速云的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI