관리용 웹 플러그인의 설치




RabbitMQ의 큐에 대한 관리용 웹을 설치하고자 한다면 다음의 URL에서 6개의 erlang 압축 파일을 받아 RabbitMQ를 다시 기동시키면 웹 관리 콘솔에 접근할 수 있다.


http://www.rabbitmq.com/plugins.html#rabbitmq-management



# rabbitmq-server stop

# rabbitmq-plugins enable mochiweb 
. . . 6개 plugin 설치
# rabbitmq-server start
# rabbitmq-plugins list    // 서버를 다시 구동시키면 다음과 같이 MQ 플러그인이 구동되는 것을 확인할 수 있다.

Activating RabbitMQ plugins ...
*WARNING* Undefined function fdsrv:bind_socket/2
*WARNING* Undefined function fdsrv:start/0
*WARNING* Undefined function fdsrv:stop/0
*WARNING* Undefined function webmachine_resource:start_link/2
6 plugins activated:
* amqp_client-2.4.0
* mochiweb-1.3
* rabbit_management-2.4.0.1
* rabbit_management_agent-2.4.0
* rabbit_mochiweb-2.4.0
* webmachine-1.7.0



관리용 웹 실행


 브라우저를 열고 http://localhost:55672/mgmt 를 입력한 후 guest/guest를 들어가면  관리화면이 표시된다.


블로그 이미지

오픈이지 제로킴

시큐어코딩 교육/컨설팅 전문가 그룹

RabbitMQ에 대해 아주 잘 정리된(간단한 예제와 함께) 사이트 참조하세요.^^

http://aspiringcraftsman.com/series/rabbitmq-for-windows/


Basic Concepts

 

ProducerQueueConsumer_thumb5


Message Routing



Direct Exchanges

  DirectExchange_thumb37


Fanout Exchanges

FanoutExchange_thumb[2]


Topic Exchanges


TopicExchange_thumb[2]


Headers Exchanges


HeadersExchange_thumb[2]

블로그 이미지

오픈이지 제로킴

시큐어코딩 교육/컨설팅 전문가 그룹

출처: http://www.rabbitmq.com/api-guide.html

Java Client API Guide

This page gives an overview of the RabbitMQ Java client API.

The code samples given here demonstrate connecting to AMQP brokers and using RPC services exposed via AMQP.

For more details, please see the relevant Javadoc documentation.

The client API is closely modelled on the AMQP protocol specification, with additional abstractions for ease of use.

Protocol class overview

The holder class AMQP stores all the code generated automatically from the AMQP XML protocol definition specification. It contains all required content-class-specific content header definitions (such asAMQP.BasicProperties) and all the method request and response descriptors (such asAMQP.Basic.Publish and AMQP.Queue.BindOk), as well as useful protocol-specific constants and other values.

import com.rabbitmq.client.AMQP;

The method request and response descriptors (and the BasicProperties class) come with Builder classes (following the Builder Pattern) to make constructing protocol objects easier and to allow us to construct them with immutable state.

We illustrate the builder classes by constructing some AMQP.BasicProperties objects with its Builderclass:

AMQP.BasicProperties.Builder bob = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties minBasic = bob.build();
AMQP.BasicProperties minPersistentBasic = bob.deliveryMode(2).build();
AMQP.BasicProperties persistentBasic
                    = bob.priority(0).contentType("application/octet-stream").build();
AMQP.BasicProperties persistentTextPlain = bob.contentType("text/plain").build();

bob (the builder) is constructed first and whenever build() is invoked this method returns a newBasicProperties object with the properties set in bob at that point. The parameter calls (deliveryMode,priority, etc.) update bob and not the BasicProperties object. Each of these methods returns the updated builder (allowing chaining, for example as in persistentBasic above). This pattern allows the parameters to be named and set in any order, the consistency of the complete set of parameters to be checked at build() time, the built objects to be immutable, and partially initialised builders to be re-used.

For details and exact definitions of the AMQP protocol, please see the AMQP specification document; for details of the API (including the Builder classes) see the Javadoc documentation; and for background on the Builder Pattern see this Dr. Dobb’s article or refer to Effective Java by Joshua Bloch.

Connections and channels

The core API classes are Connection and Channel, representing an AMQP connection and an AMQP data channel, respectively:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

Connecting to a broker

The following code connects to an AMQP broker using the given parameters (host name, port number, etc):

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();

All of these parameters have sensible defaults for a RabbitMQ server running locally.

Alternatively, AMQP URIs may be used:

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();

All of these parameters have sensible defaults for a stock RabbitMQ server running locally.

The Connection interface can then be used to open a channel:

Channel channel = conn.createChannel();

The channel can now be used to send and receive messages, as described in subsequent sections.

To disconnect, simply close the channel and the connection:

channel.close();
conn.close();

Note that closing the channel may be considered good practice, but isn’t strictly necessary here - it will be done automatically anyway when the underlying connection is closed.

Advanced Connection options

Consumer thread pool

Consumer threads (see Receiving below) are automatically allocated in a new ExecutorService thread pool by default. If greater control is required supply an ExecutorService on the newConnection() method, so that this pool of threads is used instead. Here is an example where a larger thread pool is supplied than is normally allocated:

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
Both Executors and ExecutorService classes are in the java.util.concurrent package.

When the connection is closed a default ExecutorService will be shutdown(), but a user-suppliedExecutorService (like es above) will not be shutdown(). Clients that supply a custom ExecutorServicemust ensure it is shutdown eventually (by calling its shutdown() method), or else the pool’s threads may prevent JVM termination.

The same executor service may be shared between multiple connections, or serially re-used on re-connection but it cannot be used after it is shutdown().

Use of this feature should only be considered if there is evidence that there is a severe bottleneck in the processing of Consumer callbacks. If there are no Consumer callbacks executed, or very few, the default allocation is more than sufficient. The overhead is initially minimal and the total thread resources allocated are bounded, even if a burst of consumer activity may occasionally occur.

Address array

It is possible to pass an Address array to newConnection(). An Address is simply a convenience class (in the com.rabbitmq.client package) with host and port components. For example:

Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
                                 , new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);
will attempt to connect to hostname1:portnumber1, and if that fails to hostname2:portnumber2. The connection returned is the first in the array that succeeds (without throwing IOException). This is entirely equivalent to repeatedly setting host and port on a factory, calling factory.newConnection() each time, until one of them succeeds.

If an ExecutorService is provided as well (using the form factory.newConnection(es, addrArr)) the thread pool is associated with the (first) successful connection.

Using exchanges and queues

Client applications work with exchanges and queues, the high-level building blocks of AMQP. These must be "declared" before they can be used. Declaring either type of object simply ensures that one of that name exists, creating it if necessary.

Continuing the previous example, the following code declares an exchange and a queue, then binds them together.

channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

This will actively declare the following objects, both of which can be customised by using additional parameters. Here neither of them have any special arguments.

  1. a durable, non-autodelete exchange of "direct" type
  2. a non-durable, exclusive, autodelete queue with a generated name

The above function calls then bind the queue to the exchange with the given routing key.

Note that this would be a typical way to declare a queue when only one client wants to work with it: it doesn’t need a well-known name, no other client can use it (exclusive) and will be cleaned up automatically (autodelete). If several clients want to share a queue with a well-known name, this code would be appropriate:

channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

This will actively declare:

  1. a durable, non-autodelete exchange of "direct" type
  2. a durable, non-exclusive, non-autodelete queue with a well-known name

Note that all of these Channel API methods are overloaded. These convenient short forms ofexchangeDeclarequeueDeclare and queueBind use sensible defaults. There are also longer forms with more parameters, to let you override these defaults as necessary, giving full control where needed.

This "short form, long form" pattern is used throughout the client API uses.

Publishing messages

To publish a message to an exchange, use Channel.basicPublish as follows:

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

For fine control, you can use overloaded variants to specify the mandatory flag, or send messages with pre-set message properties:

channel.basicPublish(exchangeName, routingKey, mandatory,
                     MessageProperties.PERSISTENT_TEXT_PLAIN,
                     messageBodyBytes);

This sends a message with delivery mode 2 (persistent), priority 0 and content-type "text/plain". You can build your own message properties object, using a Builder class mentioning as many properties as you like, for example:

channel.basicPublish(exchangeName, routingKey,
                     new AMQP.BasicProperties.Builder()
                       .contentType("text/plain").deliveryMode(2)
                       .priority(1).userId("bob")
                       .build()),
                     messageBodyBytes);

We have not illustrated all the possibilities here.

Note that BasicProperties is an inner class of the autogenerated holder class AMQP.

Channel thread-safety

Channel instances are safe for use by multiple threads. Requests into a Channel are serialized, with only one thread being able to run a command on the Channel at a time. Even so, applications should prefer using aChannel per thread instead of sharing the same Channel across multiple threads.

Receiving messages by subscription

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

The most efficient way to receive messages is to set up a subscription using the Consumer interface. The messages will then be delivered automatically as they arrive, rather than having to be explicitly requested.

When calling the API methods relating to Consumers, individual subscriptions are always referred to by their consumer tags, which can be either client- or server-generated as explained in the AMQP specification document. Distinct Consumers on the same Channel must have distinct consumer tags.

The easiest way to implement a Consumer is to subclass the convenience class DefaultConsumer. An object of this subclass can be passed on a basicConsume call to set up the subscription:

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.contentType;
             long deliveryTag = envelope.getDeliveryTag();
             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
         }
     });

Here, since we specified autoAck = false, it is necessary to acknowledge messages delivered to theConsumer, most conveniently done in the handleDelivery method, as illustrated.

More sophisticated Consumers will need to override further methods. In particular, handleShutdownSignal is called when channels and connections close, and handleConsumeOk is passed the consumer tag before any other callbacks to that Consumer are called.

Consumers can also implement the handleCancelOk and handleCancel methods to be notified of explicit and implicit cancellations, respectively.

You can explicitly cancel a particular Consumer with Channel.basicCancel:

channel.basicCancel(consumerTag);

passing the consumer tag.

Callbacks to Consumers are dispatched on a thread separate from the thread managed by the Connection. This means that Consumers can safely call blocking methods on the Connection or Channel, such asqueueDeclaretxCommitbasicCancel or basicPublish.

Each Channel has its own dispatch thread. For the most common use case of one Consumer per Channel, this means Consumers do not hold up other Consumers. If you have multiple Consumers per Channel be aware that a long-running Consumer may hold up dispatch of callbacks to other Consumers on that Channel.

Retrieving individual messages

To explicitly retrieve messages, use Channel.basicGet. The returned value is an instance of GetResponse, from which the header information (properties) and message body can be extracted:

boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
    // No message retrieved.
} else {
    AMQP.BasicProperties props = response.getProps();
    byte[] body = response.getBody();
    long deliveryTag = response.getEnvelope().getDeliveryTag();
    ...

and since the autoAck = false above, you must also call Channel.basicAck to acknowledge that you have successfully received the message:

    ...
    channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message
}

Handling unroutable messages

If a message is published with the "mandatory" flags set, but cannot be routed, the broker will return it to the sending client (via a AMQP.Basic.Return command).

To be notified of such returns, clients can implement the ReturnListener interface and callChannel.setReturnListener. If the client has not configured a return listener for a particular channel, then the associated returned messages will be silently dropped.

channel.setReturnListener(new ReturnListener() {
    public void handleBasicReturn(int replyCode,
                                  String replyText,
                                  String exchange,
                                  String routingKey,
                                  AMQP.BasicProperties properties,
                                  byte[] body)
    throws IOException {
        ...
    }
});

A return listener will be called, for example, if the client publishes a message with the "mandatory" flag set to an exchange of "direct" type which is not bound to a queue.

Basic RPC

As a programming convenience, the Java client API offers a class RpcClient which uses a temporary reply queue to provide simple RPC-style communication facilities via AMQP.

The class doesn’t impose any particular format on the RPC arguments and return values. It simply provides a mechanism for sending a message to a given exchange with a particular routing key, and waiting for a response on a reply queue.

import com.rabbitmq.client.RpcClient;

RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);

(The implementation details of how this class uses AMQP are as follows: request messages are sent with thebasic.correlation_id field set to a value unique for this RpcClient instance, and with basic.reply_toset to the name of the reply queue.)

Once you have created an instance of this class, you can use it to send RPC requests by using any of the following methods:

byte[] primitiveCall(byte[] message);
String stringCall(String message)
Map mapCall(Map message)
Map mapCall(Object[] keyValuePairs)

The primitiveCall method transfers raw byte arrays as the request and response bodies. The methodstringCall is a thin convenience wrapper around primitiveCall, treating the message bodies as Stringinstances in the default character encoding.

The mapCall variants are a little more sophisticated: they encode a java.util.Map containing ordinary Java values into an AMQP binary table representation, and decode the response in the same way. (Note that there are some restrictions on what value types can be used here - see the javadoc for details.)

All the marshalling/unmarshalling convenience methods use primitiveCall as a transport mechanism, and just provide a wrapping layer on top of it.

Shutdown Protocol

Overview of the AMQP client shutdown

The AMQP connection and channel share the same general approach to managing network failure, internal failure, and explicit local shutdown.

The AMQP connection and channel have the following lifecycle states:

  • open: the object is ready to use
  • closing: the object has been explicitly notified to shut down locally, has issued a shutdown request to any supporting lower-layer objects, and is waiting for their shutdown procedures to complete
  • closed: the object has received all shutdown-complete notification(s) from any lower-layer objects, and as a consequence has shut itself down

Those objects always end up in the closed state, regardless of the reason that casued the closure, like an application request, an internal client library failure, a remote network request or network failure.

The AMQP connection and channel objects possess the following shutdown-related methods:

  • addShutdownListener(ShutdownListener listener) and removeShutdownListener(ShutdownListener listener), to manage any listeners, which will be fired when the object transitions to closed state. Note that, adding a ShutdownListener to an object that is already closed will fire the listener immediately
  • getCloseReason(), to allow the investigation of what was the reason of the object’s shutdown
  • isOpen(), useful for testing whether the object is in an open state
  • close(int closeCode, String closeMessage), to explictly notify the object to shut down

Simple usage of listeners would look like:

import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {
    public void shutdownCompleted(ShutdownSignalException cause)
    {
        ...
    }
});

Information about the circumstances of a shutdown

One can retrieve the ShutdownSignalException, which contains all the information available about the close reason, either by explictly calling the getCloseReason() method or by using the cause parameter in the service(ShutdownSignalException cause) method of the ShutdownListener class.

The ShutdownSignalException class provides methods to analyze the reason of the shutdown. By calling the isHardError() method we get information whether it was a connection or a channel error.

public void shutdownCompleted(ShutdownSignalException cause)
{
  if (cause.isHardError())
  {
    Connection conn = (Connection)cause.getReference();
    if (!cause.isInitiatedByApplication())
    {
      Object reason = cause.getReason();
      ...
    }
    ...
  } else {
    Channel ch = (Channel)cause.getReference();
    ...
  }
}

Atomicity and use of the isOpen() method

Use of the isOpen() method of channel and connection objects is not recommended for production code, because the value returned by the method is dependent on the existence of the shutdown cause. The following code illustrates the possibility of race conditions:

public void brokenMethod(Channel channel)
{
    if (channel.isOpen())
    {
        // The following code depends on the channel being in open state.
        // However there is a possibility of the change in the channel state
        // between isOpen() and basicQos(1) call
        ...
        channel.basicQos(1);
    }
}

Instead, we should normally ignore such checking, and simply attempt the action desired. If during the execution of the code the channel of the connection is closed, a ShutdownSignalException will be thrown indicating that the object is in an invalid state. We should also catch for IOException caused either bySocketException, when broker closes the connection unexpectedly, or ShutdownSignalException, when broker initiated clean close.

public void validMethod(Channel channel)
{
    try {
        ...
        channel.basicQos(1);
    } catch (ShutdownSignalException sse) {
        // possibly check if channel was closed
        // by the time we started action and reasons for
        // closing it
        ...
    } catch (IOException ioe) {
        // check why connection was closed
        ...
    } 
}

Bundled examples

The Java client library distribution is shipped with a rabbitmq-client-tests.jar, containing several small example programs for exercising the functionality of the RabbitMQ server. The source code for these examples is in the test/src folder in the source distribution.

The script runjava.{sh,bat} runs Java with the class path correctly configured for the examples, e.g.runjava.sh com.rabbitmq.examples.TestMain runs the TestMain functional tests.

AMQP Protocol Capture/Analysis Tool

There's a very basic, very simple AMQP protocol analyzer in class com.rabbitmq.tools.Tracer. Invoke it with

runjava.sh com.rabbitmq.tools.Tracer listenPort connectHost connectPort

listenPort
port to listen for incoming AMQP connections on - defaults to 5673.
connectHost
hostname to use when making an outbound connection in response to an incoming connection - defaults to localhost.
connectPort
port number to use when making an outbound connection - defaults to 5672.

블로그 이미지

오픈이지 제로킴

시큐어코딩 교육/컨설팅 전문가 그룹

JMS(Java Message Service)란?


 

메시징 시스템은 응용프로그램 간에 비동기적으로 메시지를 교환할 수 있는 방법을 제공해주며 구현을 위해서는 MOM(Message Oriented Middleware)가 필요하다.

MOM은 메시지를 전달할 수 있으며, 메시지 전달을 비동기적으로 처리할 수 있는 특징이 있는 시스템이다.

JMS는 각 회사에서 만든 MOM 시스템의 공통적인 부분을 표준화함으로써 공통된 API를 사용할 수 있게 해준다.

Figure 2


<그림출처: https://today.java.net/pub/a/today/2005/06/03/loose.html>


메시지 시스템의 MOM은 아키텍처를 의미하고 이 표준을 구현한 환경이 필요하다.

자바에서는 메시지 시스템을 구현할 수 있는 환경을 제공하는 API를 JMS(Java Message Service)라고 하고
MOM은 J2EE, 웹로직 등 EJB서버에서 메모리 영역을 제공한다.



 

메시징 서비스의 종류



(1) P2P(Peer to Peer)


    P2P Model 은 Sender가 보낸 메세지를 하나의 Receiver만 받도록 되어 있다.

Sender가 메세지를 보내면 그 내용은 Queue에 저장이된다. JMS Queue는 그 메세지를 저장하고 있다가. Receiver가 그 Queue에 접속이 되면, 그 내용을 Receiver에게 전달한다.
하나의 Queue에는 여러개의 Sender나, 여러개의 Receiver가 붙을 수 있지만, 하나의 메세지는 절대로 하나의 Receiver에만 전달된다.

[출처] JMS API의 이해|작성자 투란도트


<그림출처: http://www.developerworkspace.com/Java/jms/tutorials/>

 


(2) Pub/sub


    Publish/Subscribe Messaging Model은 하나의 메세지를  여러 Recevier가 같이 받을 수 있다.

Sender가 보낸 메세지는 JMS의 Topic이라는 곳에 저장이 되며, 이 Topic을 Listening하고 있는 모든 recevier에게 전달이 된다.


[출처] JMS API의 이해|작성자 투란도트


 

 <그림출처: http://www.developerworkspace.com/Java/jms/tutorials/>

 



응용프로그램의 구성요소


JMS 명세에 따르면 JMS를 이용하는 응용 프로그램은 다음과 같은 요소로 구성되며, 각각의 요소는 API에서 제공하는 객체가 될 수도 있고 어떤 부분은 응용 프로그램일 수 도 있다.

 

(1) 운영도구(Administrative Tool)

       MOM을 이용해서 P2P나 pub/sub 방식으로 메시지를 전달하고 메시지를 전달 받기 위한 API는 JMS에서 제공하지만 MOM 자체를 관리하기 위한 방법은 JMS에서 제공하지 않는다. 그렇기 때문에 MOM을 운영하려면 MOM에서 제공하는 운영도구를 사용해야 한다. MOM을 운영한다는 것은 목적지(destination)를 설정하기 위한 것이고, 목적지는  P2P나 pub/sub 방식으로 메시지를 주고 받을 때 사용하기 위한 저장 공간이라고 생각하면 된다.

 

(2) 커넥션 팩토리(Connection Factories)

       커넥션과 관련된 정보를 통해서 커넥션을 생성할 수 있는 인터페이스를 말한다. 클라이언트는 JNDI를 이용하여 커넥션 팩토리(Connection Factory)를 검색한 후 JMS 커넥션을 만들게 되는데, P2P 방식일 경우에는 javax.jms.QueueConnectionFactory를 사용하며, pub/sub 방식일 경우에는 javax.jms.TopicConnectionFactory 를 사용한다.

 

(3) 목적지(Destinations)

       메시지를 MOM 에게 전송하면 목적지에 메시지가 전달된다.

 

(4) 커넥션(Connection)

       커넥션은 JMS 서비스 제공자와 클라이언트 간의 TCP/IP 커넥션을 의미하며, 세션을 생성하기 위해 사용된다. 

 

(5) 세션(Session)

        세션은 클라이언트와 JMS서버 간에 메시지를 생성하고 소비하기 위한 단일 스레드 환경이다.

P2P방식에서는 javax.jms.QueueSession을 pub/sub방식에서는 javax.jms.TopicSession을 사용한다.

 

(6) 메시지 생산자(Message Producer)

       클라이언트에서 메시지를 JMS서버에 전달하려면 메시지 생산자(Message Producer)을 이용해야 한다.

메시지 생산자는 세션을 통해서 만들어 질 수 있다. P2P 방식일 경우에는 QueueSender를, pub/sub 방식일 경우에는 TopicPublisher를 통해서 생성한다.

 

(7) 메시지 소비자(Message consumer)

      클라이언트는 JMS 서버로부터 메시지를 받아들이기 위해서 메시지 소비자(Message Consumer)를 이용해야 한다. 메시지 소비자는 생산자와 마찬가지로 세션을 통해서 만들어질 수 있다. P2P방식을 경우에는 QueueReceiver를, pub/sub방식일 경우에는 TopicSubscriber를 통해서 생성한다.

 

(8) 메시지(Message)

       메시지는 메시지 생산자와 소비자 간에 교환되는 데이터를 말한다. 메시지는 헤더(header), 특성(property), 몸체(body)로 구성되어 있으며 어떠한 데이터라도 몸체에 담겨 전송할 수 있다.

 



JMS API Programming Model 



JMS 프로바이드 환경을 설정하고나면, JMS 클라이언트를 구축해야 한다.

JMS 어플리케이션은 다음 순서로 처리된다.


1- Access JMS provider

2- Create administered Connection: connection factories and destinations
3- Create Connection
4- Create Session
5- Create Producer and consumer
6- Create Message
7- Send and receive Messages





 

<그림출처: http://www.developerworkspace.com/Java/jms/tutorials/>



(1)  JMS provider 액세스

Context ctx = new InitialContext(); ( if lookup in the current vendor)
Or
Hashtable env = new Hashtable();

String initialContextFactory="com.evermind.server.rmi.RMIInitialContextFactory";
String securityPrincipal="userName";
String credentials="userPassword";
env.put( Context.INITIAL_CONTEXT_FACTORY,initialContextFactory );
env.put( Context.SECURITY_PRINCIPAL, securityPrincipal );
env.put( Context.SECURITY_CREDENTIALS, credentials);
ctx = new InitialContext(env);



(2) Connection factories and Destinations 생성


1. Connection Factory: ConnectionFactory는 JMS의 Connection을 얻어오는 역할을 한다. JMS 서버로의 Connection은 각 JMS 서버 Vendor에 dependent 되기 때문에, 이를 통일하기 위해서 ConnectionFactory라는 Object를 통해서 JMS 서버로의 Connection을 얻어오게 되어 있다. 이 Connection Factory는 JMS Server의 JNDI Tree로 부터 얻어오게된다.

[출처] JMS API의 이해|작성자 투란도트



QueueConnectionFactory queueConnectionFactory =
                (QueueConnectionFactory) ctx.lookup("jms/QueueConnectionFactory");

TopicConnectionFactory topicConnectionFactory =
                (TopicConnectionFactory) ctx.lookup("jms/TopicConnectionFactory");


2. Destination:  Destination이란 메세지를 실제로 보낼곳, 즉 JMS의 Queue 또는 Topic을 이야기 한다.

Queue나 Topic은 JMS 서버에 의해서 관리 되기 때문에, Connection Factory Object와 마찬가지로 JMS 서버의 JNDI로 부터 얻어오게된다.

[출처] JMS API의 이해|작성자 투란도트



Queue myQueue = (Queue) ctx.lookup("jms/MyQueue");
Topic myTopic = (Topic) ctx.lookup("jms/MyTopic"); 



(3) Connection 생성


       Connection은 말그대로, Application에서 JMS Server의 Connection을 나타낸다.

[출처] JMS API의 이해|작성자 투란도트


QueueConnection queueConnection =
            queueConnectionFactory.createQueueConnection(); 

TopicConnection topicConnection =
            topicConnectionFactory.createTopicConnection();
Note: When an application completes, you need to close any connections that you have created. Failure to close a connection can cause resources not to be released by the JMS provider. queueConnection.close(); topicConnection.close();


(4) Session 생성

    세션은 메시지 생성과 소비를 위한 싱글 스레드 컨텍스트이다. 

    Session이라는 개념이 생긴것은 예를 들어, 하나의 JMS서버에 클라이언트가 접속했는데, 하나의 Queue가 아닌 동시에 여러개의 Queue에 통신하고자할때를 위해서 Session이라는 개념이 도입되었다. 쉽게 생각하면 Connection은 JMS server로의 물리적인 연결이라 생각하면 되고, Session은 그 물리적 연결내에 있는 논리적인 연결이라고 생각하면 된다.


[출처] JMS API의 이해|작성자 투란도트


QueueSession queueSession =
                 queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

TopicSession topicSession =
                 topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 


Session에 앞에 인자는 boolean값으로, 해당 JMS Session을 JTA Transaction에 포함시킬지 여부를 나타낸다. 뒤에는 메세지의 대한 Ack 방법을 정의한다. (Transaction에 포함시키는 경우-true일경우, 뒤에 ACK방법에 대한 인자는 무시된다.)

[출처] JMS API의 이해|작성자 투란도트




(5) 메시지 생성자와 소비자 생성 

     JMS Queue와 연결되어 실제 메세지를 보내고 받는 객체를 나타낸다. session 객체로 부터 생성되며, 생성시에 Destination (Queue나 Topic 이름) 을 정의하게 되어 있다.

[출처] JMS API의 이해|작성자 투란도트



Producer:
QueueSender queueSender = queueSession.createSender(myQueue);
TopicPublisher topicPublisher = topicSession.createPublisher(myTopic); 
Consumer:
QueueReceiver queueReceiver = queueSession.createReceiver(myQueue);
TopicSubscriber topicSubscriber = topicSession.createSubscriber(myTopic); 


(6) Message 생성 


    실질적으로 보내지는 메세지를 나타낸다. session 객체로 부터 생성한다.

[출처] JMS API의 이해|작성자 투란도트


JMS API는 다섯가지의 메지시 타입을 정의할 수 있다.

[ TextMesssage, MapMessage, BytesMessage, StreamMessage, ObjectMessage, Message] 


TextMessage message = queueSession.createTextMessage();
message.setText(msg_text); // msg_text is a String 


(7) 메시지 송수신 


Sending: QueueSender 와  TopicPublisher 를 사용하여 메시지를 전송할 수 있다.


queueSender.send(message);
topicPublisher.publish(message); 


Receiving: QueueReceiver 와 TopicSubscriber 을 이용하여 메소드가 호출되고 난 뒤 아무때나 메시지를 수신할 수 있다.

queueConnection.start(); // To consume a message synchronously you can use receive method as following: Message m = queueReceiver.receive(); or Message m = queueReceiver.receive(1000); // time out after a second if (m instanceof TextMessage) { TextMessage message = (TextMessage) m; System.out.println("Reading message: " + message.getText()); } else { // Handle error }


--> To consume a message asynchronously you will use message listener where it is an object that acts as an asynchronous event handler for messages. This object implements the MessageListener interface, which contains one method, onMessage. In the onMessage method, you define the actions to be taken when a message arrives. The message listener will be registered with a specific QueueReceiver or TopicSubscriber by using the setMessageListener method. For example, if you define a class named QueueListener that implements the MessageListener interface, you can register the message listener as follows:
QueueListener queueListener = new queueListener();
queueReceiver.setMessageListener(queueListener);

Important notes:
 1- The setmessageListener method will throw Exception if it is running in Web or EJB Container; it should be used under Client container. So to handle the asynchronous consumption in J2EE 1.3 platform you will need to use Message Driven Bean which is a special kind of message listener. 2- If you call start before you register the message listener, you are likely to miss messages.


References: Java Message Service O'Reilly [Richard Monson-Haefel ,David A. Chappell] EJB 3 Developer Guide [ Michael Sikora ] http://java.sun.com/javaee/5/docs/tutorial/doc/bncdq.html http://download.oracle.com/docs/cd/B14099_19/web.1012/b14012/jms.htm http://mediasrv.ns.ac.yu/extra/java2/tutorials/jms_tutorial/index.html



블로그 이미지

오픈이지 제로킴

시큐어코딩 교육/컨설팅 전문가 그룹

RabbitMQ 튜토리얼 정리

 

http://www.rabbitmq.com/getstarted.html

 

P: Producer(메시지를 삽입하는 프로그램) 

    ex) 웹어플리케이션

C: Consumer(큐에서 메시지를 꺼내는 프로그램)

    ex) 백그라운드에서 동작하는 데몬

X: 메시지를 큐에 전달(Exchange)할 때  규칙

Key: 규칙과 큐를 binding하는 인자값(Routing Key)

 

자료출처: http://www.rabbitmq.com/getstarted.html

            http://ir.bagesoft.com/643

 

 

Hello World


 

C가 1개이므로 작업(메시지 처리)가 분산되지 않아, 실제로 사용할 일은 많지 않다.
다만 로직을 P와 C로 분리하여, 오래 걸리는 작업은 C가 담당하면 사용자 입장에서의 응답속도가 빨라질 것이다.

 


 

 

[Producer]
  
//필요한 클래스를 import한다.
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Send { 
 // 사용할 큐의 이름을 지정한다.  
  private final static String QUEUE_NAME = "hello"; 
  public static void main(String[] argv) throws Exception {   

    //서버와의 연결을 설정한다.  	          
    ConnectionFactory factory = new ConnectionFactory();    
    factory.setHost("localhost");    
    Connection connection = factory.newConnection();    
    Channel channel = connection.createChannel();     

    // 전송에 사용할 큐를 선언한다.    
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);    
    String message = "Hello World!";    

    // 큐에 메시지를 게재한다.     
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());    
    System.out.println(" [x] Sent '" + message + "'");   

    // 채널과 연결을 닫는다.    
    channel.close();    
    connection.close();  
  }
}    

[Consumer]

  
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {    
     private final static String QUEUE_NAME = "hello";    
     public static void main(String[] argv) throws Exception {    
     
     //서버와의 연결을 설정한다.     
     ConnectionFactory factory = new ConnectionFactory();    
     factory.setHost("localhost");    
     Connection connection = factory.newConnection();    
     Channel channel = connection.createChannel();    

     // 전송에 사용할 큐를 선언한다.    
     channel.queueDeclare(QUEUE_NAME, false, false, false, null);    
     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");        
  
     // 큐로부터 메시지를 배달해달라고 등록한다.
     QueueingConsumer consumer = new QueueingConsumer(channel);    
     channel.basicConsume(QUEUE_NAME, true, consumer);        
  
     while (true) {      

        // 큐에 게시된 메시지가 있으면 읽어온다.      
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();      
        String message = new String(delivery.getBody());      
        System.out.println(" [x] Received '" + message + "'");    
      }  
    }
}
 

Work Queues (Round-Robin)


큐에서 순서대로 메시지를 읽어서 C1, C2, ... 순서대로 번갈아 수행한다.  따라서 C1과 C2가 다른 메시지를 받지만 하는 일은 같다.

 


 

[Producer]
  
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) 
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME, 
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }      
  //...
}
[Consumer] 
  
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException,
                      java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());

      System.out.println(" [x] Received '" + message + "'");   
      doWork(message); 
      System.out.println(" [x] Done" );

      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
  }
  //...
}

 

Publish/Subscribe (FanOut)


X(fanout)에 의해, 여러 큐에 모두 동일한 메시지가 삽입된다. 따라서 C1, C2가 모두 동일한 메시지를 받지만 하는 일은 다르다. Hellow World 예제에서 C가 하던 일을 C1, C2로 분리했다고 생각할 수 도 있다.

 

exchange의 타입에는 direct, topic, headers, fanout이 있다.

fanout타입은 exchange가 알고 있는 모든 큐에 메시지를 전달하는 방식이다.

 

 

 

[Producer]
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

[Consumer]

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();

       // exchange와 queue을 연결한다.
       channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

 

Routing (Direct, Exact Matching)


선택적으로 메시지를 보낸다.

 

C가 X(direct)와 큐를 binding할 때, Key를 명시하고, 메시지의 Key와 비교해서 동일한(complete matching)경우만 각 큐에 삽입된다. 

 

Key가 error일 때는 두 큐에 모두 삽입되고,  
Key가 info, warning일 경우에는 두번 째 큐에만 삽입된다.
그 외의 Key는 큐에 삽입되지 않는다. 즉 그 외의 key를 가진 메시지는 버려진다.

 

C1은 중대한 오류(error)에 대해서만 처리하고, C2는 모든 오류(info, error, warning)에 대해서 처리한다.
따라서 C1, C2는 다른 메시지를 받으며, 하는 일도 다르다.

 


 

[Producer]

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }
    //..
}

[Consumer]

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for(String severity : argv){
        // EXCHANGE_NAME을 가지는 direct exchange에서 아규먼트로 넘어오는 serverity의 값을 
        // routingKey로 가지는 메시지만 queueName을 가지는 큐에 전송된다.
         channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

 

Topics (Pattern Matching)


Routing 예제와 비슷하지만,  Key를 비교할 때, complete matching이 아니라 pattern matching을 사용하여 유연성과 확장성이 제공된다.

 

Key는 .(마침표)로 구분되어 word(토큰)로 나누어지진다.
각 word에 대하여 *는 한개이상의 임의의 단어를, #는 0개이상의 임의의 단어를 의미한다.

 

<주의>마침표(.)은 단어 구분자의 역할을 하며 X가 비교할때는 포함되지 않는다.

 


[Producer]
 public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 큐의 이름과 타입을 지정
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // 라우팅 키 설정
        String routingKey = getRouting(argv);
        String message = getMessage(argv);
 
        // MQ에 메시지 전송
       channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

        connection.close();
    }
    //...
} 

 

[Consumer]

 public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        for(String bindingKey : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}


 

RPC (Remote Procedure Call)


 RPC(Remote Procedure Call)를 사용하는 경우이며, 이 소스는 Java로 컨버팅하지 않았다.


 


블로그 이미지

오픈이지 제로킴

시큐어코딩 교육/컨설팅 전문가 그룹

시스템 환경


   - CentOS 6.3 32비트 운영체제

   - export LD_LIBRARY_PATH=""

 

EPEL 설치


# rpm -Uvh http://ftp.riken.jp/Linux/fedora/epel/6/i386/epel-release-6-8.noarch.rpm 

 

erlang 설치


# vi /etc/yum.repos.d/epel-erlang.repo


# Place this file in your /etc/yum.repos.d/ directory

[epel-erlang]
name=Erlang/OTP R14B
baseurl=http://ftp.riken.jp/Linux/fedora/epel/6/i386/
enabled=1
skip_if_unavailable=1
gpgcheck=0

[epel-erlang-source]
name=Erlang/OTP R14B - Source
baseurl=http://ftp.riken.jp/Linux/fedora/epel/6/i386/
enabled=0
skip_if_unavailable=1
gpgcheck=0


# yum install erlang


RabbitMQ 설치



# wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.1.0/rabbitmq-server-3.1.0-1.noarch.rpm

# rpm -Uvh rabbitmq-server-3.1.0-1.noarch.rpm


RabbitMQ 서버 시작하기



# service rabbitmq-server start

Starting rabbitmq-server: SUCCESS
rabbitmq-server.


# rabbitmqctl status
Status of node rabbit@localhost ...
[{pid,10955},
 {running_applications,[{rabbit,"RabbitMQ","3.1.0"},
                        {mnesia,"MNESIA  CXC 138 12","4.5"},
                        {os_mon,"CPO  CXC 138 46","2.2.7"},
                        {xmerl,"XML parser","1.2.10"},
                        {sasl,"SASL  CXC 138 11","2.1.10"},
                        {stdlib,"ERTS  CXC 138 10","1.17.5"},
                        {kernel,"ERTS  CXC 138 10","2.14.5"}]},
 {os,{unix,linux}},
 {erlang_version,"Erlang R14B04 (erts-5.8.5) [source] [rq:1] [async-threads:30] [kernel-poll:true]\n"},
 {memory,[{total,14318840},
          {connection_procs,1364},
          {queue_procs,2728},
          {plugins,0},
          {other_proc,4625248},
          {mnesia,28920},
          {mgmt_db,0},
          {msg_index,10916},
          {other_ets,380884},
          {binary,2688},
          {code,7678162},
          {atom,903189},
          {other_system,684741}]},
 {vm_memory_high_watermark,0.4},
 {vm_memory_limit,794248806},
 {disk_free_limit,1000000000},
 {disk_free,10803355648},
 {file_descriptors,[{total_limit,924},
                    {total_used,3},
                    {sockets_limit,829},
                    {sockets_used,1}]},
 {processes,[{limit,1048576},{used,121}]},
 {run_queue,0},
 {uptime,31}]
...done.


# service rabbitmq-server stop

Stopping rabbitmq-server: rabbitmq-server.


설치후기



# yum install erlang 에서 아래 에러 때문에 한시간 삽질

 . . .

  File "/usr/lib/python2.6/site-packages/yum/misc.py", line 1164, in _cElementTree_import
    import cElementTree
ImportError: No module named cElementTree


오라클 proc 설치하면서 LD_LIBRARY_PATH 를 잡아줬는데 그것때문에 걸리네 한시간 삽질끝에

export LD_LIBRARY_PATH="" 로 해결.

러시아어로 된 사이트에서 해결책을 건짐. 근데 왜?




블로그 이미지

오픈이지 제로킴

시큐어코딩 교육/컨설팅 전문가 그룹

티스토리 툴바