11. RxJava with Spring MVC

Spring Cloud Netflix includes RxJava.

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

Spring Cloud Netflix provides support for returning rx.Single objects from Spring MVC Controllers. It also supports using rx.Observable objects for Server-sent events (SSE). This can be very convenient if your internal APIs are already built using RxJava (see Section 7.4, “Feign Hystrix Support” for examples).

Here are some examples of using rx.Single:

@RequestMapping(method = RequestMethod.GET, value = "/single")
public Single<String> single() {
	return Single.just("single value");
}

@RequestMapping(method = RequestMethod.GET, value = "/singleWithResponse")
public ResponseEntity<Single<String>> singleWithResponse() {
	return new ResponseEntity<>(Single.just("single value"),
			HttpStatus.NOT_FOUND);
}

@RequestMapping(method = RequestMethod.GET, value = "/singleCreatedWithResponse")
public Single<ResponseEntity<String>> singleOuterWithResponse() {
	return Single.just(new ResponseEntity<>("single value", HttpStatus.CREATED));
}

@RequestMapping(method = RequestMethod.GET, value = "/throw")
public Single<Object> error() {
	return Single.error(new RuntimeException("Unexpected"));
}

If you have an Observable rather than a Single, you can convert it: use .toSingle() when the Observable emits exactly one item, or .toList().toSingle() to collect several items into a single List. Here are some examples:

@RequestMapping(method = RequestMethod.GET, value = "/single")
public Single<String> single() {
	return Observable.just("single value").toSingle();
}

@RequestMapping(method = RequestMethod.GET, value = "/multiple")
public Single<List<String>> multiple() {
	return Observable.just("multiple", "values").toList().toSingle();
}

@RequestMapping(method = RequestMethod.GET, value = "/responseWithObservable")
public ResponseEntity<Single<String>> responseWithObservable() {

	Observable<String> observable = Observable.just("single value");
	HttpHeaders headers = new HttpHeaders();
	headers.setContentType(APPLICATION_JSON_UTF8);
	return new ResponseEntity<>(observable.toSingle(), headers, HttpStatus.CREATED);
}

@RequestMapping(method = RequestMethod.GET, value = "/timeout")
public Single<String> timeout() {
	// Emits its value only after a one-minute delay.
	return Observable.timer(1, TimeUnit.MINUTES).map(new Func1<Long, String>() {
		@Override
		public String call(Long aLong) {
			return "single value";
		}
	}).toSingle();
}
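
The difference between the two conversions comes down to how many items the source emits. In RxJava 1.x, toSingle() signals a NoSuchElementException when the Observable emits nothing and an IllegalArgumentException when it emits more than one item, which is why a multi-valued Observable goes through toList() first. The following is a minimal plain-Java sketch of that contract. It does not use RxJava, and the class and method names (ExactlyOneSketch, exactlyOne) are hypothetical, chosen only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;

// Plain-Java model of the "exactly one item" contract; it does not use RxJava itself.
final class ExactlyOneSketch {

    // Returns the only element, mirroring what toSingle() expects of its source.
    static <T> T exactlyOne(List<T> items) {
        if (items.isEmpty()) {
            throw new NoSuchElementException("source emitted no items");
        }
        if (items.size() > 1) {
            throw new IllegalArgumentException("source emitted more than one item");
        }
        return items.get(0);
    }

    public static void main(String[] args) {
        // One item: it can be used as-is.
        System.out.println(exactlyOne(Arrays.asList("single value")));

        // Several items: wrap them into one list first, which is the role toList() plays.
        List<String> values = Arrays.asList("multiple", "values");
        System.out.println(exactlyOne(Collections.singletonList(values)));
    }
}
```

Passing the two-element list directly to exactlyOne would throw, which corresponds to calling toSingle() on Observable.just("multiple", "values") without toList().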

If both the endpoint and its client can handle a stream, SSE could be an option. To convert an rx.Observable to a Spring SseEmitter, use RxResponse.sse(). Here are some examples:

@RequestMapping(method = RequestMethod.GET, value = "/sse")
public SseEmitter single() {
	return RxResponse.sse(Observable.just("single value"));
}

@RequestMapping(method = RequestMethod.GET, value = "/messages")
public SseEmitter messages() {
	return RxResponse.sse(Observable.just("message 1", "message 2", "message 3"));
}

@RequestMapping(method = RequestMethod.GET, value = "/events")
public SseEmitter event() {
	return RxResponse.sse(APPLICATION_JSON_UTF8,
			Observable.just(new EventDto("Spring io", getDate(2016, 5, 19)),
					new EventDto("SpringOnePlatform", getDate(2016, 8, 1))));
}
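
To give a feel for what a client of the /messages endpoint receives, the sketch below frames a list of items in the text/event-stream format defined by the Server-sent events specification: each event is one or more "data:" lines followed by a blank line. It is a plain-Java illustration under that assumption, not the RxResponse or SseEmitter implementation, and the exact bytes Spring writes may differ in details such as spacing. The class and method names (SseFormatSketch, format) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of text/event-stream framing; not the RxResponse implementation.
final class SseFormatSketch {

    // Frames each item as one SSE event: a "data:" field per line, ended by a blank line.
    static String format(List<String> items) {
        StringBuilder out = new StringBuilder();
        for (String item : items) {
            // A payload that contains line breaks is split into several data lines.
            for (String line : item.split("\n", -1)) {
                out.append("data: ").append(line).append('\n');
            }
            out.append('\n'); // the blank line terminates the event
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Same items as the /messages example above.
        System.out.print(format(Arrays.asList("message 1", "message 2", "message 3")));
    }
}
```

Because every event is terminated separately, a client can process "message 1" as soon as it arrives instead of waiting for the whole response, which is the main reason to prefer SSE over a Single<List<String>> for long-running streams.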