RabbitMQ does not support partitioning natively.
Sometimes, it is advantageous to send data to specific partitions. For example, when you want to strictly order message processing, all messages for a particular customer should go to the same partition.
The RabbitMessageChannelBinder provides partitioning by binding a queue for each partition to the destination exchange.
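To make the idea of key-based partitioning concrete, the following is a minimal sketch of how a partition key could be mapped to a partition index. It assumes a simple hash-modulo rule; `PartitionSketch` and `selectPartition` are hypothetical names used only for illustration, and the binder's actual selection logic may differ.

```java
// Hypothetical helper for illustration; not part of Spring Cloud Stream.
class PartitionSketch {

    // Assumed selection rule: hash of the key, modulo the partition count.
    static int selectPartition(Object key, int partitionCount) {
        // floorMod keeps the result in [0, partitionCount) even for negative hash codes.
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        // The same key appears twice to show that it maps to the same partition.
        String[] keys = {"abc1", "def1", "qux1", "abc1"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + selectPartition(key, 2));
        }
    }
}
```

Because the index depends only on the key, every message carrying the same key lands in the same partition, which is what makes ordered processing per customer possible.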
The following Java and YAML examples show how to configure the producer:
Producer.
    @SpringBootApplication
    @EnableBinding(Source.class)
    public class RabbitPartitionProducerApplication {

        private static final Random RANDOM = new Random(System.currentTimeMillis());

        private static final String[] data = new String[] {
                "abc1", "def1", "qux1",
                "abc2", "def2", "qux2",
                "abc3", "def3", "qux3",
                "abc4", "def4", "qux4",
                };

        public static void main(String[] args) {
            new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
                .web(false)
                .run(args);
        }

        @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
        public Message<?> generate() {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        }

    }
application.yml.
    spring:
      cloud:
        stream:
          bindings:
            output:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup
Note | |
---|---|
The configuration in the preceding example uses the default partitioning ( The |
The following configuration provisions a topic exchange:
The following queues are bound to that exchange:
The following bindings associate the queues to the exchange:
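As a rough illustration of what gets provisioned for the producer configuration above, the sketch below prints one queue name and one routing key per partition. The naming patterns (`<destination>.<group>-<index>` for queues and `<destination>-<index>` for routing keys) are assumptions made for this example, and `PartitionNamingSketch` is a hypothetical helper, so check the names against what your broker actually shows.

```java
// Hypothetical helper for illustration; the naming patterns are assumptions.
class PartitionNamingSketch {

    // Assumed queue name pattern: <destination>.<group>-<partition index>
    static String queueName(String destination, String group, int index) {
        return destination + "." + group + "-" + index;
    }

    // Assumed routing key pattern: <destination>-<partition index>
    static String routingKey(String destination, int index) {
        return destination + "-" + index;
    }

    public static void main(String[] args) {
        // Values taken from the producer application.yml shown earlier.
        String destination = "partitioned.destination";
        String group = "myGroup";
        int partitionCount = 2;
        for (int i = 0; i < partitionCount; i++) {
            System.out.println(queueName(destination, group, i)
                    + " bound with routing key " + routingKey(destination, i));
        }
    }
}
```

With a partition count of 2, this yields two queues on the same exchange, one per partition.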
The following Java and YAML examples continue the previous examples and show how to configure the consumer:
Consumer.
    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class RabbitPartitionConsumerApplication {

        public static void main(String[] args) {
            new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
                .web(false)
                .run(args);
        }

        @StreamListener(Sink.INPUT)
        public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
            System.out.println(in + " received from queue " + queue);
        }

    }
application.yml.
    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0
Important | |
---|---|
The |