Tuesday, 17 February 2015

Simple RabbitMQ Sender and Listener

Assuming RabbitMQ server is running on localhost, and requires user-id and password to connect.
This example  has

  • a sender which sends the message (line) typed in the command line, to a specific exchange and a routing key
  • Reciever, which listens to the message send to a specific queue

The working maven project for this example can be downloaded here.

SENDER


import java.io.IOException;
import java.util.Scanner;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitSender {

 private static final String TEST_EXCHANGE = "test-exchange";
 private static final String TEST_QUEUE = "test-queue";

 public static void main(String args[]) throws Exception {
  Connection connection = null;
  Channel channel = null;
  try (Scanner sc = new Scanner(System.in)) {
   while (true) {
    
    System.out.print("--Type in the message to send--");
    String s = sc.nextLine();
    
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://localhost");
    factory.setUsername("userid");
    factory.setPassword("password");

    System.out.println("-- Creating Connection--");
    connection = factory.newConnection();

    System.out.println("-- Creating channel--");
    channel = connection.createChannel();

    System.out.println("-- Creating Exchange--");
    channel.exchangeDeclare(TEST_EXCHANGE, "topic");

    System.out.println("-- Creating queue--");
    channel.queueDeclare(TEST_QUEUE, false, false, false, null);
    channel.queueBind(TEST_QUEUE, TEST_EXCHANGE, "test.#");

    System.out.println("-- Sending Message--");
    channel.basicPublish(TEST_EXCHANGE, "test.route", null,
      s.getBytes());
    System.out.println("-- Message sent to--" + TEST_EXCHANGE
      + " with routing key:" + "test.route");
   }
  } catch (Exception e) {
   e.printStackTrace();
  } finally {
   if (channel != null)
    channel.close();
   if (connection != null)
    connection.close();
  }
 }
}

RECEIVER
import java.util.Scanner;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class RabbitListener {

 private static final String TEST_EXCHANGE = "test-exchange";
 private static final String TEST_QUEUE = "test-queue";

 public static void main(String args[]) throws Exception {
  Connection connection = null;
  Channel channel = null;
  try {
   ConnectionFactory factory = new ConnectionFactory();
   factory.setUri("amqp://localhost");
   factory.setUsername("userid");
   factory.setPassword("password");

   System.out.println("-- Creating Connection--");
   connection = factory.newConnection();

   System.out.println("-- Creating channel--");
   channel = connection.createChannel();

   QueueingConsumer consumer = new QueueingConsumer(channel);

   channel.basicConsume(TEST_QUEUE, true, consumer);

   System.out.println("-- Waiting for message--");
   Delivery delivery = null;
   while ((delivery = consumer.nextDelivery()) != null) {
    System.out.println("--Message received --"
      + new String(delivery.getBody()));
   }

  } catch (Exception e) {
   e.printStackTrace();
  } finally {
   if (channel != null)
    channel.close();
   if (connection != null)
    connection.close();
  }
 }
}

2 comments:

  1. man can u send the code for Publish/Subscribe i am failing here

    ReplyDelete
  2. sorry mam can u send the code for fanout exchange i am failing there

    ReplyDelete