Gordon
Assistant Engineer

For RxJava rookies(2)

Posted time: Oct 31, 2016 10:21 AM
Transformation
RxJava supports transforming event sequences. This is one of its core features and the main reason most people say “RxJava is so good.” Transformation means processing the objects in an event sequence, or the sequence as a whole, and turning them into different events or a different sequence. The concept is hard to grasp in the abstract, so let's look at the API.
1) API
First, let's look at an example of map():
Observable.just("images/logo.png") // The input type: String
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // Parameter type: String
            return getBitmapFromPath(filePath); // Returned type: Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // Parameter type: Bitmap
            showBitmap(bitmap);
        }
    });


Here a class called Func1 appears. It is very similar to Action1 and is also an RxJava interface that wraps a method with a single parameter. The difference is that Func1 wraps a method that returns a value. Like ActionX, FuncX comes in several variants for methods with different numbers of parameters; in every case, FuncX wraps a method with a return value, while ActionX wraps one without.
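To make the Func1/Action1 distinction concrete, here is a minimal plain-Java sketch. The two interfaces below are hypothetical stand-ins declared locally so that the example runs without RxJava; they only mirror the shape described above (one parameter, with or without a return value).

```java
import java.util.ArrayList;
import java.util.List;

public class FuncVsAction {
    // Hypothetical stand-ins, not the RxJava classes themselves.
    interface Action1<T> { void call(T value); }      // one parameter, no return value
    interface Func1<T, R> { R call(T value); }        // one parameter, returns a value

    static List<String> demo() {
        final List<String> log = new ArrayList<>();

        // A Func1 produces a result that can be passed further down the chain.
        Func1<String, Integer> length = new Func1<String, Integer>() {
            @Override public Integer call(String s) { return s.length(); }
        };

        // An Action1 only consumes a value; nothing comes back.
        Action1<Integer> record = new Action1<Integer>() {
            @Override public void call(Integer n) { log.add("length=" + n); }
        };

        record.call(length.call("images/logo.png"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [length=15]
    }
}
```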
We can see that map() transforms the String object it receives into a Bitmap object and returns it. After map(), the type of the event changes from String to Bitmap. This kind of transformation, which converts an object and returns it directly, is the most common and the easiest to understand. But RxJava offers far more than this: a transformation can target not only individual event objects but also the entire event sequence, which makes RxJava very flexible. Here are several frequently used transformations:
map(): Transforms each event object directly, as shown above. It is the most commonly used transformation in RxJava.
A map() diagram:



flatMap(): This is a very useful but hard-to-understand transformation, so I will spend more space on it. Suppose there is a data structure Student, and you need to print the names of a group of students. The implementation is simple:
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        Log.d(tag, name);
    }
    ...
};
Observable.from(students)
    .map(new Func1<Student, String>() {
        @Override
        public String call(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);


Very easy. Now what if we need to print the names of all the courses each student takes? (The difference is that every student has only one name but many courses.) A first attempt could look like this:
Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onNext(Student student) {
        List<Course> courses = student.getCourses();
        for (int i = 0; i < courses.size(); i++) {
            Course course = courses.get(i);
            Log.d(tag, course.getName());
        }
    }
    ...
};
Observable.from(students)
    .subscribe(subscriber);


Still very easy. But what if I don't want the for loop in the Subscriber, and instead want the Subscriber to receive individual Course objects directly (this matters a lot for code reuse)? map() obviously won't work, because map() is a one-to-one transformation, while what I need is one-to-many. So how can one Student be transformed into multiple Courses?
This is where flatMap() comes in:
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);


From the code above, we can see that flatMap() shares one feature with map(): it also converts the incoming parameter and returns another object. But note that, unlike map(), flatMap() returns an Observable object, and this Observable is not passed directly to the Subscriber's callback methods.
flatMap() works like this:
1. It creates an Observable from the incoming event object;
2. It does not dispatch this Observable; instead it activates it, so that the Observable starts dispatching its own events;
3. Every event dispatched by each created Observable feeds into one and the same Observable, which is responsible for delivering all of these events to the Subscriber's callback methods.
These three steps split the events into two levels: the initial objects are “flattened” through a group of newly created Observables and then dispatched along a single path. This flattening is the “flat” in flatMap().
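The three steps can be sketched in plain Java. This is only an illustration of the mechanism, not RxJava's implementation: Source, Observer, Func1 and Student are hypothetical stand-ins defined here, everything runs synchronously on one thread, and the inner sources are consumed one after another (a real flatMap() may interleave events from different inner Observables).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlatMapSketch {
    // Hypothetical minimal stand-ins for Subscriber, Observable and Func1.
    interface Observer<T> { void onNext(T item); }
    interface Source<T> { void subscribe(Observer<T> observer); }
    interface Func1<T, R> { R call(T value); }

    /** Creates a source that emits every element of the list on subscription. */
    static <T> Source<T> from(final List<T> items) {
        return new Source<T>() {
            @Override public void subscribe(Observer<T> observer) {
                for (T item : items) observer.onNext(item);
            }
        };
    }

    static <T, R> Source<R> flatMap(final Source<T> upstream,
                                    final Func1<T, Source<R>> mapper) {
        return new Source<R>() {
            @Override public void subscribe(final Observer<R> downstream) {
                upstream.subscribe(new Observer<T>() {
                    @Override public void onNext(T item) {
                        // Step 1: create an inner source from the incoming event.
                        Source<R> inner = mapper.call(item);
                        // Steps 2 and 3: activate it and feed all of its events
                        // into the single downstream observer.
                        inner.subscribe(downstream);
                    }
                });
            }
        };
    }

    static class Student {
        final List<String> courses;
        Student(String... courses) { this.courses = Arrays.asList(courses); }
    }

    static List<String> demo() {
        List<Student> students = Arrays.asList(
                new Student("Math", "Physics"), new Student("Art"));
        final List<String> received = new ArrayList<>();
        Source<String> courses = flatMap(from(students),
                new Func1<Student, Source<String>>() {
                    @Override public Source<String> call(Student s) {
                        return from(s.courses);
                    }
                });
        courses.subscribe(new Observer<String>() {
            @Override public void onNext(String c) { received.add(c); }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [Math, Physics, Art]
    }
}
```

The subscriber at the end never sees a Student or a list; it only receives individual course names, which is the one-to-many behavior described above.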
A flatMap() diagram:



Extended: Because asynchronous code can be placed inside the nested Observable, flatMap() is also often used for nested asynchronous operations, such as nested network requests. Sample code (Retrofit + RxJava):
networkClient.token() // Returns Observable<String>. Requests the token on subscription and emits it once the response arrives.
    .flatMap(new Func1<String, Observable<Messages>>() {
        @Override
        public Observable<Messages> call(String token) {
            // Returns Observable<Messages>. Requests the message list on subscription and emits it once the response arrives.
            return networkClient.messages();
        }
    })
    .subscribe(new Action1<Messages>() {
        @Override
        public void call(Messages messages) {
            // Display the message list
            showMessages(messages);
        }
    });


Traditionally, nested requests are implemented with nested callbacks. With flatMap(), you can write the nested requests in a single chain and keep the program logic clear.
throttleFirst(): Discards new events that arrive within a fixed time window after each emitted event. It is often used to filter out repeated triggers, for example in button click listeners:

RxView.clickEvents(button) // RxBinding code which will be explained in later articles.
    .throttleFirst(500, TimeUnit.MILLISECONDS) // Set the throttle window to 500 ms.
    .subscribe(subscriber);


Wow. You no longer have to worry about a shaky finger opening the same page twice.
RxJava also provides many other convenient transformations, which I will not list here.
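The filtering rule behind this can be sketched without RxJava. The class below is a hypothetical illustration that takes explicit timestamps instead of a real clock; treating an event that arrives exactly at the end of the window as accepted is an assumption of this sketch, not a statement about the library's exact boundary behavior.

```java
import java.util.ArrayList;
import java.util.List;

public class ThrottleFirstSketch {
    private final long windowMillis;
    private long lastAcceptedAt;
    private boolean acceptedBefore = false;

    ThrottleFirstSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Returns true if an event arriving at the given time should be passed on. */
    boolean accept(long timestampMillis) {
        if (!acceptedBefore || timestampMillis - lastAcceptedAt >= windowMillis) {
            acceptedBefore = true;
            lastAcceptedAt = timestampMillis; // a new window starts at this event
            return true;
        }
        return false; // still inside the window opened by the last accepted event
    }

    static List<Long> demo() {
        ThrottleFirstSketch throttle = new ThrottleFirstSketch(500);
        long[] clickTimes = {0, 120, 480, 600, 1000, 1150};
        List<Long> passed = new ArrayList<>();
        for (long t : clickTimes) {
            if (throttle.accept(t)) passed.add(t);
        }
        return passed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [0, 600, 1150]
    }
}
```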
2) Principle of transformation: lift()
These transformations differ in function, but in essence they all process an event sequence and dispatch it again. Inside RxJava, they are all built on the same basic transformation method: lift(Operator). First, let's look at the internal implementation of lift() (core code only):
// Note: this is not the actual source code of lift(), but its core logic with the code for performance, compatibility and extensibility stripped out.
// To view the real source code, download it from the RxJava repository on GitHub.
public <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}


This snippet is interesting: it creates a new Observable and returns it. What's more, the call() method of the OnSubscribe used to create the new Observable looks just like the implementation of Observable.subscribe() discussed earlier. But the two are not the same. The key difference lies in which object onSubscribe refers to in the onSubscribe.call() statement. (Warning: the following explanation may make your head spin.)
In subscribe(), onSubscribe refers to the OnSubscribe object of the Observable itself, which is straightforward. But things get more complicated after lift().

When lift() is involved:
1. After lift() creates an Observable, there are two Observables: the new one and the original one;
2. Likewise, there are two OnSubscribe objects: the new OnSubscribe in the new Observable and the original OnSubscribe in the original Observable;
3. When you call subscribe() on the result of lift(), you are using the new Observable returned by lift(), so the onSubscribe.call(subscriber) that it triggers uses the new OnSubscribe in the new Observable, that is, the OnSubscribe generated in lift();
4. Inside the call() method of this new OnSubscribe, onSubscribe refers to the original OnSubscribe in the original Observable. In this call() method, the new OnSubscribe uses operator.call(subscriber) to generate a new Subscriber (this is where the operator, through its own call() method, links the new Subscriber to the original Subscriber and inserts its transformation code), and then uses this new Subscriber to subscribe to the original Observable.
This is how lift() works. It resembles a proxy mechanism: the event sequence is transformed by intercepting and processing events.

In short: after an Observable executes lift(Operator), it returns a new Observable that acts as a proxy, receiving the events dispatched by the original Observable and sending the processed events on to the Subscriber.
A diagram is provided below, if you prefer to think visually:
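The proxy behavior can be reproduced in a few lines of plain Java. The MiniObservable, OnSubscribe, Subscriber and Operator types below are hypothetical, heavily reduced stand-ins (only onNext(), no error handling or unsubscription); the log shows the order in which the pieces are invoked when subscribe() is called on the lifted object.

```java
import java.util.ArrayList;
import java.util.List;

public class LiftSketch {
    // Hypothetical reduced stand-ins for the RxJava types of the same names.
    interface Subscriber<T> { void onNext(T item); }
    interface OnSubscribe<T> { void call(Subscriber<T> subscriber); }
    interface Operator<R, T> { Subscriber<T> call(Subscriber<R> downstream); }

    static class MiniObservable<T> {
        final OnSubscribe<T> onSubscribe;
        MiniObservable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        void subscribe(Subscriber<T> subscriber) { onSubscribe.call(subscriber); }

        <R> MiniObservable<R> lift(final Operator<R, T> operator) {
            final OnSubscribe<T> original = this.onSubscribe;
            // The new observable carries a new OnSubscribe that wraps the original one.
            return new MiniObservable<R>(new OnSubscribe<R>() {
                @Override public void call(Subscriber<R> subscriber) {
                    Subscriber<T> wrapped = operator.call(subscriber); // insert the transformation
                    original.call(wrapped);                            // subscribe to the original
                }
            });
        }
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        MiniObservable<Integer> numbers = new MiniObservable<Integer>(new OnSubscribe<Integer>() {
            @Override public void call(Subscriber<Integer> s) {
                log.add("original OnSubscribe called");
                s.onNext(1);
                s.onNext(2);
            }
        });
        MiniObservable<String> strings = numbers.lift(new Operator<String, Integer>() {
            @Override public Subscriber<Integer> call(final Subscriber<String> downstream) {
                log.add("operator wraps subscriber");
                return new Subscriber<Integer>() {
                    @Override public void onNext(Integer i) { downstream.onNext("#" + i); }
                };
            }
        });
        strings.subscribe(new Subscriber<String>() {
            @Override public void onNext(String s) { log.add("received " + s); }
        });
        return log;
    }

    public static void main(String[] args) {
        for (String line : demo()) System.out.println(line);
    }
}
```

Nothing happens when lift() itself is called; the operator and the original OnSubscribe only run once the final subscribe() arrives, in that order.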
A diagram of lift() principle


Two or more lift() calls follow the same principle, as shown below:


Below is the implementation of a concrete operator, for reference only. It converts the Integer objects in the event sequence to String objects:
observable.lift(new Observable.Operator<String, Integer>() {
    @Override
    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
        // Transforms the Integer object in the event sequence to a String object.
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);
            }

            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
});


I explained the principle of lift() to help you understand and use RxJava better. Whether or not you have understood it, note that RxJava does not recommend writing custom operators that use lift() directly. Instead, it recommends combining the existing methods that wrap lift() (such as map() and flatMap()) to meet your requirements, because using lift() directly easily leads to errors that are hard to detect.

3) compose(): transforming the Observable as a whole
Apart from lift(), Observable has another transformation method called compose(Transformer). The difference is that lift() targets event items and event sequences, while compose() targets the Observable itself. For example, suppose a program has several Observables that all need the same group of lift() transformations. You could write the code like this:
observable1
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);
observable2
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber2);
observable3
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber3);
observable4
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber4);


But you find this far from good software engineering practice, so you change it to:
private Observable liftAll(Observable observable) {
    return observable
        .lift1()
        .lift2()
        .lift3()
        .lift4();
}
...
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);


Readability and maintainability both improve. However, the Observable is now wrapped inside a method, which limits its flexibility. How can we solve this? This is where compose() comes in:
public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
    @Override
    public Observable<String> call(Observable<Integer> observable) {
        return observable
            .lift1()
            .lift2()
            .lift3()
            .lift4();
    }
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);


In the code above, compose() lets the Observable apply the call() method of the given Transformer to itself directly, without being wrapped in a method.
The principle of compose() is comparatively simple, so I won't illustrate it with a diagram.
Thread control: advanced features of Scheduler
Apart from flexible transformation, RxJava has another great feature: free control over threads.
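The idea can be sketched in plain Java as follows. Seq is a hypothetical, eager stand-in for Observable with just a map() operator, and Func1 and Transformer are reduced local interfaces; the point is only that compose() hands the whole sequence to the transformer and returns whatever comes back, so one transformer object can be reused on any number of sources.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ComposeSketch {
    // Hypothetical reduced stand-ins; not the RxJava types.
    interface Func1<T, R> { R call(T value); }
    interface Transformer<T, R> { Seq<R> call(Seq<T> source); }

    /** A tiny eager stand-in for Observable, just enough to chain operators. */
    static class Seq<T> {
        final List<T> items;
        Seq(List<T> items) { this.items = items; }

        <R> Seq<R> map(Func1<T, R> f) {
            List<R> out = new ArrayList<>();
            for (T item : items) out.add(f.call(item));
            return new Seq<R>(out);
        }

        /** Applies the transformer to this whole sequence and returns the result. */
        <R> Seq<R> compose(Transformer<T, R> transformer) {
            return transformer.call(this);
        }
    }

    // A reusable group of operators, packaged once.
    static final Transformer<Integer, String> DOUBLE_AND_LABEL =
            new Transformer<Integer, String>() {
                @Override public Seq<String> call(Seq<Integer> source) {
                    return source
                            .map(new Func1<Integer, Integer>() {
                                @Override public Integer call(Integer n) { return n * 2; }
                            })
                            .map(new Func1<Integer, String>() {
                                @Override public String call(Integer n) { return "value=" + n; }
                            });
                }
            };

    static List<String> demo() {
        Seq<String> first = new Seq<Integer>(Arrays.asList(1, 2)).compose(DOUBLE_AND_LABEL);
        Seq<String> second = new Seq<Integer>(Arrays.asList(10)).compose(DOUBLE_AND_LABEL);
        List<String> all = new ArrayList<>(first.items);
        all.addAll(second.items);
        return all;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [value=2, value=4, value=20]
    }
}
```

Because compose() is a method on the sequence itself, the call stays inside the chain instead of wrapping the chain in a helper method.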
1) Advanced APIs of Scheduler
We have already seen that subscribeOn() and observeOn() can be combined to control threads, so that events are produced and consumed on different threads. But after learning about transformations such as map() and flatMap(), some curious readers (well, that was me when I first started using RxJava) may ask: can I switch threads several more times?
The answer is yes. observeOn() specifies the thread of a Subscriber, but this Subscriber is not (strictly speaking “not necessarily”, but it is simpler to think of it as “not”) the Subscriber passed to subscribe(). It is the Subscriber attached to the Observable that exists at the point where observeOn() is called, that is, its immediate downstream Subscriber. In other words, observeOn() specifies the thread for the operations that follow it. So to switch threads multiple times, just call observeOn() at each point where a switch is needed:
Observable.just(1, 2, 3, 4) // IO thread, specified by the subscribeOn().
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // New thread, specified by the observeOn().
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO thread, specified by the observeOn().
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);  // Android main thread, specified by the observeOn().


As shown above, calling observeOn() several times switches threads several times.
Unlike observeOn(), however, subscribeOn() can be placed anywhere in the chain, but only one call takes effect.
Curious readers (yes, me again) may then ask: what if I call subscribeOn() multiple times? What happens?
Let's leave this question open for now and look at how thread control works inside RxJava.
2) Principle of scheduler (II)
In fact, the internal implementation of subscribeOn() and observeOn() also utilizes lift(). For details, see the diagram (different colors of the arrows indicate different threads):
A diagram of the subscribeOn() principle:


A diagram of the observeOn() principle:


From the diagrams, we can see that both subscribeOn() and observeOn() switch threads (the “schedule...” part of the diagrams). The difference is that subscribeOn() switches threads inside OnSubscribe, that is, while it is notifying the upstream OnSubscribe and before any event has been dispatched. Its thread control therefore takes effect from the very start of event dispatch. By contrast, observeOn() switches threads inside its built-in Subscriber, that is, just as it is about to send events to the downstream Subscriber. It therefore controls the threads of whatever comes after it.
Finally, here is a diagram of how threads are scheduled when several subscribeOn() and observeOn() calls are combined (because it contains many objects, the structure is simplified compared with the previous diagrams):
Comprehensive calls of thread controls


There are five operations on events in the diagram. ① and ② are affected by the first subscribeOn() and run on the red thread; ③ and ④ are affected by the first observeOn() and run on the green thread; ⑤ is affected by the second observeOn() and runs on the purple thread. The second subscribeOn() has no effect on the procedure, because its thread is replaced by that of the first subscribeOn() during the notification phase. This answers the earlier question: when multiple subscribeOn() calls are used, only the first one is effective.
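This behavior can be imitated with plain java.util.concurrent executors. The sketch below is an assumption-laden model, not RxJava's scheduler code: MiniObservable and its two methods are hypothetical, each "scheduler" is a single named thread, and only onNext() is modeled. Here "first" means the subscribeOn() closest to the source, as in the diagram.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SchedulerSketch {
    interface Subscriber<T> { void onNext(T item); }
    interface OnSubscribe<T> { void call(Subscriber<T> subscriber); }

    static class MiniObservable<T> {
        final OnSubscribe<T> onSubscribe;
        MiniObservable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        void subscribe(Subscriber<T> s) { onSubscribe.call(s); }

        // Switches threads while the subscription travels up toward the source.
        MiniObservable<T> subscribeOn(final Executor executor) {
            final OnSubscribe<T> upstream = onSubscribe;
            return new MiniObservable<T>(s -> executor.execute(() -> upstream.call(s)));
        }

        // Switches threads while each event travels down toward the subscriber.
        MiniObservable<T> observeOn(final Executor executor) {
            final OnSubscribe<T> upstream = onSubscribe;
            return new MiniObservable<T>(
                    s -> upstream.call(item -> executor.execute(() -> s.onNext(item))));
        }
    }

    static ExecutorService named(final String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    static List<String> demo() {
        ExecutorService red = named("red"), blue = named("blue"), green = named("green");
        final List<String> log = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        try {
            new MiniObservable<Integer>(s -> {
                        log.add("emit on " + Thread.currentThread().getName());
                        s.onNext(1);
                    })
                    .subscribeOn(red)   // closest to the source: decides where events are produced
                    .subscribeOn(blue)  // switches first, but is then overridden by red
                    .observeOn(green)   // decides where the subscriber below runs
                    .subscribe(item -> {
                        log.add("receive on " + Thread.currentThread().getName());
                        done.countDown();
                    });
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            red.shutdown();
            blue.shutdown();
            green.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [emit on red, receive on green]
    }
}
```

The blue executor does run a task, but all that task does is hand the subscription over to red, so no event is ever produced or consumed on the blue thread.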
3) Extended: doOnSubscribe()
However, although extra subscribeOn() calls have no effect on event processing, they can be put to use before the procedure starts.
In the earlier section on Subscriber, I mentioned that Subscriber.onStart() can be used for initialization before the procedure starts. But because onStart() is called at the moment subscribe() is invoked, you cannot specify its thread; it can only run on the thread that calls subscribe(). As a result, if onStart() contains code with thread requirements (such as showing a progress bar, which must be done on the main thread), there is a risk of running on an illegal thread, because you cannot always predict which thread subscribe() will be called on.
Subscriber.onStart() has a counterpart: Observable.doOnSubscribe(). Like Subscriber.onStart(), it runs after subscribe() is called and before any event is dispatched, but the difference is that it can specify the thread. By default, doOnSubscribe() runs on the thread where subscribe() is called; if a subscribeOn() follows it, it runs on the thread specified by the nearest subscribeOn() after it.
Sample code:
Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // Must be executed on the main thread.
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // Specify the main thread.
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);


As shown above, placing a subscribeOn() after doOnSubscribe() lets you specify the thread for the preparation work.