Pubsub
Spring Data provides dedicated messaging integration for Valkey, similar in functionality and naming to the JMS integration in Spring Framework.
Valkey messaging can be roughly divided into two areas of functionality:
- Publication or production of messages
- Subscription or consumption of messages
This is an example of the pattern often called Publish/Subscribe (Pub/Sub for short). The ValkeyTemplate class is used for message production. For asynchronous reception similar to Java EE’s message-driven bean style, Spring Data provides a dedicated message listener container that is used to create Message-Driven POJOs (MDPs) and, for synchronous reception, the ValkeyConnection contract.
The io.valkey.springframework.data.connection and io.valkey.springframework.data.listener packages provide the core functionality for Valkey messaging.
Publishing (Sending Messages)
Section titled “Publishing (Sending Messages)”To publish a message, you can use, as with the other operations, either the low-level [Reactive]ValkeyConnection or the high-level [Reactive]ValkeyOperations. Both entities offer the publish method, which accepts the message and the destination channel as arguments. While ValkeyConnection requires raw data (array of bytes), the [Reactive]ValkeyOperations lets arbitrary objects be passed in as messages, as shown in the following example:
// send message through connectionValkeyConnection con = …byte[] msg = …byte[] channel = …con.pubSubCommands().publish(msg, channel);
// send message through ValkeyOperationsValkeyOperations operations = …Long numberOfClients = operations.convertAndSend("hello!", "world");// send message through connectionReactiveValkeyConnection con = …ByteBuffer[] msg = …ByteBuffer[] channel = …con.pubSubCommands().publish(msg, channel);
// send message through ReactiveValkeyOperationsReactiveValkeyOperations operations = …Mono<Long> numberOfClients = operations.convertAndSend("hello!", "world");Subscribing (Receiving Messages)
Section titled “Subscribing (Receiving Messages)”On the receiving side, one can subscribe to one or multiple channels either by naming them directly or by using pattern matching. The latter approach is quite useful, as it not only lets multiple subscriptions be created with one command but can also listen on channels not yet created at subscription time (as long as they match the pattern).
At the low-level, ValkeyConnection offers the subscribe and pSubscribe methods that map the Valkey commands for subscribing by channel or by pattern, respectively. Note that multiple channels or patterns can be used as arguments. To change the subscription of a connection or query whether it is listening, ValkeyConnection provides the getSubscription and isSubscribed methods.
As mentioned earlier, once subscribed, a connection starts waiting for messages. Only commands that add new subscriptions, modify existing subscriptions, and cancel existing subscriptions are allowed. Invoking anything other than subscribe, pSubscribe, unsubscribe, or pUnsubscribe throws an exception.
In order to subscribe to messages, one needs to implement the MessageListener callback. Each time a new message arrives, the callback gets invoked and the user code gets run by the onMessage method. The interface gives access not only to the actual message but also to the channel it has been received through and the pattern (if any) used by the subscription to match the channel. This information lets the callee differentiate between various messages not just by content but also examining additional details.
Message Listener Containers
Section titled “Message Listener Containers”Due to its blocking nature, low-level subscription is not attractive, as it requires connection and thread management for every single listener. To alleviate this problem, Spring Data offers io.valkey.springframework.data.listener.ValkeyMessageListenerContainer, which does all the heavy lifting. If you are familiar with EJB and JMS, you should find the concepts familiar, as it is designed to be as close as possible to the support in Spring Framework and its message-driven POJOs (MDPs).
io.valkey.springframework.data.listener.ValkeyMessageListenerContainer acts as a message listener container. It is used to receive messages from a Valkey channel and drive the io.valkey.springframework.data.connection.MessageListener instances that are injected into it. The listener container is responsible for all threading of message reception and dispatches into the listener for processing. A message listener container is the intermediary between an MDP and a messaging provider and takes care of registering to receive messages, resource acquisition and release, exception conversion, and the like. This lets you as an application developer write the (possibly complex) business logic associated with receiving a message (and reacting to it) and delegates boilerplate Valkey infrastructure concerns to the framework.
A io.valkey.springframework.data.connection.MessageListener can additionally implement io.valkey.springframework.data.connection.SubscriptionListener to receive notifications upon subscription/unsubscribe confirmation. Listening to subscription notifications can be useful when synchronizing invocations.
Furthermore, to minimize the application footprint, io.valkey.springframework.data.listener.ValkeyMessageListenerContainer lets one connection and one thread be shared by multiple listeners even though they do not share a subscription. Thus, no matter how many listeners or channels an application tracks, the runtime cost remains the same throughout its lifetime. Moreover, the container allows runtime configuration changes so that you can add or remove listeners while an application is running without the need for a restart. Additionally, the container uses a lazy subscription approach, using a ValkeyConnection only when needed. If all the listeners are unsubscribed, cleanup is automatically performed, and the thread is released.
To help with the asynchronous nature of messages, the container requires a java.util.concurrent.Executor (or Spring’s TaskExecutor) for dispatching the messages. Depending on the load, the number of listeners, or the runtime environment, you should change or tweak the executor to better serve your needs. In particular, in managed environments (such as app servers), it is highly recommended to pick a proper TaskExecutor to take advantage of its runtime.
The MessageListenerAdapter
Section titled “The MessageListenerAdapter”The io.valkey.springframework.data.listener.adapter.MessageListenerAdapter class is the final component in Spring’s asynchronous messaging support. In a nutshell, it lets you expose almost any class as a MDP (though there are some constraints).
Consider the following interface definition:
public interface MessageDelegate { void handleMessage(String message); void handleMessage(Map message); void handleMessage(byte[] message); void handleMessage(Serializable message); // pass the channel/pattern as well void handleMessage(Serializable message, String channel); }Notice that, although the interface does not extend the MessageListener interface, it can still be used as a MDP by using the io.valkey.springframework.data.listener.adapter.MessageListenerAdapter class. Notice also how the various message handling methods are strongly typed according to the contents of the various Message types that they can receive and handle. In addition, the channel or pattern to which a message is sent can be passed in to the method as the second argument of type String:
public class DefaultMessageDelegate implements MessageDelegate { // implementation elided for clarity...}Notice how the above implementation of the MessageDelegate interface (the above DefaultMessageDelegate class) has no Valkey dependencies at all. It truly is a POJO that we make into an MDP with the following configuration:
@Configurationclass MyConfig {
// …
@Bean DefaultMessageDelegate listener() { return new DefaultMessageDelegate(); }
@Bean MessageListenerAdapter messageListenerAdapter(DefaultMessageDelegate listener) { return new MessageListenerAdapter(listener, "handleMessage"); }
@Bean ValkeyMessageListenerContainer valkeyMessageListenerContainer(ValkeyConnectionFactory connectionFactory, MessageListenerAdapter listener) {
ValkeyMessageListenerContainer container = new ValkeyMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listener, ChannelTopic.of("chatroom")); return container; }}<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:valkey="http://www.springframework.org/schema/valkey" xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/valkey https://spring.valkey.io/schema/valkey/spring-valkey-1.0.xsd">
<!-- the default ConnectionFactory --><valkey:listener-container> <!-- the method attribute can be skipped as the default method name is "handleMessage" --> <valkey:listener ref="listener" method="handleMessage" topic="chatroom" /></valkey:listener-container>
<bean id="listener" class="valkeyexample.DefaultMessageDelegate"/> ...</beans>The preceding example uses the Valkey namespace to declare the message listener container and automatically register the POJOs as listeners. The full-blown beans definition follows:
<bean id="messageListener" class="io.valkey.springframework.data.listener.adapter.MessageListenerAdapter"> <constructor-arg> <bean class="valkeyexample.DefaultMessageDelegate"/> </constructor-arg></bean>
<bean id="valkeyContainer" class="io.valkey.springframework.data.listener.ValkeyMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="messageListeners"> <map> <entry key-ref="messageListener"> <bean class="io.valkey.springframework.data.listener.ChannelTopic"> <constructor-arg value="chatroom"/> </bean> </entry> </map> </property></bean>Each time a message is received, the adapter automatically and transparently performs translation (using the configured ValkeySerializer) between the low-level format and the required object type. Any exception caused by the method invocation is caught and handled by the container (by default, exceptions get logged).
Reactive Message Listener Container
Section titled “Reactive Message Listener Container”Spring Data offers io.valkey.springframework.data.listener.ReactiveValkeyMessageListenerContainer which does all the heavy lifting of conversion and subscription state management on behalf of the user.
The message listener container itself does not require external threading resources. It uses the driver threads to publish messages.
ReactiveValkeyConnectionFactory factory = …ReactiveValkeyMessageListenerContainer container = new ReactiveValkeyMessageListenerContainer(factory);
Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));To await and ensure proper subscription, you can use the receiveLater method that returns a Mono<Flux<ChannelMessage>>.
The resulting Mono completes with an inner publisher as a result of completing the subscription to the given topics. By intercepting onNext signals, you can synchronize server-side subscriptions.
ReactiveValkeyConnectionFactory factory = …ReactiveValkeyMessageListenerContainer container = new ReactiveValkeyMessageListenerContainer(factory);
Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));
stream.doOnNext(inner -> // notification hook when Valkey subscriptions are synchronized with the server) .flatMapMany(Function.identity()) .…;Subscribing via template API
Section titled “Subscribing via template API”As mentioned above you can directly use io.valkey.springframework.data.core.ReactiveValkeyTemplate to subscribe to channels / patterns. This approach
offers a straight forward, though limited solution as you lose the option to add subscriptions after the initial
ones. Nevertheless you still can control the message stream via the returned Flux using eg. take(Duration). When
done reading, on error or cancellation all bound resources are freed again.
valkeyTemplate.listenToChannel("channel1", "channel2").doOnNext(msg -> { // message processing ...}).subscribe();