Gordon

For RxJava rookies (1)

Posted time: Oct 31, 2016 9:57 AM
What is RxJava?
A library for the Java VM for composing asynchronous and event-based programs by using observable sequences.
What advantages does RxJava boast?
1. Concise logic
2. Asynchronization
What can RxJava do?
1. Serve as an event bus (event center)
2. Work with Retrofit for network requests
3. RxBinding...
A first look at RxJava
Let's first look at this chunk of code:
new Thread() {
        @Override
        public void run() {
            super.run();
            for (File folder : folders) {
                File[] files = folder.listFiles();
                for (File file : files) {
                    if (file.getName().endsWith(".png")) {
                        final Bitmap bitmap = getBitmapFromFile(file);
                        getActivity().runOnUiThread(new Runnable() {
                            @Override
                            public void run() {
                                imageCollectorView.addImage(bitmap);
                            }
                        });
                    }
                }
            }
        }
}.start();


The code above traverses folders, filters for PNG files, and loads each image into the UI. Programmers with a perfectionist streak may wince at it: this is exactly the kind of deeply nested code that spoils one's mood.
How does RxJava achieve it?
Observable.from(folders)
        .flatMap(new Func1<File, Observable<File>>() {
            @Override
            public Observable<File> call(File file) {
                return Observable.from(file.listFiles());
            }
        })
        .filter(new Func1<File, Boolean>() {
            @Override
            public Boolean call(File file) {
                return file.getName().endsWith(".png");
            }
        })
        .map(new Func1<File, Bitmap>() {
            @Override
            public Bitmap call(File file) {
                return getBitmapFromFile(file);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) {
                imageCollectorView.addImage(bitmap);
            }
        });


Calm down, my lord. I know there is no less code, but what I promised was concise logic, not concise code. Isn't the chain structure very clear?
I believe you will see the benefit after a five-second epiphany.
Usage and principles
Basic concepts
1. Extended observer pattern
RxJava achieves asynchrony through an extended observer pattern. In the observer pattern, Object A (the observer) is highly sensitive to some change in Object B (the observable) and needs to respond the instant Object B changes.
2. Four basic concepts about RxJava:
1. Observable
2. Observer
3. Subscribe
4. Events: onNext(), onCompleted() and onError()
Events
onNext(): An ordinary event. It is called once for each item in the event sequence.

onCompleted(): The end of the event queue. RxJava not only handles each event separately, but also treats the events as a queue. RxJava stipulates that when no more onNext() events will be dispatched, onCompleted() must be triggered as the end marker.

onError(): An exception in the event queue. When an exception occurs during event processing, onError() is triggered and the queue terminates automatically; no further events may be dispatched.
Note:
In a correctly running event sequence, exactly one of onCompleted() and onError() occurs, and it is the last event in the sequence. The two are mutually exclusive: once one of them has been called in the queue, the other must not be called.
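The rule in the note can be made concrete with a small guard. The following is only a plain-Java sketch without any RxJava dependency; SimpleObserver, TerminalGuard and EventContractDemo are hypothetical names invented for this illustration, not RxJava classes. The guard forwards events until the first terminal event and silently drops everything after it.

```java
import java.util.ArrayList;
import java.util.List;

final class EventContractDemo {
    // Sends a deliberately ill-behaved sequence through the guard and records what survives.
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        SimpleObserver<String> guarded = new TerminalGuard<>(new SimpleObserver<String>() {
            @Override public void onNext(String item) { log.add("next:" + item); }
            @Override public void onCompleted() { log.add("completed"); }
            @Override public void onError(Throwable e) { log.add("error"); }
        });
        guarded.onNext("Hello");
        guarded.onCompleted();
        guarded.onError(new RuntimeException("too late")); // dropped: already terminated
        guarded.onNext("Hi");                              // dropped: already terminated
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:Hello, completed]
    }
}

// Hypothetical stand-in for the three callbacks described above.
interface SimpleObserver<T> {
    void onNext(T item);
    void onCompleted();
    void onError(Throwable e);
}

// Wraps another observer and ignores every event that arrives after a terminal event.
final class TerminalGuard<T> implements SimpleObserver<T> {
    private final SimpleObserver<T> downstream;
    private boolean done; // true once onCompleted() or onError() has been delivered

    TerminalGuard(SimpleObserver<T> downstream) {
        this.downstream = downstream;
    }

    @Override public void onNext(T item) {
        if (!done) {
            downstream.onNext(item);
        }
    }

    @Override public void onCompleted() {
        if (done) {
            return;
        }
        done = true;
        downstream.onCompleted();
    }

    @Override public void onError(Throwable e) {
        if (done) {
            return;
        }
        done = true;
        downstream.onError(e);
    }
}
```

The sketch is single-threaded on purpose; a guard that must tolerate concurrent callers would need additional synchronization.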


Basic implementations
Based on the concepts above, basic usage of RxJava involves three steps:
1) Create an Observer
The Observer determines what happens when an event is triggered. An implementation of the Observer interface in RxJava looks like this:
Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};


Apart from the Observer interface, RxJava also has a built-in abstract class that implements Observer: Subscriber. The Subscriber class extends the Observer interface in a few ways, but the basic usage is identical:
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};


In fact, during the subscribe process RxJava always converts an Observer into a Subscriber before using it. If you only need the basic functions, it makes no difference whether you choose Observer or Subscriber. For users, the two differ mainly in two respects:
onStart(): A method added by the Subscriber class. It is called when the subscribe process has just started and before any event is dispatched, so it can be used for preparation work such as clearing or resetting data. This method is optional; its default implementation is empty. Note that onStart() is not suitable if the preparation must run on a particular thread (for example, showing a progress dialog, which must happen on the main thread), because it is always invoked on the thread where subscribe() is called, and you cannot specify a thread for it. To run preparation work on a specified thread, use the doOnSubscribe() method. Details of its usage can be found in the later part of this article.
unsubscribe(): A method of Subscription, another interface implemented by Subscriber. It cancels the subscription. After this method is called, the Subscriber no longer receives events. You can usually check the state with isUnsubscribed() before calling it. The unsubscribe() method is very important: after you call subscribe(), the Observable holds a reference to the Subscriber, and if this reference is not released in time, there is a risk of a memory leak. It is therefore good to stick to one principle: call unsubscribe() at an appropriate point (such as in onPause() or onStop()) as soon as the subscription is no longer needed, to release the reference and avoid memory leaks.
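To illustrate the effect of unsubscribe(), here is a plain-Java sketch with no RxJava dependency. CancelFlag and UnsubscribeDemo are hypothetical names made up for this example; CancelFlag only mimics the unsubscribe()/isUnsubscribed() pair described above. The producer checks the flag before each item, so it stops as soon as the consumer cancels.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class UnsubscribeDemo {
    // Emits 1..5, checking before each item whether the consumer is still subscribed.
    static void emit(CancelFlag flag, Consumer<Integer> onNext) {
        for (int i = 1; i <= 5; i++) {
            if (flag.isUnsubscribed()) {
                return; // stop producing once the subscription is cancelled
            }
            onNext.accept(i);
        }
    }

    // The consumer cancels as soon as it has seen two items.
    static List<Integer> run() {
        final List<Integer> received = new ArrayList<>();
        final CancelFlag flag = new CancelFlag();
        emit(flag, new Consumer<Integer>() {
            @Override public void accept(Integer item) {
                received.add(item);
                if (received.size() == 2) {
                    flag.unsubscribe();
                }
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2]
    }
}

// Hypothetical stand-in for the Subscription role: a flag that can only go from false to true.
final class CancelFlag {
    private volatile boolean unsubscribed;

    void unsubscribe() {
        unsubscribed = true;
    }

    boolean isUnsubscribed() {
        return unsubscribed;
    }
}
```

Items 3, 4 and 5 are never produced, which is the behavior you want when an Android screen goes away while work is still pending.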
2) Create an Observable
The Observable determines when events are triggered and what kind of events they are. RxJava uses the create() method to create an Observable and define its event-triggering rules:
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});


As you can see, an OnSubscribe object is passed in as the parameter. It is stored in the returned Observable object and acts as a plan: when the Observable is subscribed to, the call() method of OnSubscribe is invoked automatically, and the event sequence is triggered in the configured order (for the code above, the subscriber's onNext() is called three times and onCompleted() once). In this way, the Observable invokes the Observer's callback methods, passing events from the Observable to the Observer. This is the observer pattern.
This example is deliberately simple: the event content is a string rather than a complex object; the content is fixed rather than undetermined, as it is in some observer patterns (for example, the result of a network request is unknown until it returns); and all events are dispatched at once rather than at fixed or irregular intervals or in response to some trigger. In short, the example seems to have no practical value; it only serves to ease the explanation. In practice you can write whatever event-dispatching rules you want. How to do so is covered later, because the higher-level applications are easier to explain once the basic principles are clear.
The create() method is the most basic way to create an event sequence in RxJava. Built on it, RxJava also provides shortcut methods for creating event queues, such as:
just(T...): Dispatches the given arguments in order.
Observable<String> observable = Observable.just("Hello", "Hi", "Aloha");
// It will call the following in order:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();


from(T[]) / from(Iterable<? extends T>): Splits the given array or Iterable into individual objects and dispatches them in order.
String[] words = {"Hello", "Hi", "Aloha"};
Observable<String> observable = Observable.from(words);
// It will call the following in order:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();


The above examples of just(T...) and from(T[]) are equivalent to the previous example of create(OnSubscribe).
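One way to picture this equivalence is to build the shortcuts on top of create() yourself. The sketch below is plain Java with no RxJava dependency; Source, Sink, Emitter and FactoryDemo are hypothetical names for this illustration and say nothing about how RxJava implements just() or from() internally. It only shows that the three forms can yield the same event sequence.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FactoryDemo {
    // Subscribes to a source and records every event it receives as a readable string.
    static List<String> record(Source<String> source) {
        final List<String> log = new ArrayList<>();
        source.subscribe(new Sink<String>() {
            @Override public void onNext(String item) { log.add("onNext(" + item + ")"); }
            @Override public void onCompleted() { log.add("onCompleted()"); }
        });
        return log;
    }

    // The hand-written form: every event is spelled out, as in the create() example.
    static Source<String> handWritten() {
        return Source.create(new Emitter<String>() {
            @Override public void call(Sink<String> sink) {
                sink.onNext("Hello");
                sink.onNext("Hi");
                sink.onNext("Aloha");
                sink.onCompleted();
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(record(handWritten()));
        System.out.println(record(Source.just("Hello", "Hi", "Aloha")));
        System.out.println(record(Source.from(Arrays.asList("Hello", "Hi", "Aloha"))));
    }
}

// Hypothetical, reduced observer: only the two callbacks this example needs.
interface Sink<T> {
    void onNext(T item);
    void onCompleted();
}

// Hypothetical counterpart of the OnSubscribe role: the stored emission plan.
interface Emitter<T> {
    void call(Sink<T> sink);
}

final class Source<T> {
    private final Emitter<T> emitter;

    private Source(Emitter<T> emitter) {
        this.emitter = emitter;
    }

    static <T> Source<T> create(Emitter<T> emitter) {
        return new Source<>(emitter);
    }

    // from(): emit each element in iteration order, then complete.
    static <T> Source<T> from(final Iterable<? extends T> items) {
        return create(new Emitter<T>() {
            @Override public void call(Sink<T> sink) {
                for (T item : items) {
                    sink.onNext(item);
                }
                sink.onCompleted();
            }
        });
    }

    // just(): the varargs convenience form, expressed through from().
    @SafeVarargs
    static <T> Source<T> just(T... items) {
        return from(Arrays.asList(items));
    }

    void subscribe(Sink<T> sink) {
        emitter.call(sink);
    }
}
```

All three lines printed by main() are identical, which is the sense in which the shortcuts are equivalent to the hand-written create() call.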
3) Subscribe
After the Observable and Observer are created, connect them using the subscribe() method, and the whole chain starts working. The code takes a simple form:
observable.subscribe(observer);
// Or:
observable.subscribe(subscriber);


You may have noticed that the subscribe() method looks a little strange: it reads as if “the Observable subscribes to the Observer/Subscriber” rather than “the Observer/Subscriber subscribes to the Observable”. This seems to reverse the relationship, like “a magazine subscribing to a reader”, and is a little awkward to read. But if the API were designed as observer.subscribe(observable)/subscriber.subscribe(observable), it would match the logic better but break the fluent (chained) API design, and the loss would clearly outweigh the gain.
The implementation inside the Observable.subscribe(Subscriber) is like this (core code only):
// Attention: This is not the source code of subscribe(), but the core code that remains after removing the parts related to performance, compatibility and extensibility.
// To view the source code, you can download it from the GitHub repository of RxJava.
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}


From the code, we can see that subscribe() does three things:
1. It calls subscriber.onStart() for preparation work.
2. It calls onSubscribe.call(subscriber), which runs the business logic. Note that this logic does not run when the Observable is created; it runs only when call() is executed, that is, at subscribe time.
3. It returns the Subscriber that was passed in as a Subscription, for the convenience of calling unsubscribe() later.
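The three steps, and especially the fact that nothing runs before subscribe(), can be checked with a small model. This is a plain-Java sketch with no RxJava dependency; LazySource, its nested Listener and OnSubscribe interfaces, and SubscribeFlowDemo are hypothetical names that mirror the simplified core code above rather than the real source.

```java
import java.util.ArrayList;
import java.util.List;

final class SubscribeFlowDemo {
    // Records the order in which creation, onStart, call and onNext actually happen.
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        LazySource source = new LazySource(new LazySource.OnSubscribe() {
            @Override public void call(LazySource.Listener listener) {
                log.add("call");
                listener.onNext("Hello");
            }
        });
        log.add("created"); // the stored logic has not run yet

        source.subscribe(new LazySource.Listener() {
            @Override public void onStart() { log.add("onStart"); }
            @Override public void onNext(String item) { log.add("onNext:" + item); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [created, onStart, call, onNext:Hello]
    }
}

// Hypothetical model of the simplified subscribe() shown above.
final class LazySource {
    interface Listener {
        void onStart();
        void onNext(String item);
    }

    interface OnSubscribe {
        void call(Listener listener);
    }

    private final OnSubscribe onSubscribe;

    LazySource(OnSubscribe onSubscribe) {
        this.onSubscribe = onSubscribe; // only stored here, not executed
    }

    Listener subscribe(Listener listener) {
        listener.onStart();          // step 1: preparation
        onSubscribe.call(listener);  // step 2: the stored logic runs only now
        return listener;             // step 3: handed back so the caller can cancel later
    }
}
```

Because "created" is logged before "call", the model confirms that building the source is cheap and that the work is deferred until someone subscribes.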
The relationships between objects in the entire process are shown in the figure below:


Besides subscribe(Observer) and subscribe(Subscriber), the subscribe() method also supports partially defined callbacks. RxJava automatically creates the Subscriber from the callbacks you supply. The code is as follows:
Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// Automatically create the Subscriber, and define onNext() using onNextAction
observable.subscribe(onNextAction);
// Automatically create the Subscriber, and define onNext() and onError() using onNextAction and onErrorAction
observable.subscribe(onNextAction, onErrorAction);
// Automatically create the Subscriber, and define onNext(), onError() and onCompleted() using onNextAction, onErrorAction and onCompletedAction.
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);


A brief explanation of the Action1 and Action0 in this code: Action0 is an RxJava interface with a single method, call(), which takes no parameters and returns nothing. Since onCompleted() also takes no parameters and returns nothing, an Action0 can act as a wrapper object that packages the body of onCompleted() and is passed into subscribe() to form a partially defined callback. You can also think of it as passing the onCompleted() method itself into subscribe() as a parameter, much like a “closure” in some other languages.
Action1 is also an interface with a single method, call(T param), which returns nothing but takes one parameter. As with Action0, since onNext(T obj) and onError(Throwable error) each take one parameter and return nothing, an Action1 can wrap onNext(obj) or onError(error) and be passed into subscribe() to form a partially defined callback. Although Action0 and Action1 are the most widely used, RxJava provides several interfaces of the form ActionX (such as Action2 and Action3) that can wrap different methods with no return value.
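The wrapping idea can be sketched in plain Java without RxJava. Act0, Act1, CallbackSet and ActionAdapterDemo below are hypothetical names that imitate the shapes described above. The defaults used when a callback is missing (rethrow the error, ignore completion) are assumptions of this sketch, not a statement about what RxJava does.

```java
import java.util.ArrayList;
import java.util.List;

final class ActionAdapterDemo {
    // Drives two adapters by hand and records which callbacks actually ran.
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        Act1<String> onNext = new Act1<String>() {
            @Override public void call(String s) { log.add("next:" + s); }
        };
        Act1<Throwable> onError = new Act1<Throwable>() {
            @Override public void call(Throwable e) { log.add("error:" + e.getMessage()); }
        };
        Act0 onCompleted = new Act0() {
            @Override public void call() { log.add("completed"); }
        };

        // Only onNext supplied: completion falls back to a no-op.
        CallbackSet<String> partial = CallbackSet.of(onNext);
        partial.onNext("Hello");
        partial.onCompleted();

        // All three callbacks supplied.
        CallbackSet<String> full = CallbackSet.of(onNext, onError, onCompleted);
        full.onNext("Hi");
        full.onCompleted();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:Hello, next:Hi, completed]
    }
}

// Hypothetical look-alikes of the zero-argument and one-argument action shapes.
interface Act0 {
    void call();
}

interface Act1<T> {
    void call(T value);
}

// Bundles up to three actions into one object that exposes the three callbacks.
final class CallbackSet<T> {
    private final Act1<? super T> nextAction;
    private final Act1<Throwable> errorAction;
    private final Act0 completedAction;

    private CallbackSet(Act1<? super T> nextAction, Act1<Throwable> errorAction, Act0 completedAction) {
        this.nextAction = nextAction;
        this.errorAction = errorAction;
        this.completedAction = completedAction;
    }

    static <T> CallbackSet<T> of(Act1<? super T> onNext) {
        // Assumption of this sketch: an unhandled error is rethrown, completion is ignored.
        return of(onNext,
                new Act1<Throwable>() {
                    @Override public void call(Throwable e) {
                        throw new IllegalStateException("no error handler supplied", e);
                    }
                },
                new Act0() {
                    @Override public void call() { }
                });
    }

    static <T> CallbackSet<T> of(Act1<? super T> onNext, Act1<Throwable> onError, Act0 onCompleted) {
        return new CallbackSet<>(onNext, onError, onCompleted);
    }

    void onNext(T item) { nextAction.call(item); }
    void onError(Throwable e) { errorAction.call(e); }
    void onCompleted() { completedAction.call(); }
}
```

"completed" appears only once in the output because the first adapter had no completion action to run.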
Note: As mentioned earlier, the Observer and Subscriber share the same role and an Observer will be eventually converted to a Subscriber object in the subscribe() process. Therefore, I will refer to the Observer as the Subscriber hereinafter for the sake of rigor.
4) Demo scenarios
Here are two examples:
To explain the principles as clearly as possible, the examples in this article are kept as simple as possible. As a result, some sample code may seem “superfluous”, and you may feel that “the problem is obviously easier to solve without RxJava”. Do not conclude from this that RxJava is long-winded: introducing real-world scenarios too early would get in the way of analyzing the principles, which is why I deliberately chose simple ones.
a. Print a string array
Print all the strings in the string array names in order:
String[] names = ...;
Observable.from(names)
    .subscribe(new Action1<String>() {
        @Override
        public void call(String name) {
            Log.d(tag, name);
        }
    });


b. Get the image by ID and display it
Get the image for a given drawable resource ID, drawableRes, display it in an ImageView, and show a Toast to report the error if an exception occurs:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new Observable.OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes);
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
}).subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});


As in the two examples above, create the Observable and the Subscriber and connect them with subscribe(), and a basic use of RxJava is complete. It is very easy.
By default in RxJava, events are dispatched and consumed on the same thread. That is to say, with only the methods above, you are implementing a synchronous observer pattern. The real purpose of the observer pattern, however, is an asynchronous mechanism of “background processing, foreground callbacks”, so asynchrony is crucial for RxJava. To achieve it, another RxJava concept is needed: the Scheduler.
Thread control - basics of scheduler
When no thread is specified, RxJava follows the principle of not switching threads: events are produced on the thread where subscribe() is called, and consumed on the thread where they are produced. To switch threads, you need a Scheduler.
1) Basic APIs of Scheduler
In RxJava, the Scheduler serves as a thread controller. RxJava uses it to specify the thread on which each piece of code runs. RxJava has several built-in schedulers that cover most scenarios:

Schedulers.immediate(): It runs directly on the current thread, that is, no thread is specified. This is the default scheduler.

Schedulers.newThread(): It always starts a new thread and executes operations in the new thread.

Schedulers.io(): The scheduler for I/O operations (reading and writing files, reading and writing databases, network communication and so on). Its behavior is similar to that of newThread(); the difference is that io() is backed internally by a thread pool with no upper limit on the number of threads, in which idle threads can be reused. So in most cases io() is more efficient than newThread(). Do not put computation work on io(), to avoid creating unnecessary threads.

Schedulers.computation(): The scheduler for computation. Computation here means CPU-intensive work that is not limited by I/O or similar operations, such as graphics computation. This scheduler uses a fixed-size thread pool whose size equals the number of CPU cores. Do not put I/O operations on computation(); otherwise the time spent waiting for I/O wastes CPU.

In addition, Android also has a dedicated AndroidSchedulers.mainThread(). The operations it specifies will be executed on the Android main thread.
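The difference between the two pool shapes described for io() and computation() can be pictured with the JDK's own executors. This is only an analogy in plain Java, not RxJava's implementation; PoolShapesDemo and its method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

final class PoolShapesDemo {
    // Rough analogue of an I/O-style pool: no upper limit, idle threads are reused.
    static ExecutorService ioLikePool() {
        return Executors.newCachedThreadPool();
    }

    // Rough analogue of a computation-style pool: fixed size, one thread per CPU core.
    static ExecutorService computationLikePool() {
        return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

    // Both factory methods above return a ThreadPoolExecutor, so the cast is safe here.
    static int maxThreads(ExecutorService pool) {
        return ((ThreadPoolExecutor) pool).getMaximumPoolSize();
    }

    public static void main(String[] args) {
        ExecutorService io = ioLikePool();
        ExecutorService cpu = computationLikePool();
        System.out.println("io-like max threads: " + maxThreads(io));
        System.out.println("computation-like max threads: " + maxThreads(cpu));
        io.shutdown();
        cpu.shutdown();
    }
}
```

Blocking work parked on the fixed pool would occupy one of its few threads while waiting, which is the CPU waste that the computation() description warns about.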
With these several schedulers, you can use the subscribeOn() and observeOn() methods to manage the threads.
subscribeOn(): It specifies the thread for the subscribe() action, that is, the thread of Observable.OnSubscribe when it is activated. It can also be called the thread where the event is produced.

observeOn(): It specifies the thread where the Subscriber is executed. It can also be called the thread where the event is consumed.
Words alone are hard to follow, so let the code speak:
Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // It specifies that subscribe() occurs in the IO thread.
    .observeOn(AndroidSchedulers.mainThread()) // It specifies that the subscriber callback occurs in the main thread.
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });


In the code above, because of subscribeOn(Schedulers.io()), the items 1, 2, 3 and 4 of the created events are dispatched on the IO thread; because of observeOn(AndroidSchedulers.mainThread()), the subscriber prints the numbers on the main thread. In fact, writing subscribeOn(Schedulers.io()) and observeOn(AndroidSchedulers.mainThread()) just before subscribe() is very common. It suits most programs that follow the strategy of “fetching data on a background thread and displaying it on the main thread”.
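The "produce on one thread, consume on another" split can be imitated with two plain JDK executors. The sketch below has no RxJava or Android dependency and does not show how subscribeOn() or observeOn() work internally; ThreadHopDemo and the thread names "io-like" and "main-like" are hypothetical. One executor plays the producing side, the other plays the consuming side, and each event hops between them.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

final class ThreadHopDemo {
    // Single-thread executor whose only thread carries a recognizable name.
    static ExecutorService named(final String name) {
        return Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override public Thread newThread(Runnable r) {
                Thread t = new Thread(r, name);
                t.setDaemon(true);
                return t;
            }
        });
    }

    // Produces 1..4 on "io-like" and consumes each number on "main-like".
    static List<String> run() {
        final ExecutorService producer = named("io-like");   // plays the producing role
        final ExecutorService consumer = named("main-like"); // plays the consuming role
        final List<String> log = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(4);

        producer.execute(new Runnable() {
            @Override public void run() {
                final String producedOn = Thread.currentThread().getName();
                for (int i = 1; i <= 4; i++) {
                    final int number = i;
                    // Hand every event over to the consuming thread.
                    consumer.execute(new Runnable() {
                        @Override public void run() {
                            log.add(number + " produced on " + producedOn
                                    + ", consumed on " + Thread.currentThread().getName());
                            done.countDown();
                        }
                    });
                }
            }
        });

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for events");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            producer.shutdown();
            consumer.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Because the consuming executor has a single thread, the four events arrive in the order in which they were produced, and the calling thread is blocked only by the latch that this demo uses to wait for the result.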
Take the earlier example of getting an image by its ID and displaying it, and add these two statements:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new Observable.OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes);
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
})
.subscribeOn(Schedulers.io()) // It specifies that subscribe() occurs in the IO thread.
.observeOn(AndroidSchedulers.mainThread()) // It specifies that the subscriber callback occurs in the main thread.
.subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});


Loading the image now happens on the IO thread, while setting the image happens on the main thread. This means that even if loading the image takes tens or hundreds of milliseconds, the UI will not freeze.
2) Principle basics of scheduler
The scheduler API of RxJava is very convenient, almost magical (add one statement and the thread is switched. How is that achieved? Besides, isn't subscribe() a method called directly from the outermost layer? How can it run on a specified thread too?). However, the principle of the Scheduler has to wait, because it builds on the principle of Switching, which is covered in the next section.
Okay, I admit that I have said nothing in this subsection. I only added it to assure you that I have not forgotten to explain the principle; I am just postponing it to a more suitable place.