Skip to content

Commit 96dc36d

Browse files
committed
3.5 Reworked chapter
1 parent d390bae commit 96dc36d

File tree

1 file changed

+24
-23
lines changed

1 file changed

+24
-23
lines changed

Part 3 - Taming the sequence/5. Time-shifted sequences.md

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
# Time-shifted sequences
22

3-
One of the key features in Rx is that you don't know when items will be emitted. Some observables will emit everything immediately (e.g. `range`), some on regular interval, and some may be irregular. For example, mouse events and UDP packets simply arrive when they arrive. We need tools to decide what to do with those events, not only based on what they are, but also based on when they arrived.
3+
One of the key features in Rx is that you don't know when items will be emitted. Some observables will emit everything immediately and synchronously(e.g. `range`), some emit on regular intervals, and some are hard or even impossible to predict. For example, mouse events and UDP packets simply arrive when they arrive. We need tools to decide what to do with those events, not only based on what they are, but also based on when they arrived and at what frequency.
44

55
## Buffer
66

7-
`buffer` allows you to bulk values together and get them in bulks, rather than one at a time. The are several different ways of buffering values.
7+
`buffer` allows you to collect values and get them in bulks, rather than one at a time. The are several different ways of buffering values.
88

99
### Complete, non-overlapping buffering
1010

11-
First we will examine variant of buffer where every value is buffered exactly once, with no losses and no duplicates.
11+
First we will examine variants of buffer where every value is buffered exactly once, with no losses and no duplicates.
1212

1313
#### buffer by count
1414

@@ -30,7 +30,7 @@ Output
3030

3131
#### buffer by time
3232

33-
The next overload allows to buffer based on time. Time is divided into even pieces. Values are collected for the given timespan and when the time is up, a new collection starts.
33+
The next overload allows you to buffer based on time. Time is divided into windows of equal length. Values are collected for the each window and at the end of each window the buffer is emited.
3434

3535
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer5.png)
3636

@@ -50,7 +50,7 @@ Output
5050
[9]
5151
```
5252

53-
The size of a collection here depends on how many values were emitted in that timespan and not on a desired size. The collection can even be empty.
53+
The size of a collection here depends on how many values were emitted in that timespan and not on a desired size. The collection can even be empty, if there where no events during the window.
5454

5555
#### buffer by count and time
5656

@@ -79,7 +79,7 @@ We see a lot of empty lists here. This is because the buffer is emitted both whe
7979

8080
#### buffer with signal
8181

82-
Instead of fixed points in time, you can also signal to flush a buffer with an observable. Every time your signal emits a value, the values buffered so far will be emitted. Typically, your signal will communicate that the system has just finished processing the previous batch and is ready for the next.
82+
Instead of fixed points in time, you can also signal `buffer` with an observable to flush. Every time the signal emits onNext, the values in the buffer will be emitted will be emitted. Buffering with a signal can be very useful if you want to buffer values until the moment that you are ready for them.
8383

8484
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer8.png)
8585

@@ -91,15 +91,15 @@ Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
9191
.subscribe(System.out::println);
9292
```
9393

94-
There is a variant for the above way, where you provide the signaling observable through a function: `.buffer(() -> Observable.interval(250, TimeUnit.MILLISECONDS))`. The difference here is that the function that creates the observable is executed when a subscription happens. You can use to start your signal when the subscription starts.
94+
There is a variant for the way above, where you provide the signaling observable through a function: `.buffer(() -> Observable.interval(250, TimeUnit.MILLISECONDS))`. The difference here is that the function that creates the observable is executed when a subscription happens. You can use to start your signal when the subscription starts.
9595

9696
### Overlapping buffers
9797

9898
Every method for buffering that we've seen has an alternative that allows buffers to overloap or to leave out values.
9999

100100
#### buffer by count
101101

102-
When buffering based on the desired buffer size, you can also defined how many elements apart the beginings of the buffers are.
102+
When buffering based on the desired buffer size, you can also declare how far apart the beginings of each buffer are.
103103

104104
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer4.png)
105105

@@ -127,14 +127,14 @@ As we can see, a new buffer starts every 3 elements, and that buffer contains th
127127

128128
#### buffer by time
129129

130-
We can do a very similar thing for the variant that buffers based on a timespan. You decide how frequently to open new buffers and how long they should last.
130+
We can do a very similar thing for the variant where buffering is based on a timespan. You decide how frequently to open new buffers and how long they should last.
131131
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer7.png)
132132
Once again, this allows you either to make your buffers overlap or leave out elements.
133133
* When `timespan` > `timeshift`, the buffers overlap
134134
* When `timespan` < `timeshift`, elements are left out
135135
* The case of `timespan` = `timeshift` is equivalent to the simpler case we saw in the previous subchapter.
136136

137-
In the next example we will create a new buffer every 200ms an have each last 350ms. That means that buffer overlap by 150ms.
137+
In the next example we will create a new buffer every 200ms and have it collect for 350ms. That means that buffers overlap by 150ms.
138138

139139
```java
140140
Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
@@ -161,7 +161,7 @@ public final <TOpening,TClosing> Observable<java.util.List<T>> buffer(
161161
```
162162
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer2.png)
163163

164-
This function takes two arguments. The first argument is an observable. Every time that observable emits a value, a new buffer begins. Along with opening a new buffer, the value that it emitted is passed to the second argument, which is a function. That function uses that value to create a new observable, which will signal the end of the corresponding buffer when it emits its first value.
164+
This function takes two arguments. The first argument, `bufferOpenings`, is an observable. Every time this observable emits a value, a new buffer begins. Along with opening a new buffer, the value which it emitted is passed to the `bufferClosingSelector`, which is a function. This function uses the value to create a new observable, which will signal the end of the corresponding buffer when it emits its first onNext event.
165165

166166
Let's see this in code:
167167
```java
@@ -179,7 +179,7 @@ Output
179179
[9]
180180
```
181181

182-
We've created an `Observable.interval` that signals the opening of a new buffer every 250ms. Because observables created with `interval` do not immediately emit a value, the first values were lost. For the closing of a buffer, we provided a lambda function that took every value emitted by our buffer-opening observable. The values generated by `interval` are the natural progression 1,2,3... but that doesn't matter, because we discarded that value. Such an example would be too complicated. Instead, we just created an observable that waits 200ms and then emits a single value. That means that each buffer last exactly 200ms, similarily to when buffering by time.
182+
We've created an `Observable.interval`, which signals the opening of a new buffer every 250ms. Because observables created with `interval` do not immediately emit a value, and first buffer actually starts at 250ms and the values before were lost. For the closing of a buffer, we provided a lambda function that took every value emitted by `bufferOpenings`. The values generated by `interval` are the natural progression 0,1,2,3... but we don't actually use the value, because such an example would be too complicated. Instead, we just created an observable that waits 200ms and then emits a single value. That means that each buffer last exactly 200ms, similarily to buffering by time.
183183

184184
### takeLastBuffer
185185

@@ -205,7 +205,7 @@ Output
205205

206206
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeLastBuffer.tn.png)
207207

208-
`takeLastBuffer` by time will emit, as a buffer, the items that were received during the specified timespan. The timespan has the specified length and ends at the end of the source sequence.
208+
`takeLastBuffer` by time will emit, as a buffer, the items that were received during the specified timespan, which is measure from the end of the source sequence.
209209

210210
```java
211211
Observable.interval(100, TimeUnit.MILLISECONDS)
@@ -242,7 +242,7 @@ As we saw in the previous example, the last 200ms include three values. With `.t
242242

243243
### delay
244244

245-
The simplest overload of `delay` will delay every item by the same amount of time. You can think of that as delaying the beginning of the sequence, while maintaining the time between successive elements.
245+
The simplest overload of `delay` will delay every item by the same amount of time. You can think of it as delaying the beginning of the sequence, while maintaining the time intervals between successive elements.
246246

247247
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.png)
248248

@@ -267,7 +267,7 @@ We created 5 values spaced 100ms apart and then we delayed the sequence by 1s. W
267267

268268
You can also delay each value individually.
269269
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.o.png)
270-
This overload takes a function which will return an observable for each item. When that observable emits, the corresponding item is also emitted. Here's some code:
270+
This overload takes a function which will create an observable for each item. When that observable emits onNext, the corresponding item is emitted in the delayed sequence. Here's some code:
271271

272272
```java
273273
Observable.interval(100, TimeUnit.MILLISECONDS).take(5)
@@ -288,7 +288,7 @@ The initial sequence is spaced 100ms apart, while the resulting is 200ms. If you
288288

289289
### delaySubscription
290290

291-
Rather than storing values and emitting them later, you can delay the subscription altogether. This will have a different effect between hot and cold observables and you will understand more about that in the [Hot and cold observables](/Part 3 - Taming the sequence/6. Hot and Cold observables.md) chapter. For our examples so far, subscription is when the source observable is created (i.e. it is created on demand). What that means is that there is no difference in emission times between delaying each item by the same amount and delaying the subscription. Since that is the case here, delaying the subscription is more efficient, since the operator doesn't need to buffer items internally.
291+
Rather than storing values and emitting them later, you can delay the subscription altogether. This will have a different effect depending on if the observable is hot or cold. This will be discussed more in the [Hot and cold observables](/Part 3 - Taming the sequence/6. Hot and Cold observables.md) chapter. For our examples so far, the observables are cold and subscription event is when the source observable is created (i.e. the begining of the sequence). What that means is that there is no difference in the sequences between delaying each item by the same amount and delaying the subscription. Since that is the case here, delaying the subscription is more efficient, since the operator doesn't need to buffer items internally.
292292

293293
Let's see code for the different overloads for delaying a subscription
294294
```java
@@ -336,7 +336,7 @@ This combines two delay variants we've already seen into one. The first argument
336336

337337
## Sample
338338

339-
`sample` allows you to thin-out your sequences by dividing them into time windows. When each window ends, the last value within that window (if any) is emitted.
339+
`sample` allows you to thin-out a sequence by dividing it into time windows and taking only one value out of each window. When each window ends, the last value within that window (if any) is emitted.
340340

341341
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.png)
342342

@@ -366,7 +366,7 @@ Observable.interval(150, TimeUnit.MILLISECONDS)
366366

367367
## Throttling
368368

369-
When the producer emits more values than we want and we don't need every sequential value, we can thin out the sequence by throttling it.
369+
Throttling is also intended for thining out a sequence. When the producer emits more values than we want and we don't need every sequential value, we can thin out the sequence by throttling it.
370370

371371
### throttleFirst
372372

@@ -442,7 +442,7 @@ TimeInterval [intervalInMilliseconds=99, value=8]
442442
TimeInterval [intervalInMilliseconds=101, value=9]
443443
```
444444

445-
As we can see here, our observable will emit 4 values in quick succession, then 3 values in greater intervals and finally 3 values in quick succession. The `scan` only serves to turn the values into the natural sequence, than a 3 repetitions of 1,2,3. The reason the first two emissions are simultaneous is that that `scan` emits the initial value and the first value together.
445+
As we can see here, our observable will emit 4 values in quick succession, then 3 values in greater intervals and finally 3 values in quick succession. The `scan` only serves to turn the values into the natural sequence, rather than 3 repetitions of 1,2,3. The reason the first two emissions are simultaneous is that that `scan` emits the initial value and the first value together.
446446

447447
Now that we understand our source observable, let's `debounce` it:
448448

@@ -468,7 +468,8 @@ We debounced with a window of 150ms. The bursts of emissions in our observable w
468468

469469
There is a `throttleWithTimeout` operator which has the same behaviour as the `debounce` operator that we just saw. One is practically an alias of the other, even though neither is officially declared as such in the documentation.
470470

471-
You can also debounce based on a per item basis. In this case, you provide a function that calculates for each item how long the window should be after it. You signal that the window is over though an observable. When the observable terminates, the window expires.
471+
You can also debounce based on a per item basis. In this case, you provide a function that calculates for each item how long the window should be after it. You signal that the window is using a new observable for each item
472+
. When the observable terminates, the window expires.
472473

473474
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/debounce.f.png)
474475

@@ -545,7 +546,7 @@ java.util.concurrent.TimeoutException
545546

546547
The output mirrors the source observable for as long as values come more frequently than 200ms. As soon as a value takes more than that to arrive, an error is pushed.
547548

548-
Instead of failing, you can provide a fallback observable. When a timeout occures, the resulting observable will switch to the fallback. The original observable will be ignored from then on, even if it were to come back.
549+
Instead of failing, you can provide a fallback observable. When a timeout occures, the resulting observable will switch to the fallback. The original observable will be ignored from then on, even if it resumes.
549550

550551
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout.2.png)
551552

@@ -570,11 +571,11 @@ Output
570571
-1
571572
```
572573

573-
You can also specify the timeout window per item. In that case, you provide a function that creates an observable out of each value. When that observable terminates, that signals the end of the window. If no values had been emitted until that, that triggers the timeout.
574+
You can also specify the timeout window per item. In that case, you provide a function that creates an observable for each value. When the observable terminates, that is the signal for the timeout. If no values had been emitted until that, that triggers the timeout.
574575

575576
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout5.png)
576577

577-
Here is the previous examples, implemented using this overload.
578+
Here is the previous example, implemented using this overload.
578579

579580
```java
580581
Observable.concat(

0 commit comments

Comments
 (0)