入坑rxjava:什么是RxJava?

入坑rxjava系列第一章

什么是RxJava?

来自官网的介绍:

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

Rx全称Reactive Extensions,一个异步,基于事件的流式编程库。

Rx是一种基于观察者模式的编程模型,我们使用它的主要目的就是通过采用一致的编程接口来处理异步数据流。

想要明白Rx,首先我们就要明白观察者模式。

什么是观察者模式?

观察者模式(Observer Pattern):定义对象间的一种一对多依赖关系,使得每当一个对象状态发生改变时,其相关依赖对象皆得到通知并被自动更新。观察者模式又叫做发布-订阅(Publish/Subscribe)模式、模型-视图(Model/View)模式、源-监听器(Source/Listener)模式或从属者(Dependents)模式。

观察者模式是一种对象行为型模式。

更详细的说明请点我

举个日常的例子:

去餐厅吃饭的时候。客人就是被观察者,而周围的服务员就是观察者。服务员不需要时刻盯准你一个人,而是当你进店坐下的时候,服务员就知道有新的客人来了,此时就相当于服务员注册(Register)/订阅(Subscribe)新来的客人,而当客人需要服务的时候(比如:下单)就去通知服务员过来。

Talk is cheap,so…:

  1. 自己实现观察者模式:

    public interface Watcher {
        void update(String str);
    }
    
    public class Waiter implements Watcher {
    
        private String name;
    
        public Waiter(String name){
            this.name = name;
        }
    
        @Override
        public void update(String str) {
            System.out.println("Waiter-" + name + ": " + str);
        }
    }
    
    public interface Watched {
        void addWatcher(Watcher watcher);
    
        void removeWatcher(Watcher watcher);
    
        void notifyWatchers(String str);
    }
    
    public class Customer implements Watched {
        private String name;
        private List<Watcher> watchers = new ArrayList<Watcher>();
    
        public Customer(String name){
            this.name = name;
        }
    
        @Override
        public void addWatcher(Watcher watcher) {
            watchers.add(watcher);
        }
    
        @Override
        public void removeWatcher(Watcher watcher) {
            watchers.remove(watcher);
        }
    
        @Override
        public void notifyWatchers(String str) {
            for (Watcher watcher : watchers) {
                watcher.update("Customer-" + name + "-" + str);
            }
        }
    }
    
    public static void main(String[] var0) {
            Watched watched = new Customer("Rabtman");
            Watcher A = new Waiter("A");
            Watcher B = new Waiter("B");
    
            watched.addWatcher(A);
            watched.addWatcher(B);
    
            watched.notifyWatchers("order dishes!");
        }
    

    运行结果:

    Waiter-A: Customer-Rabtman-order dishes!
    Waiter-B: Customer-Rabtman-order dishes!

  2. 通过Java Api实现:

    public class Customer extends Observable {
        private String name;
        private String order = "";
    
        public Customer(String name){
            this.name = name;
        }
    
        public String getOrder() {
            return order;
        }
    
        public void call(String str) {
            if (str != null && str.length() > 0) {
                this.order = "Customer-" + name + "-" + str;
                setChanged();    //发生变化
                notifyObservers();   //通知观察者
            }
        }
    }
    
    public class Waiter implements Observer {
    
        private String name;
    
        public Waiter(String name, Customer observable) {
            this.name = name;
            observable.addObserver(this);
        }
    
        @Override
        public void update(Observable o, Object arg) {
            System.out.println("Waiter-" + name + ": " + ((Customer) o).getOrder());
        }
    }
    
    public static void main(String[] var0) {
            Customer customer = new Customer("Rabtman");
            Waiter C = new Waiter("C", customer);
            Waiter D = new Waiter("D", customer);
    
            customer.call("order dishes!!");
        }
    

    运行结果:

    Waiter-D: Customer-Rabtman-order dishes!!
    Waiter-C: Customer-Rabtman-order dishes!!

为什么用RxJava?

RxJava可以为我们带来:

  • 流式编程,使得代码逻辑更简洁明了
  • 方便快捷的线程调度,解决了万恶的回调地狱
  • 统一的错误处理
  • 强大的操作符简化复杂的操作

这里我引用一个扔物线的例子:

假设有这样一个需求:界面上有一个自定义的视图 imageCollectorView ,它的作用是显示多张图片,并能使用 addImage(Bitmap) 方法来任意增加显示的图片。现在需要程序将一个给出的目录数组 File[] folders 中每个目录下的 png 图片都加载出来并显示在 imageCollectorView 中。需要注意的是,由于读取图片的这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

而如果使用 RxJava ,实现方式是这样的:

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

虽然上面的例子看起来代码不减反增,但是其实仔细观察可以发现Rx的实现至上而下,一气呵成的链式调用,完美解决了回调地狱的问题,使得代码在逻辑结构上清晰明了,妈妈再也不用担心还有更复杂的需求了!

结合Retrolambda食用效果更佳:

Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

怎么用RxJava?

基本概念

首先我们需要知道RxJava的基础概念:

  1. Observable:被观察者
  2. Observer:观察者
  3. Subscriber:继承于Observer,对其进行了扩展
  4. subscribe:订阅

添加依赖

compile 'io.reactivex:rxjava:1.1.6'
//android上使用还要添加这个
compile 'io.reactivex:rxandroid:1.2.1'

基本使用

我们来把最初的那个例子用rxjava来实现一下:

  1. 创建被观察者(顾客):

    Observable<String> customer = Observable.create(
                    new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            subscriber.onNext("Customer-Rabtman-order dishes!!!");
                            subscriber.onCompleted();
                        }
                    }
            );
    
  2. 创建观察者(服务员):

    其中ObserverSubscriber的基本使用方式是完全一致的

        Observer<String> E = new Observer<String>() {
            @Override
            public void onCompleted() {
    
            }
    
            @Override
            public void onError(Throwable e) {
    
            }
    
            @Override
            public void onNext(String s) {
                System.out.println("Waiter-E: " + s);
            }
        };
    
        Subscriber<String> F = new Subscriber<String>() {
            @Override
            public void onCompleted() {
    
            }
    
            @Override
            public void onError(Throwable e) {
    
            }
    
            @Override
            public void onNext(String s) {
                System.out.println("Waiter-F: " + s);
            }
        };
    
  3. 订阅:

    customer.subscribe(E);
    customer.subscribe(F);
    
  4. 运行结果:

    Waiter-E: Customer-Rabtman-order dishes!!!
    Waiter-F: Customer-Rabtman-order dishes!!!

代码都在demo中,请点传送门

至此,我们完成了对RxJava的一些初步认识。对于操作符,线程切换等操作具体可以到官网查阅,或者这个中文翻译文档,图文并茂已经写得相当详细。

感谢

参考文章:给 Android 开发者的 RxJava 详解

版权声明:

本文标题:入坑rxjava:什么是RxJava?

作者:Rabtman

原始链接:https://rabtman.com/2017/02/13/zero_rx_introduce/

本文采用署名-非商业性使用-禁止演绎4.0进行许可。

非商业转载请保留以上信息。商业转载请联系作者本人。