Configuration
Rqueue offers numerous configuration settings that can be adjusted either through application properties or directly in code.
Beyond basic setup, Rqueue can be highly customized, for example, by adjusting the number of tasks executed concurrently. Further configurations are available via the SimpleRqueueListenerContainerFactory class. Refer to the SimpleRqueueListenerContainerFactory Javadoc for more details.
@Configuration
public class RqueueConfiguration {
@Bean
public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
// return SimpleRqueueListenerContainerFactory object
}
}
Task and Queue Concurrency
By default, the number of task executors is twice the number of queues. You can provide a custom or shared task executor using the factory’s setTaskExecutor method.
Queue-level concurrency can be configured using the @RqueueListener annotation’s concurrency field. This can be a fixed number (e.g., 10) or a range (e.g., 5-10). When specified, that queue uses its own task executor; otherwise, the shared task executor is used.
You can also set a global limit on workers using setMaxNumWorkers. The batchSize field in @RqueueListener determines how many messages are fetched at once. By default, listeners with explicit concurrency fetch 10 messages per poll, while others fetch 1.
Increasing the batch size can lead to task rejection if the thread pool is too small and the queueCapacity is not sufficiently large.
class RqueueConfiguration {
@Bean
public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();
//...
factory.setMaxNumWorkers(10);
return factory;
}
}
class RqueueConfiguration {
@Bean
public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();
//...
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("taskExecutor");
threadPoolTaskExecutor.setCorePoolSize(10);
threadPoolTaskExecutor.setMaxPoolSize(50);
threadPoolTaskExecutor.setQueueCapacity(0);
threadPoolTaskExecutor.afterPropertiesSet();
factory.setTaskExecutor(threadPoolTaskExecutor);
return factory;
}
}
When providing a custom executor, it is essential to set MaxNumWorkers correctly to avoid over- or under-utilizing the thread pool. Over-utilization can cause task rejection and message consumption delays.
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("ListenerExecutor");
threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
threadPoolTaskExecutor.afterPropertiesSet();
factory.setTaskExecutor(threadPoolTaskExecutor);
Key configuration parameters for the executor include:
-
corePoolSize: The minimum number of active threads. -
maxPoolSize: The maximum number of active threads. -
queueCapacity: The number of tasks that can wait in the internal queue before new tasks are rejected.
With N queues, a common rule of thumb for setting the maximum number of workers is (maxPoolSize + queueCapacity - N).
In this case, N represents the threads allocated for polling. However, this count can vary significantly if priorities are used.
The total number of message pollers is determined by the sum of:
- The number of unique priority groups.
- The number of queues with explicit priority settings (e.g.,
"critical=5,high=2"). - The number of queues without specified priorities.
A safe baseline configuration without complex calculations:
queueCapacity >= 2 * number of queuesmaxPoolSize >= 2 * number of queuescorePoolSize >= number of queues
A non-zero queueCapacity can lead to duplicate message processing. If a message is polled and sits in the executor’s queue longer than its visibilityTimeout, it may be re-polled by another listener. Ensure your visibilityTimeout is long enough to accommodate potential queuing delays.
Manual Container Management
By default, the Rqueue container starts automatically. You can control this behavior using the autoStartup flag. If set to false, you must manually call the start() and stop() methods of the container. For a clean shutdown, also ensure that the destroy() method is called.
class RqueueConfiguration {
@Bean
public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();
//...
factory.setAutoStartup(false);
return factory;
}
}
public class BootstrapController {
@Autowired
private RqueueMessageListenerContainer rqueueMessageListenerContainer;
// ...
public void start() {
// ...
rqueueMessageListenerContainer.start();
}
public void stop() {
// ...
rqueueMessageListenerContainer.stop();
}
public void destroy() {
// ...
rqueueMessageListenerContainer.destroy();
}
//...
}
Message Converter Configuration
To customize message conversion, set the property rqueue.message.converter.provider.class to the fully qualified name of your provider class. This class must implement the MessageConverterProvider interface and return a Spring MessageConverter.
Your custom provider must implement com.github.sonus21.rqueue.converter.MessageConverterProvider.
class MyMessageConverterProvider implements MessageConverterProvider {
@Override
public MessageConverter getConverter() {
// here any message converter can be returned except null
return new MyMessageConverter();
}
}
The DefaultRqueueMessageConverter handles serialization for most use cases, but it may fail if classes are not shared between producing and consuming applications. To avoid shared dependencies, consider using JSON-based converters like com.github.sonus21.rqueue.converter.JsonMessageConverter or Spring’s JacksonJsonMessageConverter. These serialize payloads into JSON, improving interoperability.
Other serialization formats like MessagePack or Protocol Buffers (ProtoBuf) can also be implemented based on your requirements.
Generic Envelope Types
GenericMessageConverter (used by the default converter) supports single-level generic envelope types such as Event<T>. The type parameter is resolved at serialization time by inspecting the runtime class of the field value that corresponds to T.
// A generic envelope type
public class Event<T> {
private String id;
private T payload;
// getters/setters ...
}
// Enqueue
Event<Order> event = new Event<>("evt-123", order);
rqueueMessageEnqueuer.enqueue("order-queue", event);
// Consume
@RqueueListener(value = "order-queue")
public void onEvent(Event<Order> event) { ... }
The serialized form encodes both the envelope class and the type parameter:
{"msg":"...","name":"com.example.Event#com.example.Order"}
Constraints:
- The type parameter
Tmust be a non-generic concrete class (e.g.Order, notList<Order>). - At least one field of type
Ton the envelope class must be non-null at serialization time, so the runtime type can be determined. - For
List<T>, items must also be non-generic concrete classes. Envelopes likeList<Event<Order>>are not supported. - Multi-level nesting (e.g.
Wrapper<Event<T>>) is not supported.
Additional Configuration
-
rqueue.retry.per.poll: Determines how many times a polled message is retried immediately if processing fails, before it is moved back to the queue for a subsequent poll. The default value is1. If increased toN, the message will be retriedNtimes consecutively within the same polling cycle.