You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: Part 4 - Concurrency/1. Scheduling and threading.md
+27-17Lines changed: 27 additions & 17 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -2,7 +2,7 @@
2
2
3
3
Because Rx is targeted at asynchronous systems and because Rx can naturally support multithreading, new Rx developers sometimes assume that Rx is multithreaded by default. It is important clarify before anything else that __Rx is single-threaded by default__.
4
4
5
-
_Unless you specify otherwise_, every call to `onNext`/`onError`/`onCompleted` executes the entire chain of operators synchronously, including the actions of the final subscriber. We can see than in the following examples:
5
+
_Unless you specify otherwise_, every call to `onNext`/`onError`/`onCompleted` executes the entire chain of operators synchronously, including the actions of the final subscriber. We can see than in the following example:
@@ -48,7 +48,7 @@ In Rx you don't juggle threads directly. Instead you wrap them in policies calle
48
48
49
49
### subscribeOn
50
50
51
-
With `subscribeOn` you decide on what thread the `Observable.create` is executed. Even if you're not calling `create` yourself, there is an internal equivalent to it. Consider the following example:
51
+
With `subscribeOn` you decide on what Scheduler the `Observable.create` is executed. Even if you're not calling `create` yourself, there is an internal equivalent to it. Consider the following example:
We see here that, non only is everything executed on the same thread, it is actually sequential: `subscribe` does not unblock until it has completed creating (and subscribing to) its observable, which means executing all of `create`'s lambda. The calls to `onNext` within that lambda execute the entire chain of operators, all the way to the `println`. Effectively, `create` is blocking.
78
+
We see here that, not only is everything executed on the same thread, it is actually sequential: `subscribe` does not unblock until it has completed subscribing to (and thus creating) the observable, which includes executing the body of `create`'s lambda parameter. The calls to `onNext` within that lambda execute the entire chain of operators, all the way to the `println`. Effectively, subscribing on a `create`d observable is blocking.
79
79
80
80
If you uncomment `.subscribeOn(Schedulers.newThread())`, the output now is
81
81
```
@@ -88,7 +88,7 @@ Received 2 on 11
88
88
89
89
`Schedulers.newThread()` provided a new thread for our lambda function to run on. `subscribe` no longer blocks until `create`'s lambda is executed and the main thread is free to proceed.
90
90
91
-
Some observables create their own threads regardless of you what you requested. For example, `Observable.interval` is asynchronous regardless. In such cases, `subscribeOn` will dictate on what thread to run the function which creates the resources, which typically won't be helpful. It gives you no control over what resources the source of your observable requires.
91
+
Some observables create their own threads regardless of you what you requested. For example, `Observable.interval` is asynchronous regardless. In such cases, `subscribeOn` will dictate on what thread to run the function which creates the resources, which typically won't be helpful. It gives you no control over what resources will be leased.
Unlike `subscribeOn`, `observeOn`'s control doesn't jump to the end of the pipeline. It just changes the thread for the operators that come after it. You can think of it as intercepting events and changing their thread for the rest of the chain. Here's an example for this:
134
+
Unlike `subscribeOn`, `observeOn`'s effect doesn't jump to the start of the pipeline. It just changes the thread for the operators that come after it. You can think of it as intercepting events and changing the thread for the rest of the chain. Here's an example for this:
135
135
136
136
```java
137
137
Observable.create(o -> {
@@ -156,13 +156,13 @@ After 1 on 13
156
156
After 2 on 13
157
157
```
158
158
159
-
We can see here that events start on thread that calls `onNext` and stay on that thread until they encounter the `observeOn` operator. After that, they continue on the new thread. This way you can assign different threading policies to different parts of an Rx pipeline.
159
+
We can see here that events start on the thread that calls `onNext` and stay on that thread until they encounter the `observeOn` operator. After that, they continue on the new thread. This way you can assign different threading policies to different parts of an Rx pipeline.
160
160
161
-
This is very useful if you, as the consumer of an observable, know that processing is time-consuming and you don't want to block the producing thread. A typical case for this is applications with a GUI. Handlers to GUI events are invoked on the GUI thread by default. In order not to have your application freeze for the duration of the processing, you can take the processing to another thread. You can use `observeOn` again to return to the GUI thread, when the data is ready to be displayed.
161
+
This is very useful if you, as the consumer of an observable, know that processing is time-consuming and you don't want to block the producing thread. A typical case for this is applications with a GUI. GUI libraries have a special thread that has the exclusive right to access GUI components (buttons etc). While the GUI thread is busy, everything becomes non-responsive. Handlers to GUI events are invoked on the GUI thread and everything the handler does will block the GUI thread. In order not to have your application freeze for the duration of the processing, you must move heavy processing to another thread. You can use `observeOn` to move away and again to return to the GUI thread, when the data is ready to be displayed.
162
162
163
163
### unsubscribeOn
164
164
165
-
As we have seen, some observables depend on resources which are leased on subscription and released when subscriptions end. Typically, releasing resources is cheap. In exceptional cases, where you need the unsubscription actions to not block, you can specify the scheduler that will execute those actions with `unsubscribeOn`
165
+
As we have seen, some observables depend on resources which are leased on subscription and released when subscriptions end. Typically, releasing resources is cheap. In exceptional cases, where you need the unsubscription actions to not block or to specifically take place on a special thread, you can specify the scheduler that will execute those actions with `unsubscribeOn`
166
166
167
167
```java
168
168
Observable<Object> source =Observable.using(
@@ -197,7 +197,7 @@ The `using` method executes 3 functions, one that leases a resource, one that us
197
197
198
198
## Schedulers
199
199
200
-
The `observeOn` and `subscribeOn` methods take as an argument a [Scheduler](http://reactivex.io/RxJava/javadoc/rx/Scheduler.html). A `Scheduler`, as the name suggests, is a tool that can schedule individual actions to be performed. The specifics of how the action will be invoked depends on the implementation of the scheduler used. You can create your own implementation of scheduler, but Rx already has you covered with a set of Schedulers for the common cases. You can get the existing implementations from the factory methods on [Schedulers](http://reactivex.io/RxJava/javadoc/rx/schedulers/Schedulers.html).
200
+
The `observeOn` and `subscribeOn` methods take as an argument a [Scheduler](http://reactivex.io/RxJava/javadoc/rx/Scheduler.html). A `Scheduler`, as the name suggests, is a tool that can schedule individual actions to be performed. The specifics of how the action will be invoked depends on the implementation of the scheduler used. You can create your own implementation of scheduler, but most of time you'll find that RxJava already has you covered with a set of Schedulers for the common cases. You can get the existing implementations from the factory methods on [Schedulers](http://reactivex.io/RxJava/javadoc/rx/schedulers/Schedulers.html).
201
201
202
202
The existing schedulers are as follows
203
203
*`immediate` executes the scheduled action synchronously. No actual scheduling takes place.
@@ -209,14 +209,14 @@ The existing schedulers are as follows
209
209
210
210
In the current implementation, `computation` and `io` schedulers aren't actually unique implementations. The point of this separation is to have unique instances, while also documenting your intent.
211
211
212
-
Most of the Rx operators use schedulers internally. If you revisit all the operators on the [Observable](http://reactivex.io/RxJava/javadoc/rx/Observable.html) that you've seen so far, you'll see that most of them have overloads that take a `Scheduler`. You can dictate exactly what scheduler each operator uses. You can also see which scheduler they use when you don't provide one.
212
+
Many of the Rx operators use schedulers internally. If you revisit the [Observable](http://reactivex.io/RxJava/javadoc/rx/Observable.html)operators that you've seen so far, you'll see that all the asynchronous operators have overloads that take a `Scheduler`. You can dictate exactly what scheduler each operator uses. You can also find in the documentation which scheduler they use when you don't provide one.
213
213
214
214
215
215
## Advanced features of schedulers
216
216
217
-
The approaches and implementations for scheduling in Rx aren't a special innovation. In fact, they are quite standard and could be used without any Rx code. An Rx developer doesn't need to implement new schedulers or even use them in their raw form. However, since the original www.introtorx.com when at length about schedulers, we are also going to present scheduling in RxJava, albeit more briefly.
217
+
The approaches and implementations for scheduling used in Rx schedulers aren't specific to Rx. In fact, they are quite standard and could be used without any Rx code. You typically won't have to use schedulers directly, except for when you are designing a custom asynchronous operator. Using schedulers in custom operators isn't only convenient, but it also allow asynchronous operators to become testable, as we will see in the next chapter.
218
218
219
-
An implementation of `Scheduler` has two parts. One is the notion of time, which you can get through the `now()` method. Implementing time through the scheduler is going to prove useful when testing Rx, but for now this feature isn't interesting.
219
+
An implementation of `Scheduler` has two parts. One is the notion of time, which you can get through the `now()` method. Implementing time through the scheduler is going to prove useful when virtualising time for testing, but for now this feature isn't interesting.
220
220
221
221
The interesting part is `createWorker()`, which returns a [Scheduler.Worker](http://reactivex.io/RxJava/javadoc/rx/Scheduler.Worker.html). A worker accepts actions and executes them sequentially on a single thread. In a way, a worker is a scheduler itself, but we will not refer to it as a scheduler to avoid confusion.
222
222
@@ -234,8 +234,15 @@ The action is then queued to be executed on the thread that the worker is assign
234
234
235
235
As you would expect from a scheduler, you can also schedule actions to be executed in the future once or repeatedly with the following methods:
236
236
```java
237
-
Subscription schedule(Action0 action, long delayTime, java.util.concurrent.TimeUnit unit)
238
-
Subscription schedulePeriodically(Action0 action, long initialDelay, long period, java.util.concurrent.TimeUnit unit)
237
+
Subscription schedule(
238
+
Action0 action,
239
+
long delayTime,
240
+
java.util.concurrent.TimeUnit unit)
241
+
Subscription schedulePeriodically(
242
+
Action0 action,
243
+
long initialDelay,
244
+
long period,
245
+
java.util.concurrent.TimeUnit unit)
239
246
```
240
247
241
248
```java
@@ -255,7 +262,7 @@ Output
255
262
5035
256
263
```
257
264
258
-
We can see here that delay for the execution is measured from the moment of scheduling. It is not a mandatory sleep period. The worker can decide to do work in the meantime or wait until it's time for the next job.
265
+
We can see here that delay for the execution is measured from the moment of scheduling. The specified time is not a mandatory sleep period in between tasks. The worker can do work in the meantime, if there is work ready for execution.
259
266
260
267
### Canceling work
261
268
@@ -302,6 +309,8 @@ Output
302
309
Action interrupted
303
310
```
304
311
312
+
As we saw earlier in the signature of `schedule`, scheduling returns a `Subscription`. Rather that canceling all work, you can cancel individual tasks via that `Subscription` that was created while scheduling.
313
+
305
314
## Existing schedulers
306
315
307
316
### ImmediateScheduler
@@ -326,7 +335,7 @@ End
326
335
327
336
### TrampolineScheduler
328
337
329
-
The `TrampolineScheduler`'s worker is also synchronous but does not nest tasks. Instead, it begins with the initial task and any tasks scheduled in the meantime (such as task scheduled by the initial task itself) are executed after that task has completed.
338
+
The `TrampolineScheduler`'s worker is also synchronous but does not nest tasks. Instead, it begins with the initial task and any tasks scheduled while executing will be queued for after the current task has completed.
330
339
331
340
```
332
341
Scheduler scheduler = Schedulers.trampoline();
@@ -344,7 +353,7 @@ End
344
353
Inner
345
354
```
346
355
347
-
The `TrampolineScheduler`'s worker runs every task on the thread that scheduled the first task. In this implementation, the first call to `schedule` is blocking until the queue is empty. And calls to `schedule` while the queue isn't empty is non-blocking and the task will be executed by the thread is blocked.
356
+
The `TrampolineScheduler`'s worker executes every task on the thread that scheduled the first task. In this implementation, the first call to `schedule` is blocking until the queue is emptied. Any calls to `schedule` while executing will be non-blocking and the task will be executed by the thread is blocked.
348
357
349
358
### NewThreadScheduler
350
359
@@ -356,6 +365,7 @@ public static void printThread(String message) {
356
365
System.out.println(message +" on "+Thread.currentThread().getId());
357
366
}
358
367
```
368
+
Now we schedule work on a `NewThreadScheduler` worker and demonstrate that the worker is bound to a specific thread.
0 commit comments