VOOZH about

URL: https://www.javacodegeeks.com/2014/11/use-reactive-streams-api-to-combine-akka-streams-with-rxjava.html

⇱ Use reactive streams API to combine akka-streams with rxJava


Just a quick article this time, since I’m still experimenting with this stuff. There is a lot of talk around reactive programming. In Java 8 we’ve got the Stream API, we got rxJava we got ratpack and Akka has got akka-streams.

The main issue with these implementations is that they aren’t compatible. You can’t connect the subscriber of one implementation to the publisher of another. Luckily an initiative has started to provide a way that these different implementations can work together:

 
 
 

“It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application.”

From – http://www.reactive-streams.org/

How does this work

Now how do we do this? Lets look at a quick example based on the akka-stream provided examples (from here). In the following listing:

package sample.stream
 
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{SubscriberSink, PublisherSource, Source}
import com.google.common.collect.{DiscreteDomain, ContiguousSet}
import rx.RxReactiveStreams
import rx.Observable;
import scala.collection.JavaConverters._
 
object BasicTransformation {
 
 def main(args: Array[String]): Unit = {
 
 // define an implicit actorsystem and import the implicit dispatcher
 implicit val system = ActorSystem("Sys")
 import system.dispatcher
 
 // flow materializer determines how the stream is realized.
 // this time as a flow between actors.
 implicit val materializer = FlowMaterializer()
 
 // input text for the stream.
 val text =
 """|Lorem Ipsum is simply dummy text of the printing and typesetting industry.
 |Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, 
 |when an unknown printer took a galley of type and scrambled it to make a type 
 |specimen book.""".stripMargin
 
 // create an observable from a simple list (this is in rxjava style)
 val first = Observable.from(text.split("\\s").toList.asJava);
 // convert the rxJava observable to a publisher
 val publisher = RxReactiveStreams.toPublisher(first);
 // based on the publisher create an akka source
 val source = PublisherSource(publisher);
 
 // now use the akka style syntax to stream the data from the source
 // to the sink (in this case this is println)
 source.
 map(_.toUpperCase). // executed as actors
 filter(_.length > 3).
 foreach { el => // the sink/consumer
 println(el)
 }.
 onComplete(_ => system.shutdown()) // lifecycle event
 }
}

The code comments in this example explain pretty much what is happening. What we do here is we create a rxJava based Observable. Convert this Observable to a “reactive streams” publisher and use this publisher to create an akka-streams source. For the rest of the code we can use the akka-stream style flow API to model the stream. In this case we just do some filtering and print out the result.

Do you want to know how to develop your skillset to become a Java Rockstar?
Subscribe to our newsletter to start Rocking right now!
To get you started we give you our best selling eBooks for FREE!
1. JPA Mini Book
2. JVM Troubleshooting Guide
3. JUnit Tutorial for Unit Testing
4. Java Annotations Tutorial
5. Java Interview Questions
6. Spring Interview Questions
7. Android UI Design
and many more ....
I agree to the Terms and Privacy Policy

Thank you!

We will contact you soon.

👁 Photo of Jos Dirksen
Jos Dirksen
November 13th, 2014Last Updated: November 12th, 2014
0 86 2 minutes read
Subscribe

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Oldest
Newest Most Voted
Back to top button
Close
wpDiscuz