Simple Message Queue using Redis


In this post we will use Redis as a simple message queue, using its list commands.

Let’s say we have an application that allows users to upload a photo. The application then displays the photo in different sizes, such as Thumb, Medium and Large.

In a first implementation we could process the uploaded image within the same request. As this is an expensive task, it would make our requests slow.

A possible solution for this is to make the processing asynchronous by using a Message Queue (MQ). There are a lot of well-known MQs, like ActiveMQ, RabbitMQ, IBM MQ and others. In the examples below we will use Redis as a message queue, relying on its LIST structure.

The idea is to have a LIST where the producer will put the messages to be processed and some consumers that will watch the LIST to process the messages sent.

Basically, the producers will add messages to the end of the list with “RPUSH queue message”, and the consumers will read messages from the beginning of the list with “LPOP queue”, which gives us FIFO processing.
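To see why pushing on one end and popping from the other gives FIFO order, here is a minimal in-memory sketch. It is not Redis: the FifoListModel class is a hypothetical stand-in built on a Java Deque, and it only mirrors the ordering of RPUSH and LPOP.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// In-memory stand-in for a Redis LIST; it only models ordering, not Redis itself.
class FifoListModel {

    private final Deque<String> items = new ArrayDeque<>();

    // Like RPUSH: append at the tail and return the new length of the list.
    long rpush(String message) {
        items.addLast(message);
        return items.size();
    }

    // Like LPOP: remove from the head, or return null when the list is empty.
    String lpop() {
        return items.pollFirst();
    }

    public static void main(String[] args) {
        FifoListModel queue = new FifoListModel();
        queue.rpush("Value 1");
        queue.rpush("Value 2");
        queue.rpush("Value 3");

        // Messages come out in the same order they went in.
        String message;
        while ((message = queue.lpop()) != null) {
            System.out.println(message);
        }
    }
}
```

The first message pushed is the first one popped, which is the behaviour we want from the queue.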

The consumer will always be waiting for a new message. For this purpose we will use the BLPOP command, which is the blocking version of LPOP. Basically, there will be a while loop calling BLPOP to get the next message to process.

Considering the image upload example, let’s say we have a class ImageUploader that is responsible for uploading the image to the server. It will add a new message to the queue, indicating there is an image to be processed. A message could be a JSON string like this:

{"imagePath":"/path/to/image", "user":"userid"}
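As a rough sketch of how such a payload could be built, the helper below assembles the JSON string by hand so that it does not assume any particular JSON library. The ImageMessage class and its toJson method are hypothetical names used only for this example; in a real project a JSON library would probably be a better choice.

```java
// Hypothetical helper: builds the message payload by hand, without a JSON library.
class ImageMessage {

    // Returns a JSON object with the two fields used in the example message.
    static String toJson(String imagePath, String user) {
        return "{\"imagePath\":\"" + escape(imagePath) + "\",\"user\":\"" + escape(user) + "\"}";
    }

    // Escapes quotes, backslashes and control characters so the result stays valid JSON.
    private static String escape(String value) {
        StringBuilder out = new StringBuilder();
        for (char c : value.toCharArray()) {
            if (c == '"' || c == '\\') {
                out.append('\\').append(c);
            } else if (c < 0x20) {
                out.append(String.format("\\u%04x", (int) c));
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(toJson("/path/to/image", "userid"));
    }
}
```

Escaping matters here because a file name containing a quote would otherwise produce a message the consumer cannot parse.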

The ImageUploader class could be like this:


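import javax.servlet.http.HttpServletRequest;

import redis.clients.jedis.Jedis;

public class ImageUploader {

  // Connection used to publish messages (host assumed to be local).
  // A Jedis instance is not thread-safe; a real web application would use a connection pool.
  private final Jedis jedis = new Jedis("localhost");

  public void uploadImage(HttpServletRequest request){

    String imagePath = saveImage(request);
    String jsonPayload = createJsonPayload(request, imagePath);
    // Enqueue a message for the consumers instead of processing the image here
    jedis.rpush("queue", jsonPayload);
    //... keep with the processing
  }

  //.... other methods in the class (including saveImage and createJsonPayload)
}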

This is just an example of how the producer would work. In this case we have decoupled the image processing from the ImageUploader class: we just add a new message to the queue so that the consumers can process it.

The message consumer could be something like this:

package br.com.xicojunior.redistest;

import java.util.List;

import redis.clients.jedis.Jedis;

public class MessageConsumer 
{
    public static void main( String[] args )
    {
        Jedis jedis = new Jedis("localhost");   
        List<String> messages = null;
        while(true){
          System.out.println("Waiting for a message in the queue");
          messages = jedis.blpop(0,"queue");
          System.out.println("Got the message");
          System.out.println("KEY:" + messages.get(0) + " VALUE:" + messages.get(1));
          String payload = messages.get(1);
          //Do some processing with the payload
          System.out.println("Message received:" + payload);
        }
        
        
    }
}

This consumer code could be running in a different process or even on a different machine. The consumer code is runnable: we can compile it and run it using Eclipse or the java command.

For this code we used the jedis.blpop method. It returns a List with 2 Strings: (0) the key and (1) the value that was popped. The method also receives an integer that indicates the timeout in seconds. We passed 0, which means it will block with no timeout.
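To make the reply shape concrete, here is a small sketch of a consumer loop that reads the value at index 1 and stops when a pop comes back with no message. The QueueWorker class, the BlockingPop interface and the drain method are hypothetical names for this example, and an in-memory queue stands in for Redis so that it runs without a server. With Jedis, the source could be something like (timeout, key) -> jedis.blpop(timeout, key). How a timeout is reported may depend on the Jedis version, so the loop treats both null and a reply without two elements as "no message".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class QueueWorker {

    // Hypothetical abstraction over a blocking pop that replies with [key, value].
    interface BlockingPop {
        List<String> pop(int timeoutSeconds, String key);
    }

    // Pops until the source reports no message, passing each value to the handler.
    // Returns the number of messages handled.
    static int drain(BlockingPop source, String key, int timeoutSeconds, Consumer<String> handler) {
        int handled = 0;
        List<String> reply;
        while ((reply = source.pop(timeoutSeconds, key)) != null && reply.size() == 2) {
            handler.accept(reply.get(1)); // index 0 is the key, index 1 is the value
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // In-memory stand-in for the Redis list, so the sketch runs without a server.
        LinkedBlockingQueue<String> list =
                new LinkedBlockingQueue<>(Arrays.asList("Value 1", "Value 2"));
        BlockingPop inMemory = (timeoutSeconds, key) -> {
            try {
                String value = list.poll(timeoutSeconds, TimeUnit.SECONDS);
                return value == null ? null : Arrays.asList(key, value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag for callers
                return null;
            }
        };

        List<String> seen = new ArrayList<>();
        int handled = drain(inMemory, "queue", 1, seen::add);
        System.out.println(handled + " messages: " + seen);
    }
}
```

Using a non-zero timeout, as this sketch does, gives the worker a chance to stop or do housekeeping when the queue stays empty, instead of blocking indefinitely.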

When we run this code and there is no value in the list, in the console we will see just the message

"Waiting for a message in the queue". 

Then, if a client adds an element to the list “queue”, our consumer class will get its value. We can simulate a test by using redis-cli or even another class that adds elements to the queue, like the one below:


package br.com.xicojunior.redistest;

import redis.clients.jedis.Jedis;

public class MessageProducer {

  public static void main(String[] args) {
    Jedis jedis = new Jedis("localhost");
    
    jedis.rpush("queue", "Value 1");
    jedis.rpush("queue", "Value 2");
    jedis.rpush("queue", "Value 3");
    
    
  }
  
}

If we run the MessageProducer class after the MessageConsumer class is already running we will see this output in the console:


Waiting for a message in the queue
Got the message
KEY:queue VALUE:Value 1
Message received:Value 1
Waiting for a message in the queue
Got the message
KEY:queue VALUE:Value 2
Message received:Value 2
Waiting for a message in the queue
Got the message
KEY:queue VALUE:Value 3
Message received:Value 3
Waiting for a message in the queue


So, a message queue is another possible use case for Redis. There are also queues built on top of Redis, such as RestMQ and Resque (a job queue), among others.

I hope you enjoyed it.

See you in the next post.
