Skip to content

Commit eecb297

Browse files
committed
3.7 Added serialize and unsafeSubscribe
1 parent c662413 commit eecb297

File tree

1 file changed

+108
-0
lines changed

1 file changed

+108
-0
lines changed

Part 3 - Taming the sequence/7. Custom operators.md

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,114 @@ If you can't guarantee that your operator will obey the Rx contract, for example
269269

270270
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/synchronize.png)
271271

272+
Let's first create an observable that breaks the contract and subscribe to it.
273+
274+
```java
275+
Observable<Integer> source = Observable.create(o -> {
276+
o.onNext(1);
277+
o.onNext(2);
278+
o.onCompleted();
279+
o.onNext(3);
280+
o.onCompleted();
281+
});
282+
283+
source.subscribe(
284+
System.out::println,
285+
System.out::println,
286+
() -> System.out.println("Completed"));
287+
```
288+
Output
289+
```
290+
1
291+
2
292+
Completed
293+
Unsubscribed
294+
```
295+
296+
Despite what our observable is set to emit, the end result obeyed the Rx contract. That happened because `subscribe` will temrinate the subscription when the sequence terminates (or was supposed to). It doesn't mean that the problem will be taken care for us. There is also a method called `unsafeSubscribe`, which won't unsubscribe automatically.
297+
298+
```java
299+
Observable<Integer> source = Observable.create(o -> {
300+
o.onNext(1);
301+
o.onNext(2);
302+
o.onCompleted();
303+
o.onNext(3);
304+
o.onCompleted();
305+
});
306+
307+
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
308+
.unsafeSubscribe(new Subscriber<Integer>() {
309+
@Override
310+
public void onCompleted() {
311+
System.out.println("Completed");
312+
}
313+
314+
@Override
315+
public void onError(Throwable e) {
316+
System.out.println(e);
317+
}
318+
319+
@Override
320+
public void onNext(Integer t) {
321+
System.out.println(t);
322+
}
323+
});
324+
```
325+
Output
326+
```
327+
1
328+
2
329+
Completed
330+
3
331+
Completed
332+
```
333+
334+
Our subscriber's behaviour was identical to the previous example (we created an instance of `Subscriber` because `unsafeSubscribe` doesn't have overloads that take lambdas). We can see here we weren't unsubscribed and we kept receiving notifications.
335+
336+
`unsafeSubscribe` is unsafe in other regards as well, such as error handling. It's usefulness is limited. The documentation says that it should only be used for custom operators that use nested subscriptions. To protect such operators from receiving and illegal sequence, we can apply the `serialize` operator
337+
338+
```java
339+
Observable<Integer> source = Observable.create(o -> {
340+
o.onNext(1);
341+
o.onNext(2);
342+
o.onCompleted();
343+
o.onNext(3);
344+
o.onCompleted();
345+
})
346+
.cast(Integer.class)
347+
.serialize();;
348+
349+
350+
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
351+
.unsafeSubscribe(new Subscriber<Integer>() {
352+
@Override
353+
public void onCompleted() {
354+
System.out.println("Completed");
355+
}
356+
357+
@Override
358+
public void onError(Throwable e) {
359+
System.out.println(e);
360+
}
361+
362+
@Override
363+
public void onNext(Integer t) {
364+
System.out.println(t);
365+
}
366+
});
367+
```
368+
Output
369+
```
370+
1
371+
2
372+
Completed
373+
```
374+
375+
We here that, despite the fact that we did not unsubscribe, the illegal notifications were filtered out.
376+
377+
378+
379+
272380
### Extra benefits of lift
273381

274382
If the ability to use your custom operator in the chain like a standard operator is not convincing enough, using `lift` has one more unexpected advantage. Standard operators are also implemented using `lift`, which makes `lift` a hot method at runtime. JVM optimises for `lift` and operators that use `lift` receive a performance boost. That can include your operator, if you use `lift`.

0 commit comments

Comments
 (0)