8. Spring Cloud Stream

Spring Cloud GCP provides a Spring Cloud Stream binder to Google Cloud Pub/Sub.

The provided binder relies on the Spring Integration Channel Adapters for Google Cloud Pub/Sub.

Maven coordinates, using Spring Cloud GCP BOM:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
</dependency>

Gradle coordinates:

dependencies {
    compile group: 'org.springframework.cloud', name: 'spring-cloud-gcp-pubsub-stream-binder'
}

8.1 Overview

This binder binds producers to Google Cloud Pub/Sub topics and consumers to subscriptions.

[Note]Note

Partitioning is currently not supported by this binder.

8.2 Configuration

You can configure the Spring Cloud Stream Binder for Google Cloud Pub/Sub to automatically generate the underlying resources, like the Google Cloud Pub/Sub topics and subscriptions for producers and consumers. For that, you can use the spring.cloud.stream.gcp.pubsub.bindings.<channelName>.<consumer|producer>.auto-create-resources property, which is turned ON by default.

Starting with version 1.1, these and other binder properties can be configured globally for all the bindings, e.g. spring.cloud.stream.gcp.pubsub.default.consumer.auto-create-resources.

If you are using Pub/Sub auto-configuration from the Spring Cloud GCP Pub/Sub Starter, you should refer to the configuration section for other Pub/Sub parameters.

[Note]Note

To use this binder with a running emulator, configure its host and port via spring.cloud.gcp.pubsub.emulator-host.

8.2.1 Producer Destination Configuration

If automatic resource creation is turned ON and the topic corresponding to the destination name does not exist, it will be created.

For example, for the following configuration, a topic called myEvents would be created.

application.properties. 

spring.cloud.stream.bindings.events.destination=myEvents
spring.cloud.stream.gcp.pubsub.bindings.events.producer.auto-create-resources=true

8.2.2 Consumer Destination Configuration

If automatic resource creation is turned ON and the subscription and/or the topic do not exist for a consumer, a subscription and potentially a topic will be created. The topic name will be the same as the destination name, and the subscription name will be the destination name followed by the consumer group name.

Regardless of the auto-create-resources setting, if the consumer group is not specified, an anonymous one will be created with the name anonymous.<destinationName>.<randomUUID>. Then when the binder shuts down, all Pub/Sub subscriptions created for anonymous consumer groups will be automatically cleaned up.

For example, for the following configuration, a topic named myEvents and a subscription called myEvents.consumerGroup1 would be created. If the consumer group is not specified, a subscription called anonymous.myEvents.a6d83782-c5a3-4861-ac38-e6e2af15a7be would be created and later cleaned up.

[Important]Important

If you are manually creating Pub/Sub subscriptions for consumers, make sure that they follow the naming convention of <destinationName>.<consumerGroup>.

application.properties. 

spring.cloud.stream.bindings.events.destination=myEvents
spring.cloud.stream.gcp.pubsub.bindings.events.consumer.auto-create-resources=true

# specify consumer group, and avoid anonymous consumer group generation
spring.cloud.stream.bindings.events.group=consumerGroup1

8.3 Streaming vs. Polled Input

Many Spring Cloud Stream applications will use the built-in Sink binding, which triggers the streaming input binder creation. Messages can then be consumed with an input handler marked by @StreamListener(Sink.INPUT) annotation, at whatever rate Pub/Sub sends them.

For more control over the rate of message arrival, a polled input binder can be set up by defining a custom binding interface with an @Input-annotated method returning PollableMessageSource.

public interface PollableSink {

	@Input("input")
	PollableMessageSource input();
}

The PollableMessageSource can then be injected and queried, as needed.

@EnableBinding(PollableSink.class)
public class SinkExample {

    @Autowired
    PollableMessageSource destIn;

    @Bean
    public ApplicationRunner singlePollRunner() {
        return args -> {
            // This will poll only once.
            // Add a loop or a scheduler to get more messages.
            destIn.poll((message) -> System.out.println("Message retrieved: " + message));
        };
    }
}

8.4 Sample

Sample applications are available: