Part 4 - Concurrency/4. Backpressure.md (12 additions, 12 deletions)
@@ -1,6 +1,6 @@
# Backpressure
Rx leads events from one end of a pipeline to the other. The actions that take place on each end can be very dissimilar. What happens when the producer and the consumer require different amounts of time to process a value? In a synchronous model, this question isn't an issue. Consider the following example:
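The book's example is truncated in this diff excerpt. As a stand-in, here is a minimal pull-based sketch in plain Java (no Rx; the names `producer` and `consumeAll` are hypothetical) showing why a synchronous model cannot overproduce: the consumer drives the producer, so nothing ever queues up.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SyncPipeline {
    // Producer: yields the next value only when the consumer asks for it.
    static Iterator<Integer> producer(int count) {
        return new Iterator<Integer>() {
            private int next = 0;
            public boolean hasNext() { return next < count; }
            public Integer next() { return next++; }
        };
    }

    // Consumer: pulls at its own pace, so the producer can never run ahead.
    static List<Integer> consumeAll(Iterator<Integer> source) {
        List<Integer> processed = new ArrayList<>();
        while (source.hasNext())
            processed.add(source.next() * 2); // however slow this is, nothing piles up
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(consumeAll(producer(3))); // [0, 2, 4]
    }
}
```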
```java
// Produce
@@ -79,13 +79,13 @@ Output
There are similar operators that can serve the same purpose.

* The [throttle](/Part 3 - Taming the sequence/5. Time-shifted sequences.md#throttling) family of operators also filters on rate, but allows you to specify in a different way which element to let through when stressed.
* [Debounce](/Part 3 - Taming the sequence/5. Time-shifted sequences.md#debouncing) does not cut the rate to a fixed maximum. Instead, it completely removes every burst of information and replaces it with a single value.
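To make the distinction concrete, here is a plain-Java model (not Rx code; all names are hypothetical) that applies throttle-style and debounce-style filtering to a list of timestamped values, assuming timestamps in milliseconds:

```java
import java.util.ArrayList;
import java.util.List;

public class RateFilters {
    // throttleFirst-style: let a value through, then ignore values for the
    // next `period` ms, capping the rate at one value per period.
    static List<Integer> throttleFirst(long[] times, int[] values, long period) {
        List<Integer> out = new ArrayList<>();
        long nextOpen = Long.MIN_VALUE;
        for (int i = 0; i < values.length; i++) {
            if (times[i] >= nextOpen) {
                out.add(values[i]);
                nextOpen = times[i] + period;
            }
        }
        return out;
    }

    // debounce-style: a value survives only if no newer value follows within
    // `quiet` ms, so each burst collapses to its last element.
    static List<Integer> debounce(long[] times, int[] values, long quiet) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean superseded = i + 1 < values.length && times[i + 1] - times[i] < quiet;
            if (!superseded) out.add(values[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] t = {0, 10, 20, 500, 510};             // two bursts of values
        int[] v = {1, 2, 3, 4, 5};
        System.out.println(throttleFirst(t, v, 100)); // [1, 4] - rate is capped
        System.out.println(debounce(t, v, 100));      // [3, 5] - one value per burst
    }
}
```

Note how the throttled output keeps the first element of each period, while the debounced output keeps only the final element of each burst.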
#### Collect
Instead of sampling the data, you can use `buffer` and `window` to collect overflowing data while the consumer is busy. This is useful if processing items in batches is faster. Alternatively, you can inspect the buffer to manually decide how many and which of the buffered items to process.
In the example that we saw previously, the consumer processes single items and batches at practically the same speed. Here we slowed down the producer to make the batches fit on a line, but the principle remains the same.
```java
Observable.interval(10, TimeUnit.MILLISECONDS)
@@ -143,7 +143,7 @@ class MySubscriber extends Subscriber<T> {
}
```
The `request(1)` in `onStart` establishes backpressure and informs the observable that it should only emit the first value. After processing the value in `onNext`, we request the next item to be sent, if and when it is available. Calling `request(Long.MAX_VALUE)` disables backpressure.
### doOnRequest
@@ -153,7 +153,7 @@ public final Observable<T> doOnRequest(Action1<java.lang.Long> onRequest)
```
The `doOnRequest` meta-event occurs when a subscriber requests more items. The value supplied to the action is the number of items requested.
At this moment, `doOnRequest` is in beta. In this book, we have been avoiding beta operators. We're making an exception here, because it enables us to peek into stable backpressure functionality that is otherwise hidden. Let's see what happens in a simple observable:
```java
Observable.range(0, 3)
@@ -168,7 +168,7 @@ Requested 9223372036854775807
2
```
We see that `subscribe` requests the maximum number of items from the beginning. This means that `subscribe` doesn't resist values at all. `subscribe` will only use backpressure if we provide a subscriber that implements backpressure. Here is a complete example of such an implementation:
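The `ControlledPullSubscriber` code itself falls outside this excerpt's hunks. As a plain-Java model of the same idea (hypothetical names, not the book's actual class): start with zero demand in `onStart`, and let the producer emit only while outstanding demand remains.

```java
import java.util.ArrayList;
import java.util.List;

public class ControlledPull {
    private final List<Integer> received = new ArrayList<>();
    private int next = 0;    // the producer's next value
    private long demand = 0; // outstanding request(n) count

    void onStart() { request(0); } // like request(0) in onStart: no emissions yet

    void request(long n) {
        demand += n;
        while (demand > 0) {       // the producer emits only while demand remains
            received.add(next++);
            demand--;
        }
    }

    List<Integer> received() { return received; }

    public static void main(String[] args) {
        ControlledPull subscriber = new ControlledPull();
        subscriber.onStart();
        System.out.println(subscriber.received()); // []
        subscriber.request(2);
        System.out.println(subscriber.received()); // [0, 1]
        subscriber.request(1);
        System.out.println(subscriber.received()); // [0, 1, 2]
    }
}
```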
First we requested no emissions (our `ControlledPullSubscriber` does this in `onStart`). Then we requested 2 and we got 2 values, then we requested 1 and we got 1.
Rx operators that use queues and buffers internally should use backpressure to avoid storing an unbounded number of values. Large-scale buffering should be left to operators that explicitly serve this purpose, such as `cache` and `buffer`. An example of an operator that needs to buffer items is `zip`: the first observable might emit two or more values before the second observable emits its next value. Such small asymmetries are expected even when the two sequences are supposed to have the same frequency, and needing to buffer a couple of items shouldn't cause the operator to fail. For that reason, `zip` has a small buffer of 128 items.
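A toy model of this buffering (plain Java, hypothetical names; the real `zip` is more involved): values arrive interleaved from two sources, the side that runs ahead is queued, and each queue is capped at a bound like RxJava's 128.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class BoundedZip {
    // Values arrive tagged "L:x" or "R:y". A pair is emitted as soon as both
    // sides have a value; a side that runs ahead is buffered up to `bound`.
    static List<String> zip(String[] tagged, int bound) {
        Queue<String> left = new ArrayDeque<>(), right = new ArrayDeque<>();
        List<String> pairs = new ArrayList<>();
        for (String event : tagged) {
            String[] parts = event.split(":");
            (parts[0].equals("L") ? left : right).add(parts[1]);
            if (left.size() > bound || right.size() > bound)
                throw new IllegalStateException("buffer overflow"); // asymmetry too large
            if (!left.isEmpty() && !right.isEmpty())
                pairs.add(left.poll() + right.poll());              // both sides ready
        }
        return pairs;
    }

    public static void main(String[] args) {
        // The left source emits twice before the right catches up:
        String[] events = {"L:1", "L:2", "R:a", "R:b"};
        System.out.println(zip(events, 128)); // [1a, 2b]
    }
}
```

With a generous bound the small asymmetry is absorbed silently; with a tiny bound the same input overflows, which is the failure mode the 128-item buffer guards against.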
```java
Observable.range(0, 300)
@@ -266,12 +266,12 @@ Requested 90
Requested 90
```
The `zip` operator starts by requesting enough items to fill its buffer, and requests more as it consumes them. The details of how many items `zip` requests aren't interesting. What the reader should take away is that some buffering and backpressure exist in Rx whether the developer requests them or not. This gives an Rx pipeline some flexibility where you might expect none. It might trick you into thinking that your code is solid, by silently saving small tests from failing, but you're not safe until you have explicitly declared behaviour with regard to backpressure.
## Backpressure policies
Many Rx operators use backpressure internally to avoid overfilling their internal queues. This way, the problem of a slow consumer is propagated backwards in the chain of operators: if an operator stops accepting values, then the previous operator will fill its buffers until it stops accepting values too, and so on. Backpressure doesn't make the problem go away. It merely moves it where it may be handled better. We still need to decide what to do with the values of an overproducing observable.
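The propagation can be sketched with a chain of bounded buffers (plain Java, hypothetical names; Rx's internals differ): each stage forwards a value if the next stage accepts it, buffers it otherwise, and refuses new values once its own buffer is full, pushing the problem one step closer to the producer.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class Pipeline {
    static class Stage {
        final Queue<Integer> buffer = new ArrayDeque<>();
        final int capacity;
        final Stage next; // null for the stage feeding the (stalled) consumer
        Stage(int capacity, Stage next) { this.capacity = capacity; this.next = next; }

        // Forward the value if the next stage will take it; otherwise buffer
        // it here; refuse entirely once this buffer is also full.
        boolean offer(int value) {
            if (next != null && next.offer(value)) return true;
            if (buffer.size() >= capacity) return false; // backpressure signal
            buffer.add(value);
            return true;
        }
    }

    // Emit `produced` values into a two-stage pipeline whose consumer is
    // stalled; returns how many values the producer could not hand off.
    static int refusedCount(int produced, int capacityPerStage) {
        Stage last = new Stage(capacityPerStage, null);  // fills first
        Stage first = new Stage(capacityPerStage, last); // then this one fills
        int refused = 0;
        for (int i = 0; i < produced; i++)
            if (!first.offer(i)) refused++;              // finally the producer feels it
        return refused;
    }

    public static void main(String[] args) {
        System.out.println(refusedCount(6, 2)); // 2 - the buffers absorb 4 values
    }
}
```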
There are Rx operators that let you declare how to handle situations where a subscriber cannot accept the values that are being emitted.
@@ -348,7 +348,7 @@ Output
...
```
What we see here is that the first 128 items were consumed normally, but then we jumped forward. The items in between were dropped by `onBackpressureDrop`. Even though we did not request it, the first 128 items were still buffered, since `observeOn` uses a small buffer when switching between threads.
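The jump can be reproduced with a small model (plain Java, hypothetical names; not RxJava's implementation): a source that drops values whenever there is no outstanding demand, feeding a consumer that, like `observeOn`, prefetches 128 items and then stalls.

```java
import java.util.ArrayList;
import java.util.List;

public class DropModel {
    // The source emits regardless of demand: values are delivered while
    // outstanding requests remain, and dropped on the floor otherwise.
    final List<Integer> delivered = new ArrayList<>();
    long requested = 0;

    void request(long n) { requested += n; } // consumer signals demand

    void emit(int value) {
        if (requested > 0) {
            delivered.add(value);
            requested--;
        } // no outstanding demand: the value is dropped
    }

    public static void main(String[] args) {
        DropModel pipe = new DropModel();
        pipe.request(128);                          // observeOn-style prefetch
        for (int i = 0; i < 500; i++) pipe.emit(i); // fast producer runs ahead
        pipe.request(1);                            // consumer finally asks for more
        pipe.emit(500);                             // the next emission gets through
        System.out.println(pipe.delivered.get(127)); // 127 - end of the prefetched run
        System.out.println(pipe.delivered.get(128)); // 500 - the jump
    }
}
```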