Sometimes only the most recent of several equivalent messages in a queue should be consumed, and any older copies should be discarded. Implementing this starts with deciding what makes a message unique: a single ID field or a combination of several fields. Once that is settled, you can use a pre-execution message processor to discard the older messages so that only the latest one is processed.
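When uniqueness depends on more than one field, the fields can be folded into a single string key that is then used as the message id. The following is a minimal sketch of that idea, not part of Rqueue: `OrderReminder`, its two fields, and `MessageKeys.uniqueKey` are hypothetical names chosen for illustration.

```java
import java.util.Objects;

// Hypothetical message type whose identity is the pair (customerId, orderId).
class OrderReminder {
  private final String customerId;
  private final String orderId;

  OrderReminder(String customerId, String orderId) {
    this.customerId = Objects.requireNonNull(customerId);
    this.orderId = Objects.requireNonNull(orderId);
  }

  String getCustomerId() {
    return customerId;
  }

  String getOrderId() {
    return orderId;
  }
}

// Hypothetical helper that derives one unique key from several fields.
final class MessageKeys {
  private MessageKeys() {}

  // The first field is length-prefixed so that ("a:b", "c") and ("a", "b:c")
  // cannot collapse into the same key.
  static String uniqueKey(OrderReminder message) {
    String customerId = message.getCustomerId();
    return customerId.length() + ":" + customerId + ":" + message.getOrderId();
  }
}
```

The resulting key can be passed wherever the examples on this page use `message.getId()`.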

Enqueue Process
interface MessageRepository {
  Long getLatestEnqueueAt(String messageId);

  void addEnqueueAt(String messageId, Long time);
}

class SimpleMessage {
  private String id;

  // Getter used by the sender and the message processor
  public String getId() {
    return id;
  }
}

class MessageSender {
  @Autowired
  private MessageRepository messageRepository;
  @Autowired
  private RqueueMessageEnqueuer rqueueMessageEnqueuer;

  public void sendMessage(SimpleMessage message) {
    String id = message.getId();
    //TODO handle error case
    messageRepository.addEnqueueAt(id, System.currentTimeMillis());
    rqueueMessageEnqueuer.enqueueIn("simple-queue", message, Duration.ofMinutes(10));
  }
}
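The `MessageRepository` interface above is left unimplemented. For local experiments it could be backed by an in-memory map, as in the sketch below. `InMemoryMessageRepository` is a hypothetical class, and the interface is repeated only so that the snippet compiles on its own. An in-memory map is visible to a single JVM only, so a deployment with several application instances would presumably need a shared store instead.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Same contract as the MessageRepository interface shown earlier.
interface MessageRepository {
  Long getLatestEnqueueAt(String messageId);

  void addEnqueueAt(String messageId, Long time);
}

// Hypothetical in-memory implementation, intended for tests and demos only.
class InMemoryMessageRepository implements MessageRepository {
  private final ConcurrentMap<String, Long> latestEnqueueAt = new ConcurrentHashMap<>();

  @Override
  public Long getLatestEnqueueAt(String messageId) {
    // Returns null when nothing has been recorded for this id.
    return latestEnqueueAt.get(messageId);
  }

  @Override
  public void addEnqueueAt(String messageId, Long time) {
    // Keep the larger timestamp so a late, out-of-order write
    // cannot move the "latest" marker backwards.
    latestEnqueueAt.merge(messageId, time, Long::max);
  }
}
```

Using `merge` with `Long::max` makes each update atomic per key, which a separate read followed by a write would not be.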

Next, define a UniqueMessageProcessor that implements MessageProcessor and returns false for older messages so that they are skipped.

class UniqueMessageProcessor implements MessageProcessor {

  @Autowired
  private MessageRepository messageRepository;

  @Override
  public boolean process(Object message, RqueueMessage rqueueMessage) {
    if (message instanceof SimpleMessage) {
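      // The unique id can also be derived from several fields; add a helper method for that.
      String messageId = ((SimpleMessage) message).getId();
      Long latestEnqueueTime = messageRepository.getLatestEnqueueAt(messageId);
      // Process the message if no timestamp is recorded for this id, or if no newer
      // message has been enqueued since this one was queued.
      // Note: this assumes the recorded timestamp and getQueuedTime() use the same time unit.
      return latestEnqueueTime == null || latestEnqueueTime <= rqueueMessage.getQueuedTime();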
    }
    return true;
  }
}

Finally, configure Rqueue to use this processor as the pre-execution message processor so that the older messages are skipped.

@Configuration
class RqueueConfiguration {

  @Bean
  public MessageProcessor preExecutionMessageProcessor() {
    return new UniqueMessageProcessor();
  }

  @Bean
  public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
    SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();
    MessageProcessor preExecutionMessageProcessor = preExecutionMessageProcessor();
    factory.setPreExecutionMessageProcessor(preExecutionMessageProcessor);
    return factory;
  }
}

If your use case instead requires that the older message be executed and newer ones be ignored, you can implement that with a pre-execution message processor as well.

interface MessageRepository {

  Long getEnqueueAt(String messageId);

  void saveEnqueueAt(String messageId, Long time);
}

class MessageSender {

  @Autowired
  private RqueueMessageEnqueuer rqueueMessageEnqueuer;

  public void sendMessage(SimpleMessage message) {
    rqueueMessageEnqueuer.enqueueIn("simple-queue", message, Duration.ofMinutes(10));
  }
}

class UniqueMessageProcessor implements MessageProcessor {

  @Autowired
  private MessageRepository messageRepository;

  @Override
  public boolean process(Object message, RqueueMessage rqueueMessage) {
    if (message instanceof SimpleMessage) {
      String messageId = ((SimpleMessage) message).getId();
      Long enqueueAt = messageRepository.getEnqueueAt(messageId);
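      if (enqueueAt == null) {
        // First message seen for this id: record its queued time so that
        // later runs of the same message can be recognised.
        messageRepository.saveEnqueueAt(messageId, rqueueMessage.getQueuedTime());
        return true;
      }
      // Allow the same message to run multiple times (for example on retry);
      // skip any other message with this id.
      return enqueueAt == rqueueMessage.getQueuedTime();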
    }
    return true;
  }
}

This approach does not handle the following cases:
  • Multiple similar messages are enqueued at the same time.
  • Multiple similar messages try to run at the same time.
  • A new message is enqueued while the existing one is running.
  • A new message is enqueued after the older message was discarded.
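
One possible way to narrow the concurrency-related cases is to make the check and the write a single atomic step, so that two similar messages cannot both pass the check. The sketch below illustrates this with an in-memory map and `putIfAbsent`. `FirstMessageClaims`, `tryClaim`, and `release` are hypothetical names and not part of Rqueue, and a multi-instance deployment would need the equivalent atomic operation in a shared store.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper: lets exactly one message per id pass, even under concurrency.
class FirstMessageClaims {
  private final ConcurrentMap<String, Long> claimedBy = new ConcurrentHashMap<>();

  // Returns true if the message with this queued time owns the id, either
  // because it claimed the id just now or because it claimed it earlier (a retry).
  boolean tryClaim(String messageId, long queuedTime) {
    Long existing = claimedBy.putIfAbsent(messageId, queuedTime);
    return existing == null || existing == queuedTime;
  }

  // Frees the id, but only if it is still held by the given message.
  void release(String messageId, long queuedTime) {
    claimedBy.remove(messageId, queuedTime);
  }
}
```

Whether and when to call `release` depends on the application: releasing after processing lets later messages with the same id run, while never releasing keeps the first message as the only one ever executed.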