Merging Rx Single Results

Today, I saw something incredible which basically looks like this.

public class Thing {

final ThingService thingService;

public Thing(final ThingService thingService) {
this.thingService = thingService;
}

public Single<List<String>> getOtherThings(
final List<String> keys
) {
return keys.stream()
.map(key -> getOtherThing(key))
.reduce(
Single.just(Collections.emptyList()),
(singleThing1, singleThing2) -> singleThing1
.flatMap(thing1 -> singleThing2
.map(thing2 -> {
final List<String> newList =
new LinkedList<>(thing1);
newList.addAll(thing2);
return newList;
}))
);
}
}

public class ThingService {

public Single<List<String>> getThing(final String keys) {
return Single.just(Collections.<String>emptyList());
}
}

The Thing class returns a single list of strings for a given list of keys, it all looks good in the beginning, but drilling down to the implementation, it does a bunch of wired stuff:

1) it pushes the list of keys into the service, which if an exception is thrown, the whole call will end up with nothing;

2) it implements a reduce in order to merge the result from the service call into an empty list wrapped in a Single, and it looks horrendous in my opinion;

3) reduce starts a different Single, and there is no guarantee it will wait for service to respond with a result before it collapses the lists, the very nature of Rx.

And essentially, it looks pretty unpolished and hard to understand. But it doesn't have to be so convoluted. On top of SingleRx provides a nice thing call Observable to merge a sequence of async requests. It looks like this.

public class Thing {
final ThingService thingService;

public Thing(final ThingService thingService) {
this.thingService = thingService;
}

public Single<List<String>> getOtherThings(
final List<String> keys
) {
return Observable.fromIterable(keys)
.flatMapSingle(key -> thingService.getOtherThing(key))
.toList();
}
}

public class ThingService {
public Single<String> getOtherThing(final String key) {
return Single.just("");
}
}

Observable.fromIterable provides an async wrapping for each individual value in the list of strings. By calling flatMapSingle over the result, it passes one value at a time to the service to get a Single result back, and merge them into Observable<List<String>. In order to return an aggregated result to the caller, it then calls toList() to turn the Observable into Single.

This approach not only simplifies the service implementation to deal with one thing at a time, but it's also capable of merging and returning the successful results at the end of the call. Most importantly, this is more idiomatic Rx.


Comments

Popular posts from this blog

How to: Add Watermark to PDFs Programmatically using iTextSharp

A practical guide to Scala Traits

A Short Guide to AWK