In this post I look into practical applications of back pressure when building data-intensive pipelines with RxJava. I explain what a hot source is and how to handle large data streams without overwhelming the system. As usual, there are trade-offs to consider. Please read on if that sounds interesting to you.

Cold vs Hot Source

Cold sources, or rather value generators, are demand-driven. An infinite stream is a good example: nothing happens unless the client explicitly asks for the next value.

Hot sources emit events autonomously, and subscribers (observers) are forced to keep up with whatever the data rate is. This can cause performance issues and high resource consumption in general.
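
Before turning to RxJava itself, the difference can be sketched with nothing but the JDK. This is only an analogy under my own assumptions: the names ColdVsHotSketch, naturals and HotSource are made up for illustration and are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Plain-JDK analogy only; none of these types come from RxJava.
class ColdVsHotSketch {

    // "Cold": an infinite generator that computes a value only when asked.
    static Iterator<Integer> naturals() {
        return new Iterator<Integer>() {
            private int next = 0;
            @Override public boolean hasNext() { return true; }
            @Override public Integer next() { return next++; }
        };
    }

    // "Hot": pushes values to whoever is listening at that moment.
    static class HotSource {
        private final List<Consumer<Integer>> listeners = new ArrayList<>();

        void subscribe(Consumer<Integer> listener) { listeners.add(listener); }

        void emit(int value) {
            for (Consumer<Integer> listener : listeners) {
                listener.accept(value);
            }
        }
    }

    public static void main(String[] args) {
        Iterator<Integer> cold = naturals();
        // The consumer sets the pace: two calls, two values.
        System.out.println(cold.next() + ", " + cold.next()); // 0, 1

        HotSource hot = new HotSource();
        hot.emit(1); // nobody is listening yet, so this value is lost
        List<Integer> seen = new ArrayList<>();
        hot.subscribe(seen::add);
        hot.emit(2);
        hot.emit(3);
        System.out.println(seen); // [2, 3]
    }
}
```

The generator never runs ahead of its caller, whereas the hot source keeps emitting regardless of who is subscribed or how fast they are.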

A source is represented by an Observable, which monitors data flows from sources and makes them accessible to subscribers.

A Flowable is an Observable-like type with a built-in back pressure mechanism (strategy). Whether to choose one or the other depends on how “bursty” your data source is. Let’s have a look at a few examples.

The code snippet below is an example of a cold source. Observable.range is lazy: the demand is driven by subscribers (pull approach) and therefore no back pressure needs to be applied.

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

Observable.range(0, 10_000)
  .observeOn(Schedulers.computation())
  .subscribe(System.out::println, Throwable::printStackTrace);

PublishProcessor, on the other hand, is considered a hot source. The code below is therefore prone to a MissingBackpressureException. I will look into resolving this issue in a minute.

import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import java.util.stream.IntStream;

PublishProcessor<Integer> source = PublishProcessor.create(); 

source
  .observeOn(Schedulers.computation())
  .subscribe(System.out::println, Throwable::printStackTrace); 

IntStream.range(0, 10_000).forEach(source::onNext);

Okay, so what actually is back pressure and how do you implement it?
Back pressure gives subscribers control over data flow. It is the ability to slow down or throttle data intake. When it comes to implementation, the most straightforward approach is buffering.

Buffering

To reduce the likelihood of a MissingBackpressureException, data can be batched by size or by time. The code below adds size-based back pressure by slicing the incoming data flow into batches of a thousand items each.

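import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import java.util.stream.IntStream;

PublishProcessor<Integer> source = PublishProcessor.create();

source
  .buffer(1000)                            // group items into lists of 1000
  .observeOn(Schedulers.computation())
  .flatMap(x -> Flowable.fromIterable(x))  // unpack each batch again
  .subscribe(System.out::println, Throwable::printStackTrace);

IntStream.range(0, 10_000).forEach(source::onNext);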
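
To make the effect of size-based batching more tangible, here is a rough plain-JDK sketch of what slicing a flow into fixed-size batches amounts to. It is not how RxJava implements buffer; BatchingSketch and batches are hypothetical names of my own.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK illustration of size-based batching; not RxJava code.
class BatchingSketch {

    // Splits items into consecutive batches of at most `size` elements.
    // The last batch may be shorter than the others.
    static <T> List<List<T>> batches(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int from = 0; from < items.size(); from += size) {
            int to = Math.min(from + size, items.size());
            result.add(new ArrayList<>(items.subList(from, to)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 2_500; i++) {
            items.add(i);
        }
        List<List<Integer>> out = batches(items, 1_000);
        // prints: 3 batches, last has 500 items
        System.out.println(out.size() + " batches, last has "
            + out.get(out.size() - 1).size() + " items");
    }
}
```

The point is the reduction in the number of downstream signals: with batches of a thousand, the 10,000 items from the example above reach the subscriber side as only 10 lists.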

Version 2 of RxJava introduces Flowable, a reactive data flow handler with a default internal buffer of 128 items. The example below combines two data sources and uses a queue as temporary data storage.

The first implementation is done using a plain Observable. It’s obvious there is no back pressure, since all of the items are eagerly queued up.

import java.util.concurrent.ConcurrentLinkedQueue;

final ConcurrentLinkedQueue<Integer> queue = 
  new ConcurrentLinkedQueue<>();

final Observable<Integer> o1 = 
  Observable.range(0, 10_000).doOnNext(queue::add);

final Observable<Integer> o2 = Observable.fromIterable(queue).doOnNext(queue::remove);

Observable
  .zip(
    o1, o2, 
    (x, y) -> 
      String.format("[%d, %d], backlog: %d",  x, y, queue.size()))
  .observeOn(Schedulers.computation())
  .subscribe(System.out::println, Throwable::printStackTrace);

Output:

[0, 0], backlog: 9999
[1, 1], backlog: 9998
[2, 2], backlog: 9997
[3, 3], backlog: 9996
[4, 4], backlog: 9995
[5, 5], backlog: 9994
[6, 6], backlog: 9993
[7, 7], backlog: 9992
...

A safer approach is to enforce buffering. As you can see, the implementation is almost identical. Merely switching to a Flowable leverages the aforementioned internal buffer of 128 elements, which is visible in the output.

final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();

final Flowable<Integer> f1 = 
  Flowable.range(0, 10_000).doOnNext(queue::add);

final Flowable<Integer> f2 = 
  Flowable.fromIterable(queue).doOnNext(queue::remove);

Flowable
  .zip(
    f1, f2, 
    (x, y) -> 
      String.format("[%d, %d], backlog: %d", x, y, queue.size()))
  .subscribe(System.out::println, Throwable::printStackTrace);

Output:

[0, 0], backlog: 127
[1, 1], backlog: 126
[2, 2], backlog: 125
[3, 3], backlog: 124
[4, 4], backlog: 123
[5, 5], backlog: 122
[6, 6], backlog: 121
[7, 7], backlog: 120
...

Skipping Values

Sampling is another great way to preserve resources. It’s a lossy operation that reduces throughput by letting through only the most recent item in each given period of time.

import java.util.concurrent.TimeUnit;

PublishProcessor<Integer> source = PublishProcessor.create();

source
  .sample(1, TimeUnit.MILLISECONDS)
  .observeOn(Schedulers.computation())
  .subscribe(System.out::println, Throwable::printStackTrace);

IntStream
  .range(0, 10_000)
  .forEach(source::onNext);

This yields a somewhat trimmed output:

263
533
747
1211
1746
2097
2782
3655
4562
4612
5351
5354
6388
7054
9999
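
Because the real output depends on timing, it may help to look at the sampling rule in a deterministic form. The sketch below is a simplified plain-JDK model, not RxJava's implementation: it assumes events carry explicit timestamps, are sorted by time, and that the last window is flushed at the end. Event and SamplingSketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Deterministic model of time-based sampling; not RxJava code.
class SamplingSketch {

    // Hypothetical event type: a value plus the time it was emitted at.
    static final class Event {
        final long timeMillis;
        final int value;

        Event(long timeMillis, int value) {
            this.timeMillis = timeMillis;
            this.value = value;
        }
    }

    // Keeps only the last value seen in each window of `periodMillis`.
    // Assumes non-negative timestamps in ascending order.
    static List<Integer> sample(List<Event> events, long periodMillis) {
        if (periodMillis <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        List<Integer> out = new ArrayList<>();
        long currentWindow = -1;
        Integer latest = null;
        for (Event e : events) {
            long window = e.timeMillis / periodMillis;
            if (latest != null && window != currentWindow) {
                out.add(latest); // previous window closed: emit its latest value
            }
            currentWindow = window;
            latest = e.value;
        }
        if (latest != null) {
            out.add(latest); // simplification: flush the final window
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event(0, 1), new Event(3, 2), new Event(9, 3),
            new Event(10, 4),
            new Event(25, 5), new Event(29, 6));
        System.out.println(sample(events, 10)); // [3, 4, 6]
    }
}
```

Six events shrink to three, and everything that was overwritten within a window is gone for good. That is the trade-off sampling makes.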

Another way to reduce data inflow is to apply a BackpressureOverflowStrategy:

  • ERROR: The default strategy, where an exception (MissingBackpressureException) is signalled whenever the buffer fills up.
  • DROP_OLDEST: Adds the latest value onto the buffer at the cost of dropping the oldest value.
  • DROP_LATEST: Replaces the latest buffered value with a new one.
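
The three strategies are easier to tell apart on a tiny buffer. What follows is a minimal plain-JDK model built on ArrayDeque. It mirrors the descriptions in the list above rather than RxJava's internals, and OverflowSketch, Strategy and BoundedBuffer are names I made up; the exception type is likewise a stand-in.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-JDK model of the overflow strategies; not RxJava code.
class OverflowSketch {

    enum Strategy { ERROR, DROP_OLDEST, DROP_LATEST }

    static final class BoundedBuffer<T> {
        private final Deque<T> items = new ArrayDeque<>();
        private final int capacity;
        private final Strategy strategy;

        BoundedBuffer(int capacity, Strategy strategy) {
            if (capacity <= 0) {
                throw new IllegalArgumentException("capacity must be positive");
            }
            this.capacity = capacity;
            this.strategy = strategy;
        }

        void offer(T value) {
            if (items.size() < capacity) {
                items.addLast(value);
                return;
            }
            switch (strategy) {
                case DROP_OLDEST:
                    items.pollFirst();    // discard the head of the buffer
                    items.addLast(value);
                    break;
                case DROP_LATEST:
                    items.pollLast();     // discard the most recently buffered value
                    items.addLast(value);
                    break;
                default:
                    // Stand-in for signalling an error to the subscriber.
                    throw new IllegalStateException("buffer is full");
            }
        }

        List<T> snapshot() {
            return new ArrayList<>(items);
        }
    }

    public static void main(String[] args) {
        BoundedBuffer<Integer> oldest = new BoundedBuffer<>(3, Strategy.DROP_OLDEST);
        BoundedBuffer<Integer> latest = new BoundedBuffer<>(3, Strategy.DROP_LATEST);
        for (int i = 1; i <= 5; i++) {
            oldest.offer(i);
            latest.offer(i);
        }
        System.out.println(oldest.snapshot()); // [3, 4, 5]
        System.out.println(latest.snapshot()); // [1, 2, 5]
    }
}
```

With a capacity of three and five incoming values, DROP_OLDEST keeps a sliding window of the newest items, while DROP_LATEST preserves the oldest ones and only refreshes the tail.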

Here is an example of how to apply an explicit back pressure strategy.

import io.reactivex.BackpressureOverflowStrategy;

Flowable
  .range(0, 10_000)
  .onBackpressureBuffer(
    1000, 
    () -> {}, BackpressureOverflowStrategy.DROP_OLDEST)
  .observeOn(Schedulers.computation())
  .subscribe(System.out::println, Throwable::printStackTrace);

Inspecting the output reveals the expected data loss:

...
219
220
221
222
223
3206
3207
3215
...

Best Practices

The best approach always depends on the use case. The library provides tools for controlling the volume of the data flow, and each approach has its own advantages and shortcomings. I hope the summary below helps you decide what to do in your particular situation.

Observable vs Flowable

Observable imposes a lower overhead in comparison with Flowable, but presents a risk of running out of memory or a failure to handle an overwhelming data stream.

Observable is safe to use when there is a modest data load (thousands of items at most). This applies to capturing GUI interactions, such as mouse moves or touch events. In any case, should the amount of data grow beyond these limits, consider the use of sampling.

Flowable comes with built-in back pressure and covers all kinds of data-intensive scenarios dealing with tens of thousands of events. Blocking I/O operations, such as reading from a file or pulling data from a database, are good candidates for batched processing. The same goes for network streaming, whenever the protocol allows setting a threshold. Apply sampling or an appropriate back pressure strategy.

Back Pressure

The Reactive Streams specification mandates that operators support non-blocking back pressure. This is to guarantee that consumers won’t be overwhelmed when requesting data from hot sources. In ReactiveX, Flowable ensures proper handling of downstream data. While back pressure is built in, an OutOfMemoryError or a MissingBackpressureException can still occur. At the very least, there is a guarantee that, in case of problems, onNext is not called on the consumer and an exception is signalled instead.
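
The request-driven contract behind all this can be tried out without RxJava, because Java 9 and later ship the Reactive Streams interfaces as java.util.concurrent.Flow. Below is a deliberately simplified sketch: the publisher is synchronous, single-threaded and not fully spec-compliant, and DemandSketch, range and Collector are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Simplified, single-threaded illustration of request(n); requires Java 9+.
class DemandSketch {

    // Publishes 0..count-1, but never more than the subscriber has requested.
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean done = false;

            @Override public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be positive"));
                    return;
                }
                for (long i = 0; i < n && next < count && !done; i++) {
                    subscriber.onNext(next++);
                }
                if (next == count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() {
                done = true;
            }
        });
    }

    // Stores what it receives and leaves the decision to request more to the caller.
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        Flow.Subscription subscription;
        Throwable error;
        boolean completed = false;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        Collector c = new Collector();
        range(5).subscribe(c);
        System.out.println(c.received); // [] because nothing was requested yet
        c.subscription.request(3);
        System.out.println(c.received); // [0, 1, 2]
        c.subscription.request(10);
        System.out.println(c.received + " completed=" + c.completed); // [0, 1, 2, 3, 4] completed=true
    }
}
```

Nothing flows until the subscriber asks for it, and the publisher never exceeds the requested amount. A hot source cannot honour such requests by itself, which is where buffering, sampling and overflow strategies come in.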

The ReactiveX project’s wiki discusses back pressure concepts in detail.

Thanks for reading to the end. Let me know your feedback in the comments section below and please do share this post if you found it useful.


Tomas Zezula
