Tuesday, 17 February 2015

Simple RabbitMQ Sender and Listener

This example assumes a RabbitMQ server is running on localhost and that a user id and password are required to connect.
It has:

  • a sender, which sends the message (line) typed on the command line to a specific exchange with a routing key
  • a receiver, which listens for messages sent to a specific queue

The working maven project for this example can be downloaded here.
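If you are building the project yourself, the only dependency the code needs is the RabbitMQ Java client (a 3.x version is assumed here, since QueueingConsumer was removed from later clients):

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <!-- assumed version, contemporary with this post; any 3.x release should work -->
    <version>3.4.3</version>
</dependency>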

SENDER


import java.io.IOException;
import java.util.Scanner;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitSender {

 private static final String TEST_EXCHANGE = "test-exchange";
 private static final String TEST_QUEUE = "test-queue";

 public static void main(String[] args) throws Exception {
  Connection connection = null;
  Channel channel = null;
  try (Scanner sc = new Scanner(System.in)) {
   // Create the connection, channel, exchange and queue once,
   // rather than on every message sent.
   ConnectionFactory factory = new ConnectionFactory();
   factory.setUri("amqp://localhost");
   factory.setUsername("userid");
   factory.setPassword("password");

   System.out.println("-- Creating Connection--");
   connection = factory.newConnection();

   System.out.println("-- Creating channel--");
   channel = connection.createChannel();

   System.out.println("-- Creating Exchange--");
   channel.exchangeDeclare(TEST_EXCHANGE, "topic");

   System.out.println("-- Creating queue--");
   channel.queueDeclare(TEST_QUEUE, false, false, false, null);
   channel.queueBind(TEST_QUEUE, TEST_EXCHANGE, "test.#");

   while (true) {
    System.out.print("--Type in the message to send--");
    String s = sc.nextLine();

    System.out.println("-- Sending Message--");
    channel.basicPublish(TEST_EXCHANGE, "test.route", null,
      s.getBytes());
    System.out.println("-- Message sent to--" + TEST_EXCHANGE
      + " with routing key:test.route");
   }
  } catch (Exception e) {
   e.printStackTrace();
  } finally {
   if (channel != null)
    channel.close();
   if (connection != null)
    connection.close();
  }
 }
}
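A note on the routing: the queue is bound with binding key test.#, while messages are published with routing key test.route. On a topic exchange, # matches zero or more dot-separated words, so test.route (and anything else starting with test.) ends up in test-queue.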

RECEIVER

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class RabbitListener {

 private static final String TEST_EXCHANGE = "test-exchange";
 private static final String TEST_QUEUE = "test-queue";

 public static void main(String[] args) throws Exception {
  Connection connection = null;
  Channel channel = null;
  try {
   ConnectionFactory factory = new ConnectionFactory();
   factory.setUri("amqp://localhost");
   factory.setUsername("userid");
   factory.setPassword("password");

   System.out.println("-- Creating Connection--");
   connection = factory.newConnection();

   System.out.println("-- Creating channel--");
   channel = connection.createChannel();

   // QueueingConsumer buffers deliveries so they can be read with a
   // blocking nextDelivery() call
   QueueingConsumer consumer = new QueueingConsumer(channel);

   // autoAck = true: messages are acknowledged as soon as they are delivered;
   // the queue itself is expected to exist already (the sender declares it)
   channel.basicConsume(TEST_QUEUE, true, consumer);

   System.out.println("-- Waiting for message--");
   Delivery delivery;
   // nextDelivery() blocks until the next message arrives
   while ((delivery = consumer.nextDelivery()) != null) {
    System.out.println("--Message received --"
      + new String(delivery.getBody()));
   }

  } catch (Exception e) {
   e.printStackTrace();
  } finally {
   if (channel != null)
    channel.close();
   if (connection != null)
    connection.close();
  }
 }
}
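Run the listener in a separate terminal from the sender; every line typed into the sender is printed here. Because basicConsume is called with autoAck set to true, each message is acknowledged as soon as it is delivered, which keeps the example simple but means a message lost mid-processing is not redelivered.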

Monday, 16 February 2015

ACID vs CAP

ACID - Atomicity, Consistency, Isolation, Durability

A set of properties that defines the quality of database transactions: a transaction either completes fully or not at all (Atomicity), takes the database from one valid state to another (Consistency), runs as if it were the only transaction in progress (Isolation), and, once committed, survives failures (Durability).

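To make Atomicity concrete, here is a minimal JDBC sketch; the in-memory H2 URL and the accounts table are hypothetical, purely for illustration:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class TransferExample {

 public static void main(String[] args) throws Exception {
  // hypothetical in-memory database and accounts table, for illustration only
  try (Connection con = DriverManager.getConnection("jdbc:h2:mem:test")) {
   con.setAutoCommit(false); // start a transaction instead of committing each statement
   try (PreparedStatement debit = con.prepareStatement(
      "UPDATE accounts SET balance = balance - 100 WHERE id = 1");
     PreparedStatement credit = con.prepareStatement(
      "UPDATE accounts SET balance = balance + 100 WHERE id = 2")) {
    debit.executeUpdate();
    credit.executeUpdate();
    con.commit();   // both updates become visible and durable together...
   } catch (Exception e) {
    con.rollback(); // ...or neither is applied at all
    throw e;
   }
  }
 }
}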
CAP - Consistency, Availability, Partition Tolerance

Three basic properties or requirements for a distributed system. Assume a distributed database system, where data is distributed/partitioned across nodes/servers:

  • Consistency - every read sees the most recent write
  • Availability - every request receives a response
  • Partition tolerance - the system keeps operating even when the network drops messages between nodes

Achieving all 3 properties at once is impossible (this is the CAP theorem), so a distributed system can guarantee at most a combination of 2 of them. For example, a system that picks consistency over availability will refuse requests during a network partition rather than serve stale data.





Saturday, 14 February 2015

Setup ELK on Linux (Elasticsearch 1.4.2 /Logstash 1.4.2/Kibana 3.1.2)

Below are instructions to set up the ELK stack, in 9 simple steps.

1. Install JDK and httpd
2. Download and extract the necessary components
3. Configure and start the httpd and elasticsearch servers
4. Verify httpd and elasticsearch
5. Set up Kibana on the httpd path
6. Test Kibana and get it working with a few changes to elasticsearch
7. Add the logstash configuration
8. Run logstash to push to Elasticsearch
9. Advanced logstash configurations to parse access_log




Install JDK and Httpd

Make sure the appropriate yum repos are updated.

yum install java-1.7.0-openjdk
yum install httpd

Disable Firewall 
service iptables stop


Downloads:

  • elasticsearch-1.4.2.zip
  • kibana-3.1.2.tar.gz
  • logstash-1.4.2.tar.gz
  • elasticsearch-head-master.zip

Copy the files to the /root folder of a linux machine

ElasticSearch: unzip elasticsearch-1.4.2.zip
Kibana: tar -zxvf kibana-3.1.2.tar.gz
Logstash: tar -zxvf logstash-1.4.2.tar.gz
Head Plugin: elasticsearch-1.4.2/bin/plugin --url file:///root/elasticsearch-head-master.zip --install mobz/elasticsearch-head
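The head plugin provides a simple web front end for browsing indexes and cluster state. Passing --url file:///root/elasticsearch-head-master.zip makes the plugin tool install from the local zip, so the machine does not need internet access.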

Configure Elasticsearch

vi /root/elasticsearch-1.4.2/config/elasticsearch.yml
Uncomment cluster.name and give the cluster a name; don't use the default.

################################### Cluster ###################################

# Cluster name identifies your cluster for auto-discovery. If you're running
# multiple clusters on the same network, make sure you're using unique names.
#

cluster.name: vidhya-elk



Start Servers

service httpd restart


Verify the server installation

Httpd: http://<IP/hostname> - the default Apache test page should load




Start Elasticsearch
/root/elasticsearch-1.4.2/bin/elasticsearch



Verify Elasticsearch :   http://<ip/hostname>:9200/

Verify Elasticsearch head :  http://<ip/hostname>:9200/_plugin/head
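The same check can be done from the command line with curl (assuming elasticsearch is on its default port):

curl http://localhost:9200/

The response is a short JSON document containing the node name and the 1.4.2 version string.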



Kibana Setup


mkdir /var/www/kibana3
cp -r /root/kibana-3.1.2/*   /var/www/kibana3/

vi /etc/httpd/conf/httpd.conf

Alias /kibana /var/www/kibana3
<Directory /var/www/kibana3>
  AllowOverride All
  Require all granted
</Directory>
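Note: Require all granted is Apache 2.4 syntax. On an Apache 2.2 httpd (as shipped with CentOS 6, for example), use Order allow,deny followed by Allow from all inside the Directory block instead.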

Verify Kibana : http://<IP/hostname>/kibana

Kibana loads, but the dashboard initially shows a 'Connection Failed' error: Elasticsearch 1.4 disables cross-origin (CORS) requests by default, and the Kibana 3 page queries Elasticsearch directly from the browser. To fix this error, add the line below at the end of elasticsearch.yml.

vi /root/elasticsearch-1.4.2/config/elasticsearch.yml

http.cors.enabled: true

Restart elasticsearch
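After the restart, reload the Kibana page; the default dashboard should now connect to Elasticsearch.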





Logstash Setup

Create a configuration file:
vi  /root/logstash-1.4.2/conf/es.conf
input { stdin { }}
output {
        stdout { }
        elasticsearch {
                bind_host => "127.0.0.1"
                protocol => "http"
        }
}

The above configuration takes anything typed on standard input and publishes it to elasticsearch, as well as printing it on the command line.

Verify Logstash

/root/logstash-1.4.2/bin/logstash agent -f /root/logstash-1.4.2/conf/es.conf --configtest
-- This verifies the configuration file.

/root/logstash-1.4.2/bin/logstash agent -f /root/logstash-1.4.2/conf/es.conf
-- This pushes whatever is typed on the command line to elasticsearch; you can see the indexes getting created using the elasticsearch head plugin.

Advanced Logstash configuration


1. Parse the access_log and publish to elasticsearch for log analysis 

vi  /root/logstash-1.4.2/conf/access_log.conf
input {
  file {
    path => "/var/log/httpd/access_log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    type => "apache-access"
  }
}
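A note on the file input above: start_position => "beginning" combined with sincedb_path => "/dev/null" means logstash never records how far it has read, so the whole access_log is re-read from the start on every run. That is handy for testing, but in production you would drop the sincedb_path override.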

output {
        stdout { }
        elasticsearch {
              bind_host => "127.0.0.1"
              protocol => "http"
        }
}

2. Parse the access_log and publish to elasticsearch for log analysis, with custom grok filters

vi  /root/logstash-1.4.2/conf/access_grok_log.conf

input {
  file {
    path => "/root/log/access_log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    type => "apache-access"
  }
}
filter {
  # drop entries from IPv6 clients (e.g. ::1 localhost)
  if ([message] =~ "^::") {
      drop {}
  }
  # parse each line into named fields using the combined Apache log pattern
  grok {
    match => [ "message", "%{COMBINEDAPACHELOG}" ]
  }
  # use the request's own timestamp as the event timestamp
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
        stdout { }
        elasticsearch {
                bind_host => "127.0.0.1"
                protocol => "http"
        }
}
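
As an example, a combined-format access_log line such as

127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 "http://www.example.com/start.html" "Mozilla/4.08"

is split by %{COMBINEDAPACHELOG} into named fields - clientip, auth, timestamp, verb, request, response, bytes, referrer, agent - so in Kibana you can filter and chart on response codes or client IPs instead of searching raw text.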