Rqueue | Redis Queue For Spring Framework
Rqueue is an asynchronous task executor (worker) built for the Spring Framework based on Spring’s messaging library, backed by Redis. It can serve as a message broker where all service code is within Spring/Spring Boot applications. Rqueue fully supports both Spring and Spring Boot frameworks.
Get started now View it on GitHub
Features
- Instant Delivery: Immediate execution of messages.
- Message Scheduling: Schedule messages for any arbitrary period.
- Unique Message Processing: Ensures unique processing of messages based on message ID.
- Periodic Message Processing: Process the same message at defined intervals.
- Priority Tasks: Support for task prioritization (e.g., high, low, medium).
- Message Delivery Guarantee: Ensures each message is consumed at least once, and may be retried in case of worker failures or restarts.
- Automatic Serialization and Deserialization of Messages.
- Message Multicasting: Call multiple message listeners for each message.
- Batch Message Polling: Fetch multiple messages from Redis in one operation.
- Metrics: Provides insights into in-flight messages, waiting messages, and delayed messages.
- Competing Consumers: Multiple workers can consume messages in parallel.
- Concurrency Control: Configurable concurrency for message listeners.
- Queue Priority: Supports both group-level and sub-queue level priorities.
- Long Execution Jobs: Check-in mechanism for long-running jobs.
- Execution Backoff: Supports exponential and fixed backoff strategies.
- Do not retry: Supports do not retry strategy.
- Middleware: Allows integration of middleware to intercept messages before processing.
- Callbacks: Supports callbacks for handling dead letter queues and discarding messages.
- Events: Provides bootstrap and task execution events.
- Redis Connection Options: Supports different Redis configurations including Redis Cluster and Redis Sentinel.
- Reactive Programming: Integrates with reactive Redis and Spring WebFlux.
- Web Dashboard: Provides a web-based dashboard for managing queues and monitoring queue metrics.
Requirements
- Spring 5+, 6+
- Spring Boot 2+, 3+
- Spring Reactive
- Lettuce client for Redis cluster
- Read master preference for Redis cluster
Getting Started
All queue names are dynamic. Manually creating queues using registerQueue
method can lead to inconsistencies. Queues should only be created when using Rqueue as a producer.
Sample Apps
The Rqueue GitHub repository includes several sample apps for local testing and demonstration:
Project Integration
When configuring the Redis connection factory, ensure to set readFrom
to MASTER_PREFERRED
for Redis cluster compatibility, otherwise the application may fail to start.
Spring Boot
Use Rqueue Spring Boot Starter 3.x for Spring Boot 3.x, and Rqueue Spring Boot Starter 2.x for Spring Boot 2.x.
Get the latest version of Rqueue Spring Boot Starter from Maven Central. Add the dependency to your project:
Spring Boot 2.x Setup
- Gradle
implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.13.1-RELEASE'
- Maven
<dependency> <groupId>com.github.sonus21</groupId> <artifactId>rqueue-spring-boot-starter</artifactId> <version>2.13.1-RELEASE</version> </dependency>
Spring Boot 3.x Setup
- Gradle
implementation 'com.github.sonus21:rqueue-spring-boot-starter:3.1.0-RELEASE'
- Maven
<dependency> <groupId>com.github.sonus21</groupId> <artifactId>rqueue-spring-boot-starter</artifactId> <version>3.1.0-RELEASE</version> </dependency>
Spring Framework
Use Rqueue Spring 3.x for Spring Framework 6.x, and Rqueue Spring 2.x for Spring Framework 5.x.
Get the latest version of Rqueue Spring from Maven Central. Add the dependency to your project:
Spring Framework 5.x Setup
- Gradle
implementation 'com.github.sonus21:rqueue-spring:2.13.1-RELEASE'
- Maven
<dependency> <groupId>com.github.sonus21</groupId> <artifactId>rqueue-spring</artifactId> <version>2.13.1-RELEASE</version> </dependency>
Spring Framework 6.x Setup
- Gradle
implementation 'com.github.sonus21:rqueue-spring:3.1.0-RELEASE'
- Maven
<dependency> <groupId>com.github.sonus21</groupId> <artifactId>rqueue-spring</artifactId> <version>3.1.0-RELEASE</version> </dependency>
For Spring Framework, ensure to:
- Add
EnableRqueue
annotation on the main method. - Provide a
RedisConnectionFactory
bean.
Example Spring Application Configuration
@EnableRqueue
public class Application {
@Bean
public RedisConnectionFactory redisConnectionFactory() {
// return a Redis connection factory
}
}
Once Rqueue is configured in Spring or Spring Boot as described above, you can start using Rqueue methods and annotations. The usage remains consistent whether using Spring Boot or the Spring framework.
Message Publishing / Task Submission
All messages should be sent using the RqueueMessageEnqueuer
bean’s enqueueXXX
, enqueueInXXX
, and enqueueAtXXX
methods. Use the appropriate method based on your use case:
import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
@Component
public class MessageService {
@Autowired
private RqueueMessageEnqueuer rqueueMessageEnqueuer;
public void doSomething() {
rqueueMessageEnqueuer.enqueue("simple-queue", "Rqueue is configured");
}
public void createJob(Job job) {
rqueueMessageEnqueuer.enqueue("job-queue", job);
}
public void sendNotification(Notification notification) {
rqueueMessageEnqueuer.enqueueIn("notification-queue", notification, 30 * 1000L);
}
public void createInvoice(Invoice invoice, Instant instant) {
rqueueMessageEnqueuer.enqueueAt("invoice-queue", invoice, instant);
}
public void sendSms(Sms sms, SmsPriority priority) {
rqueueMessageEnqueuer.enqueueWithPriority("sms-queue", priority.value(), sms);
}
public void sendPeriodicEmail(Email email) {
rqueueMessageEnqueuer.enqueuePeriodic("email-queue", email, 30_000);
}
}
Worker / Consumer / Task Executor / Listener
Annotate any public method of a Spring bean with RqueueListener
to make it a message consumer:
import com.github.sonus21.rqueue.annotation.RqueueListener;
import com.github.sonus21.rqueue.listener.RqueueMessageHeaders;
@Component
@Slf4j
public class MessageListener {
@RqueueListener(value = "simple-queue")
public void handleSimpleMessage(String message) {
log.info("Received message from simple-queue: {}", message);
}
@RqueueListener(value = "job-queue", numRetries = "3", deadLetterQueue = "failed-job-queue", concurrency = "5-10")
public void handleJob(Job job) {
log.info("Received job: {}", job);
}
@RqueueListener(value = "push-notification-queue", numRetries = "3", deadLetterQueue = "failed-notification-queue")
public void handleNotification(Notification notification) {
log.info("Received notification: {}", notification);
}
@RqueueListener(value = "sms", priority = "critical=10,high=8,medium=4,low=1")
public void handleSms(Sms sms) {
log.info("Received SMS: {}", sms);
}
@RqueueListener(value = "chat-indexing", priority = "20", priorityGroup = "chat")
public void handleChatIndexing(ChatIndexing chatIndexing) {
log.info("Received chat indexing message: {}", chatIndexing);
}
@RqueueListener(value = "chat-indexing-daily", priority = "10", priorityGroup = "chat")
public void handleDailyChatIndexing(ChatIndexing chatIndexing) {
log.info("Received daily chat indexing message: {}", chatIndexing);
}
}
Notes:
-
Retry Mechanism: Configure retry behavior using
numRetries
anddeadLetterQueue
attributes. -
Concurrency: Adjust concurrency using the
concurrency
attribute. -
Priority: Set message priority using the
priority
attribute.
Advanced Configuration
Rqueue Configuration
For advanced configurations such as message serialization, queue properties, message listener details, and more, refer to the official documentation.
Support
For any issues, questions, or feature requests, please create an issue on the GitHub repository or contact the maintainers directly.
License
Rqueue is licensed under the Apache License 2.0. See the LICENSE file for more details.