溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Rxjava功能操作符的使用方法詳解

發(fā)布時(shí)間:2020-09-10 16:20:00 來源:腳本之家 閱讀:153 作者:Genten程澤翔 欄目:編程語言

Rxjava功能個(gè)人感覺很好用,里面的一些操作符很方便,Rxjava有:被觀察者,觀察者,訂閱者,

被觀察者通過訂閱者訂閱觀察者,從而實(shí)現(xiàn)觀察者監(jiān)聽被觀察者返回的數(shù)據(jù)

下面把Rxjava常用的模型代碼列出來,還有一些操作符的運(yùn)用:

依賴:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
  compile 'io.reactivex.rxjava2:rxjava:2.1.5'

這個(gè)是另一種解析數(shù)據(jù)的方法,阿里巴巴旗下的,聽說是解析最快的解析器。。。。

compile 'com.alibaba:fastjson:1.2.39'
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.view.View;
import android.widget.TextView;
 
import com.alibaba.fastjson.JSONObject;
 
import java.io.IOException;
import java.util.concurrent.TimeUnit;
 
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
 
public class MainActivity extends AppCompatActivity {
 
  private TextView name;
 
  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
 
    name = (TextView) findViewById(R.id.name);
    //用來調(diào)用下面的方法,監(jiān)聽。
    name.setOnClickListener(new View.OnClickListener() {
      @Override
      public void onClick(View v) {
 
        interval();
      }
    });
  }
 
  //例1:Observer
  public void observer() {
    //觀察者
    Observer<string> observer = new Observer<string>() {
      @Override
      public void onSubscribe(@NonNull Disposable d) {
 
      }
      @Override
      public void onNext(@NonNull String s) {
        //接收從被觀察者中返回的數(shù)據(jù)
        System.out.println("onNext :" + s);
      }
      @Override
      public void onError(@NonNull Throwable e) {
 
      }
      @Override
      public void onComplete() {
 
      }
    };
    //被觀察者
    Observable<string> observable = new Observable<string>() {
      @Override
      protected void subscribeActual(Observer<!--? super String--> observer) {
        observer.onNext("11111");
        observer.onNext("22222");
        observer.onComplete();
      }
    };
    //產(chǎn)生了訂閱
    observable.subscribe(observer);
  }
 
  //例2:Flowable
  private void flowable(){
    //被觀察者
    Flowable.create(new FlowableOnSubscribe<string>() {
      @Override
      public void subscribe(@NonNull FlowableEmitter<string> e) throws Exception {
        for (int i = 0; i < 100; i++) {
          e.onNext(i+"");
        }
      }
      //背壓的策略,buffer緩沖區(qū)        觀察者
      //背壓一共給了五種策略
      // BUFFER、
      // DROP、打印前128個(gè),后面的刪除
      // ERROR、
      // LATEST、打印前128個(gè)和最后一個(gè),其余刪除
      // MISSING
      //這里的策略若不是BUFFER 那么,會(huì)出現(xiàn)著名的:MissingBackpressureException錯(cuò)誤
    }, BackpressureStrategy.BUFFER).subscribe(new Consumer<string>() {
      @Override
      public void accept(String s) throws Exception {
        System.out.println("subscribe accept"+s);
        Thread.sleep(1000);
      }
    });
  }
 
  //例3:線程調(diào)度器 Scheduler
  public void flowable1(){
    Flowable.create(new FlowableOnSubscribe<string>() {
      @Override
      public void subscribe(@NonNull FlowableEmitter<string> e) throws Exception {
        for (int i = 0; i < 100; i++) {
          //輸出在哪個(gè)線程
          System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());
          e.onNext(i+"");
        }
      }
    },BackpressureStrategy.BUFFER)
        //被觀察者一般放在子線程
        .subscribeOn(Schedulers.io())
        //觀察者一般放在主線程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<string>() {
          @Override
          public void accept(String s) throws Exception {
            System.out.println("s"+ s);
            Thread.sleep(100);
            //輸出在哪個(gè)線程
            System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());
          }
        });
  }
 
 
  //例4:http請(qǐng)求網(wǎng)絡(luò),map轉(zhuǎn)化器,fastjson解析器
  public void map1(){
    Observable.create(new ObservableOnSubscribe<string>() {
      @Override
      public void subscribe(@NonNull final ObservableEmitter<string> e) throws Exception {
        OkHttpClient client = new OkHttpClient();
        Request request = new Request.Builder()
            .url("https://qhb.2dyt.com/Bwei/login")
            .build();
        client.newCall(request).enqueue(new Callback() {
          @Override
          public void onFailure(Call call, IOException e) {
 
          }
 
          @Override
          public void onResponse(Call call, Response response) throws IOException {
            String result = response.body().string();
            e.onNext(result);
          }
        });
      }
    })
        //map轉(zhuǎn)換器 flatmap(無序),concatmap(有序)
        .map(new Function<string, bean="">() {
      @Override
      public Bean apply(@NonNull String s) throws Exception {
        //用fastjson來解析數(shù)據(jù)
        return JSONObject.parseObject(s,Bean.class);
      }
    }).subscribe(new Consumer<bean>() {
      @Override
      public void accept(Bean bean) throws Exception {
        System.out.println("bean = "+ bean.toString() );
      }
    });
  }
 
  //常見rxjava操作符
  //例 定時(shí)發(fā)送消息
  public void interval(){
    Observable.interval(2,1, TimeUnit.SECONDS)
        .take(10)
        .subscribe(new Consumer<long>() {
          @Override
          public void accept(Long aLong) throws Exception {
            System.out.println("aLong = " + aLong);
          }
        });
  }
 
 
  //例 zip字符串合并
  public void zip(){
    Observable observable1 = Observable.create(new ObservableOnSubscribe<string>() {
      @Override
      public void subscribe(@NonNull ObservableEmitter<string> e) throws Exception {
        e.onNext("1");
        e.onNext("2");
        e.onNext("3");
        e.onNext("4");
        e.onComplete();
 
      }
    });
    Observable observable2 = Observable.create(new ObservableOnSubscribe<string>() {
      @Override
      public void subscribe(@NonNull ObservableEmitter<string> e) throws Exception {
        e.onNext("A");
        e.onNext("B");
        e.onNext("C");
        e.onNext("D");
        e.onComplete();
      }
    });
 
    Observable.zip(observable1, observable2, new BiFunction<string,string,string>() {
      @Override
      public String apply(@NonNull String o, @NonNull String o2) throws Exception {
        return o + o2;
      }
    }).subscribe(new Consumer<string>() {
      @Override
      public void accept(String o) throws Exception {
        System.out.println("o"+ o);
      }
    });
  }

總結(jié)

以上就是本文關(guān)于Rxjava功能操作符的使用方法詳解的全部?jī)?nèi)容,希望對(duì)大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站:Javaweb應(yīng)用使用限流處理大量的并發(fā)請(qǐng)求詳解、分享一個(gè)簡(jiǎn)單的java爬蟲框架、Java線程之線程同步synchronized和volatile詳解等,有什么問題可以隨時(shí)留言,小編會(huì)及時(shí)回復(fù)大家的。感謝朋友們對(duì)本站的支持!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI