Getting Started with the Java Flow API
Exploring the Java Flow API
The Java Flow API, introduced in Java 9, provides a standard and efficient way for components of a reactive system to communicate with each other asynchronously, without requiring them to know each other's implementation details. In this post, we'll explore the key components of the Flow API and show how to use them to implement the Observer pattern in a more standardized way.
Key Components of the Flow API
The Flow API defines four key components: Publisher, Subscriber, Subscription, and Processor.
Publisher
A Publisher is a provider of data items, which publishes them to one or more Subscriber instances. To implement a custom Publisher, you need to implement the java.util.concurrent.Flow.Publisher interface, which defines a single method:
- subscribe(Subscriber<? super T> subscriber): Registers a Subscriber with this Publisher. The method returns void; the Publisher hands the Subscriber a Subscription, which represents the connection between them, by calling the Subscriber's onSubscribe method.
Subscriber
A Subscriber is a consumer of data items, which receives them from a Publisher. To implement a custom Subscriber, you need to implement the java.util.concurrent.Flow.Subscriber interface, which defines the following methods:
- onSubscribe(Subscription subscription): Receives a Subscription instance from the Publisher, which allows the Subscriber to request and receive data items from the Publisher.
- onNext(T item): Receives the next data item from the Publisher.
- onError(Throwable throwable): Receives an error notification from the Publisher.
- onComplete(): Receives a completion notification from the Publisher.
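To make the callback order concrete, here is a minimal sketch of a Subscriber that collects everything it receives. The class names (CollectingSubscriberDemo, CollectingSubscriber) are hypothetical and chosen only for this illustration; the JDK's SubmissionPublisher serves as the Publisher.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the Flow API.
class CollectingSubscriberDemo {

    // Stores every item it receives and pulls one item at a time.
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> items = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item
        }

        @Override
        public void onNext(T item) {
            items.add(item);
            subscription.request(1); // ask for the next item
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes three items and returns what the subscriber received.
    static List<String> run() {
        CollectingSubscriber<String> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("a");
            publisher.submit("b");
            publisher.submit("c");
        } // close() signals onComplete after the buffered items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for onComplete");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.items;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that the subscriber receives nothing until it calls request; here it asks for one item at a time, which is the simplest demand strategy.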
Subscription
A Subscription is a connection between a Publisher and a Subscriber, which allows the Subscriber to request and receive data items from the Publisher. To implement a custom Subscription, you need to implement the java.util.concurrent.Flow.Subscription interface, which defines the following methods:
- request(long n): Requests n data items from the Publisher.
- cancel(): Cancels the Subscription, terminating the connection between the Publisher and Subscriber.
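As a rough sketch of how request and cancel work together, the subscriber below asks for a fixed number of items up front and cancels once they have arrived. TakeDemo and TakeSubscriber are hypothetical names used only for this example, and limit is assumed to be positive.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the Flow API.
class TakeDemo {

    // Requests `limit` items up front and cancels after receiving them.
    static final class TakeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final int limit; // assumed to be > 0
        private Flow.Subscription subscription;

        TakeSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(limit); // total demand for this subscriber
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() == limit) {
                subscription.cancel(); // no further signals are wanted
                done.countDown();
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown(); // reached only if fewer than `limit` items exist
        }
    }

    // Publishes 1..total and returns the items the subscriber accepted.
    static List<Integer> takeFirst(int limit, int total) {
        TakeSubscriber subscriber = new TakeSubscriber(limit);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= total; i++) {
                publisher.submit(i);
            }
        }
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(takeFirst(2, 5));
    }
}
```

Even though five items are published, the subscriber only ever sees the two it asked for.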
Processor
A Processor is a component that both subscribes to a Publisher and publishes to a Subscriber, allowing it to transform or filter data items. To implement a custom Processor, you need to implement the java.util.concurrent.Flow.Processor<T, R> interface, which extends both java.util.concurrent.Flow.Subscriber<T> and java.util.concurrent.Flow.Publisher<R> and adds no methods of its own.
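One common way to write a Processor is to extend SubmissionPublisher, so that the publishing half comes for free, and implement only the Subscriber callbacks. The sketch below follows that approach; MapProcessor and MapProcessorDemo are hypothetical names for this illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

// Hypothetical processor that applies a function to every item it receives.
class MapProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription subscription;

    MapProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        submit(mapper.apply(item)); // republish the transformed item downstream
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        closeExceptionally(throwable); // forward the failure downstream
    }

    @Override
    public void onComplete() {
        close(); // forward completion downstream
    }
}

// Hypothetical demo wiring: source -> processor -> collecting consumer.
class MapProcessorDemo {
    static List<String> run() {
        List<String> out = new CopyOnWriteArrayList<>();
        MapProcessor<Integer, String> processor = new MapProcessor<>(n -> "#" + n);
        // consume() subscribes a simple consumer and completes on onComplete.
        CompletableFuture<Void> finished = processor.consume(out::add);
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(processor);
            source.submit(1);
            source.submit(2);
            source.submit(3);
        }
        finished.join(); // wait until the processor has forwarded everything
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Closing the source propagates completion through the processor to the final consumer, which is why waiting on the returned future is enough.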
Implementing a Temperature Converter using the Java Flow API
In this tutorial, we'll create a simple temperature converter application that allows users to convert temperature readings between Celsius and Fahrenheit. We'll use the Java Flow API to implement a reactive architecture that allows temperature readings to be published by a Publisher and consumed by multiple Subscriber instances, each of which can perform its own temperature conversion.
Why Use the Java Flow API for a Temperature Converter?
A temperature converter application might seem like a simple problem that can be solved using basic Java programming techniques. However, there are several good reasons to use the Java Flow API:
- Asynchronous processing: The Java Flow API provides a standardized way to handle asynchronous processing, allowing the temperature readings to be processed in real-time as they arrive, without blocking the main application thread.
- Reactive architecture: The Java Flow API supports a reactive architecture, which allows multiple Subscriber instances to consume the same data stream simultaneously, each performing its own processing. This makes it easy to extend the application with new functionality in the future.
- Backpressure: The Java Flow API supports backpressure: each Subscriber tells the Publisher how many items it is ready to receive by calling request(n) on its Subscription, which prevents the Subscriber instances from being overwhelmed with too much data.
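The backpressure point can be illustrated with a small sketch in which the subscriber deliberately requests nothing at first. The names (BackpressureDemo, PausedSubscriber) are hypothetical; the point is only that published items wait in the publisher's buffer until the subscriber signals demand.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the Flow API.
class BackpressureDemo {

    // Stores the subscription but does not request anything by itself.
    static final class PausedSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch subscribed = new CountDownLatch(1);
        final CountDownLatch firstBatch = new CountDownLatch(2);
        volatile Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscribed.countDown(); // note: no request() here
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            firstBatch.countDown();
        }

        @Override
        public void onError(Throwable throwable) { }

        @Override
        public void onComplete() { }
    }

    static List<Integer> run() {
        PausedSubscriber subscriber = new PausedSubscriber();
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        try {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i); // buffered: the subscriber has no demand yet
            }
            await(subscriber.subscribed);
            subscriber.subscription.request(2); // the subscriber opens the tap
            await(subscriber.firstBatch);
            return List.copyOf(subscriber.received);
        } finally {
            publisher.close();
        }
    }

    private static void await(CountDownLatch latch) {
        try {
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Five items are submitted, but only the two that were requested reach onNext; the rest stay buffered until more demand is signalled.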
Implementing the Temperature Converter
Let's start by defining a simple TemperatureSensor class that generates temperature readings at random intervals and publishes them through a SubmissionPublisher, the Publisher implementation that ships with the JDK:
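import java.util.Random;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.SubmissionPublisher;

public class TemperatureSensor {
    private final SubmissionPublisher<Double> publisher;
    private final Random random;

    public TemperatureSensor() {
        this.publisher = new SubmissionPublisher<>();
        this.random = new Random();
    }

    // Publishes random readings on a background thread until stop() is called.
    public void start() {
        Thread thread =
            new Thread(
                () -> {
                    while (!publisher.isClosed() && !Thread.currentThread().isInterrupted()) {
                        double temperature = random.nextDouble() * 100;
                        try {
                            publisher.submit(temperature);
                        } catch (IllegalStateException e) {
                            break; // the publisher was closed between the check and submit
                        }
                        sleep(random.nextInt(5000));
                    }
                });
        thread.start();
    }

    // Closes the publisher, which signals onComplete to all subscribers.
    public void stop() {
        publisher.close();
    }

    public Publisher<Double> asPublisher() {
        return publisher;
    }

    private static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the publishing loop can exit.
            Thread.currentThread().interrupt();
        }
    }
}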
The TemperatureSensor class uses a SubmissionPublisher to publish temperature readings to its subscribers. The start method runs a loop on a background thread that generates a random temperature reading, submits it to the SubmissionPublisher, and then sleeps for up to five seconds. The stop method closes the SubmissionPublisher, which signals onComplete to its subscribers. Finally, the asPublisher method exposes the SubmissionPublisher as a Publisher, which allows other components to subscribe to it.
Next, let's define a TemperatureConverter interface that defines a single method for converting temperature readings between Celsius and Fahrenheit:
public interface TemperatureConverter {
    double convert(double temperature);
}
We'll implement two concrete TemperatureConverter classes: CelsiusConverter, which converts Celsius to Fahrenheit, and FahrenheitConverter, which converts Fahrenheit to Celsius.
public class CelsiusConverter implements TemperatureConverter {
    @Override
    public double convert(double temperature) {
        return temperature * 1.8 + 32;
    }
}

public class FahrenheitConverter implements TemperatureConverter {
    @Override
    public double convert(double temperature) {
        return (temperature - 32) / 1.8;
    }
}
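As a quick sanity check of the two formulas, the sketch below applies them to a few well-known reference points. ConversionCheck and its two static methods are hypothetical stand-ins that mirror the formulas above so the snippet can run on its own.

```java
// Hypothetical helper class mirroring the two converter formulas.
class ConversionCheck {

    // Same formula as CelsiusConverter: Celsius in, Fahrenheit out.
    static double celsiusToFahrenheit(double celsius) {
        return celsius * 1.8 + 32;
    }

    // Same formula as FahrenheitConverter: Fahrenheit in, Celsius out.
    static double fahrenheitToCelsius(double fahrenheit) {
        return (fahrenheit - 32) / 1.8;
    }

    public static void main(String[] args) {
        // Boiling point of water: 100 C corresponds to 212 F.
        System.out.printf("%.1f%n", celsiusToFahrenheit(100));
        // Freezing point of water: 32 F corresponds to 0 C.
        System.out.printf("%.1f%n", fahrenheitToCelsius(32));
        // The two scales meet at -40.
        System.out.printf("%.1f%n", celsiusToFahrenheit(-40));
    }
}
```

Because the two formulas are inverses, converting a value one way and then back should return the original reading, up to floating-point rounding.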
Now, let's define a simple TemperatureDisplay class that subscribes to the TemperatureSensor and prints temperature readings to the console, after converting them to the desired temperature unit:
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class TemperatureDisplay implements Subscriber<Double> {
    private final TemperatureConverter converter;
    private Subscription subscription;

    public TemperatureDisplay(TemperatureConverter converter) {
        this.converter = converter;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // ask for the first reading
    }

    @Override
    public void onNext(Double temperature) {
        double converted = converter.convert(temperature);
        // A CelsiusConverter produces Fahrenheit values, so the suffix is "F".
        System.out.printf("%.2f%s%n", converted, converter instanceof CelsiusConverter ? "F" : "C");
        subscription.request(1); // ask for the next reading
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
}
The TemperatureDisplay class implements the Subscriber interface to receive temperature readings from the TemperatureSensor. The constructor takes a TemperatureConverter instance, which is used to convert the temperature readings to the desired unit. The onSubscribe method receives a Subscription instance, which it stores for later use, and requests the first temperature reading by calling subscription.request(1). The onNext method receives each temperature reading, converts it to the desired unit, prints it to the console with the appropriate unit suffix, and then requests the next reading. Finally, the onError method prints any error that occurs to the console, and the onComplete method prints "Done" once the stream of temperature readings has ended.
Finally, let's see how to use the TemperatureSensor and TemperatureDisplay classes together to implement a simple temperature converter application:
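public class Main {
    public static void main(String[] args) {
        TemperatureSensor sensor = new TemperatureSensor();
        // Subscribe before starting the sensor: a SubmissionPublisher only delivers
        // an item to the subscribers that are registered when it is submitted.
        sensor.asPublisher().subscribe(new TemperatureDisplay(new CelsiusConverter()));
        sensor.asPublisher().subscribe(new TemperatureDisplay(new FahrenheitConverter()));
        sensor.start();
    }
}
The Main class creates a TemperatureSensor instance, subscribes two TemperatureDisplay instances to its Publisher, one that treats each reading as Celsius and prints it in Fahrenheit and one that does the reverse, and then starts the sensor by calling sensor.start(). Subscribing first ensures that neither display misses an early reading.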
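The order of subscribing and publishing matters with SubmissionPublisher, because an item submitted while no subscriber is registered is simply dropped. The following sketch shows this with a hypothetical LateSubscriberDemo class; it uses the publisher's consume method as a shortcut for subscribing a simple consumer.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class; the names here are illustrative only.
class LateSubscriberDemo {

    static List<Integer> run() {
        List<Integer> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        publisher.submit(1); // no subscribers yet, so this item is dropped

        // consume() subscribes a consumer and returns a future that
        // completes when the publisher signals onComplete.
        CompletableFuture<Void> done = publisher.consume(received::add);

        publisher.submit(2);
        publisher.submit(3);
        publisher.close(); // signals onComplete after buffered items are delivered

        done.join();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the items submitted after the subscription are delivered, which is why it is safer to register every subscriber before the sensor starts publishing.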
When you run the Main class, you should see temperature readings printed to the console at random intervals, converted to both Celsius and Fahrenheit:
98.22F
36.05C
13.61C
74.18F
...
And that's it! We've seen how to use the Java Flow API to implement a simple temperature converter application that allows temperature readings to be converted between Celsius and Fahrenheit in real-time, using a reactive architecture that allows multiple Subscriber instances to consume the same data stream.