Sometimes you need to schedule unique messages in a queue: when a newer message with the same identity is enqueued, the older one should be discarded so that only the newest is consumed. To implement this, first decide what defines message uniqueness, whether that is a single ID field or a combination of several fields. Once that is settled, you can use a pre-execution message processor to discard the older messages before they reach the listener.
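When uniqueness depends on several fields, one option is to derive a single string key from them and use that key wherever the examples on this page use the message ID. The following is a minimal sketch: OrderMessage, its two fields, and uniqueKey are hypothetical names chosen for illustration and are not part of Rqueue.

```java
import java.util.Objects;

// Hypothetical message whose identity is defined by two fields rather than one.
final class OrderMessage {
  private final String customerId;
  private final String orderType;

  OrderMessage(String customerId, String orderType) {
    this.customerId = Objects.requireNonNull(customerId);
    this.orderType = Objects.requireNonNull(orderType);
  }

  // Length-prefix each field so that ("ab", "c") and ("a", "bc") cannot collide.
  String uniqueKey() {
    return customerId.length() + ":" + customerId + "|" + orderType.length() + ":" + orderType;
  }
}
```

For example, new OrderMessage("ab", "c").uniqueKey() yields "2:ab|1:c". Plain concatenation without the length prefix would be shorter, but two different field combinations could then map to the same key.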
Enqueue Process
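import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
import java.time.Duration;
import org.springframework.beans.factory.annotation.Autowired;

interface MessageRepository {
  // Returns the most recent enqueue time recorded for the id, or null if none exists.
  Long getLatestEnqueueAt(String messageId);

  void addEnqueueAt(String messageId, Long time);
}

class SimpleMessage {
  private String id;

  public String getId() {
    return id;
  }
}

class MessageSender {
  @Autowired
  private MessageRepository messageRepository;
  @Autowired
  private RqueueMessageEnqueuer rqueueMessageEnqueuer;

  public void sendMessage(SimpleMessage message) {
    String id = message.getId();
    // TODO: handle the error case
    // Record the enqueue time first so the processor can tell which message is newest.
    messageRepository.addEnqueueAt(id, System.currentTimeMillis());
    rqueueMessageEnqueuer.enqueueIn("simple-queue", message, Duration.ofMinutes(10));
  }
}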
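The MessageRepository above is left abstract. For local experiments, an in-memory implementation backed by a ConcurrentHashMap may be enough. The sketch below restates the interface so that it compiles on its own; InMemoryMessageRepository is a hypothetical name, and keeping only the larger timestamp is a design choice of this sketch, not something the original interface requires.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Restated from the example above so this sketch is self-contained.
interface MessageRepository {
  Long getLatestEnqueueAt(String messageId);

  void addEnqueueAt(String messageId, Long time);
}

// Hypothetical in-memory implementation, suitable only for a single JVM.
class InMemoryMessageRepository implements MessageRepository {
  private final Map<String, Long> latestEnqueueAt = new ConcurrentHashMap<>();

  @Override
  public Long getLatestEnqueueAt(String messageId) {
    return latestEnqueueAt.get(messageId); // null when the id was never enqueued
  }

  @Override
  public void addEnqueueAt(String messageId, Long time) {
    // merge keeps the larger timestamp, so a late-arriving write cannot move the value backwards.
    latestEnqueueAt.merge(messageId, time, Long::max);
  }
}
```

With more than one application instance, the timestamps would need to live in a shared store such as Redis or a database instead of process memory.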
Next, define a UniqueMessageProcessor that implements MessageProcessor and returns false for older messages so that they are skipped.
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.support.MessageProcessor;
import org.springframework.beans.factory.annotation.Autowired;

class UniqueMessageProcessor implements MessageProcessor {
  @Autowired
  private MessageRepository messageRepository;

  @Override
  public boolean process(Object message, RqueueMessage rqueueMessage) {
    if (message instanceof SimpleMessage) {
      // To derive the id from composite fields, add a method that builds the unique id.
      String messageId = ((SimpleMessage) message).getId();
      Long latestEnqueueTime = messageRepository.getLatestEnqueueAt(messageId);
      // Process when no entry exists, or when no newer message was enqueued after this one.
      return latestEnqueueTime == null || latestEnqueueTime <= rqueueMessage.getQueuedTime();
    }
    return true;
  }
}
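To see which messages the processor lets through, the decision can be isolated from Rqueue and Spring. The sketch below is a simplified model rather than library code: shouldProcess is a hypothetical helper that mirrors the comparison in process, and the timestamps are arbitrary values standing in for the recorded enqueue time and rqueueMessage.getQueuedTime().

```java
// Simplified model of the "newest message wins" check; no Rqueue types are involved.
final class LatestWinsCheck {
  private LatestWinsCheck() {}

  // latestEnqueueAt: most recent enqueue time recorded for this id, or null if none.
  // queuedTime: the time at which the message being evaluated was queued.
  static boolean shouldProcess(Long latestEnqueueAt, long queuedTime) {
    return latestEnqueueAt == null || latestEnqueueAt <= queuedTime;
  }

  public static void main(String[] args) {
    System.out.println(shouldProcess(null, 1_000L));   // true: nothing recorded for this id
    System.out.println(shouldProcess(1_000L, 1_000L)); // true: no newer message exists
    System.out.println(shouldProcess(2_000L, 1_000L)); // false: a newer message was enqueued later
  }
}
```

The comparison is only meaningful if both timestamps come from the same clock and use the same unit, which is worth verifying against the Rqueue version in use.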
Finally, register the processor in the Rqueue configuration as the preExecutionMessageProcessor so that the older messages are skipped.
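import com.github.sonus21.rqueue.config.SimpleRqueueListenerContainerFactory;
import com.github.sonus21.rqueue.core.support.MessageProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class RqueueConfiguration {
  @Bean
  public MessageProcessor preExecutionMessageProcessor() {
    return new UniqueMessageProcessor();
  }

  @Bean
  public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
    SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();
    MessageProcessor preExecutionMessageProcessor = preExecutionMessageProcessor();
    factory.setPreExecutionMessageProcessor(preExecutionMessageProcessor);
    return factory;
  }
}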
If your use case requires the opposite, where the older message is executed and newer ones are ignored, you can implement that with a pre-execution message processor as well.
interface MessageRepository {
  Long getEnqueueAt(String messageId);

  void saveEnqueueAt(String messageId, Long time);
}

class MessageSender {
  @Autowired
  private RqueueMessageEnqueuer rqueueMessageEnqueuer;

  public void sendMessage(SimpleMessage message) {
    rqueueMessageEnqueuer.enqueueIn("simple-queue", message, Duration.ofMinutes(10));
  }
}
class UniqueMessageProcessor implements MessageProcessor {
  @Autowired
  private MessageRepository messageRepository;

  @Override
  public boolean process(Object message, RqueueMessage rqueueMessage) {
    if (message instanceof SimpleMessage) {
      String messageId = ((SimpleMessage) message).getId();
      Long enqueueAt = messageRepository.getEnqueueAt(messageId);
      if (enqueueAt == null) {
        // First message seen for this id: record its queued time and let it run.
        // Storing the queued time (not the current time) lets the check below match on retries.
        messageRepository.saveEnqueueAt(messageId, rqueueMessage.getQueuedTime());
        return true;
      }
      // Allow the recorded message to run multiple times (for example on retry); skip any other.
      return enqueueAt.longValue() == rqueueMessage.getQueuedTime();
    }
    return true;
  }
}
This approach does not handle the following cases:
- Multiple similar messages are enqueued at the same time.
- Multiple similar messages try to run at the same time.
- A new message is enqueued while the existing one is running.
- A new message is enqueued after the older message was discarded.
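Some of these gaps come from the check-then-save sequence in the processor, where two threads can both read null before either one writes. One possible mitigation, sketched here under the assumption of a single JVM, is to make the claim atomic with ConcurrentHashMap.putIfAbsent. FirstWinsRegistry and tryClaim are hypothetical names and are not part of Rqueue.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry for the "oldest message wins" variant.
final class FirstWinsRegistry {
  private final ConcurrentMap<String, Long> claimedBy = new ConcurrentHashMap<>();

  // Returns true if the message identified by (messageId, queuedTime) may run.
  // putIfAbsent is atomic, so only one queuedTime can claim an id even when
  // several threads call this method at the same moment.
  boolean tryClaim(String messageId, long queuedTime) {
    Long existing = claimedBy.putIfAbsent(messageId, queuedTime);
    // null: this call made the claim; equal: a retry of the message that holds the claim.
    return existing == null || existing == queuedTime;
  }
}
```

This only settles the race between competing messages with the same id. It does not release a claim when a message finishes or is discarded, and with several application instances the same idea would need an atomic operation in the shared store rather than in process memory.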