summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Client.java (renamed from java/client/src/main/java/org/apache/qpidity/impl/Client.java)6
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/ClientSession.java (renamed from java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java)4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java (renamed from java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java)3
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Connection.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/DemoClient.java84
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Session.java8
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java99
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java (renamed from java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java)72
-rw-r--r--java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java45
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Connection.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java26
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyBroker.java91
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyExchange.java132
-rw-r--r--java/common/src/main/java/org/apache/qpidity/api/Message.java10
16 files changed, 462 insertions, 128 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java
index 13acff1ea6..8465475282 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/Client.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java
@@ -1,7 +1,6 @@
-package org.apache.qpidity.impl;
+package org.apache.qpidity.client;
import java.net.URL;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -14,9 +13,6 @@ import org.apache.qpidity.ConnectionDelegate;
import org.apache.qpidity.MinaHandler;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.SessionDelegate;
-import org.apache.qpidity.client.DtxSession;
-import org.apache.qpidity.client.ExceptionListener;
-import org.apache.qpidity.client.Session;
public class Client implements org.apache.qpidity.client.Connection
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
index 627829556c..fcde60fa04 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
@@ -1,4 +1,4 @@
-package org.apache.qpidity.impl;
+package org.apache.qpidity.client;
import java.util.HashMap;
import java.util.List;
@@ -9,8 +9,6 @@ import org.apache.qpidity.QpidException;
import org.apache.qpidity.Range;
import org.apache.qpidity.RangeSet;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.client.ExceptionListener;
-import org.apache.qpidity.client.MessagePartListener;
/**
* Implements a Qpid Sesion.
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
index 70d1019a06..975dcb6d8b 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
@@ -1,4 +1,4 @@
-package org.apache.qpidity.impl;
+package org.apache.qpidity.client;
import java.nio.ByteBuffer;
@@ -12,7 +12,6 @@ import org.apache.qpidity.RangeSet;
import org.apache.qpidity.Session;
import org.apache.qpidity.SessionDelegate;
import org.apache.qpidity.Struct;
-import org.apache.qpidity.client.MessagePartListener;
public class ClientSessionDelegate extends SessionDelegate
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Connection.java b/java/client/src/main/java/org/apache/qpidity/client/Connection.java
index 2ea6db8943..9bc17b14a6 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Connection.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Connection.java
@@ -20,6 +20,7 @@ package org.apache.qpidity.client;
import java.net.URL;
+import java.util.UUID;
import org.apache.qpidity.QpidException;
@@ -46,7 +47,7 @@ public interface Connection
* @throws QpidException If the communication layer fails to connect with the broker.
*/
public void connect(URL url) throws QpidException;
-
+
/**
* Close this connection.
*
@@ -83,5 +84,6 @@ public interface Connection
*
* @param exceptionListner The execptionListener
*/
+
public void setExceptionListener(ExceptionListener exceptionListner);
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java
new file mode 100644
index 0000000000..e46065e0a0
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java
@@ -0,0 +1,84 @@
+package org.apache.qpidity.client;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+
+public class DemoClient
+{
+ public static MessagePartListenerAdapter createAdapter()
+ {
+ return new MessagePartListenerAdapter(new MessageListener()
+ {
+ public void onMessage(Message m)
+ {
+ System.out.println("\n================== Received Msg ==================");
+ System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
+ System.out.println(m.toString());
+ System.out.println("================== End Msg ==================\n");
+ }
+
+ });
+ }
+
+ public static final void main(String[] args)
+ {
+ Connection conn = Client.createConnection();
+ try{
+ conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+
+ Session ssn = conn.createSession(50000);
+ ssn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(QpidException e)
+ {
+ System.out.println(e);
+ }
+ });
+ ssn.queueDeclare("queue1", null, null);
+ ssn.queueBind("queue1", "amq.direct", "queue1",null);
+ ssn.sync();
+
+ ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
+
+ // queue
+ ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
+ ssn.headers(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123"));
+ ssn.data("this is the data");
+ ssn.endData();
+
+ //reject
+ ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
+ ssn.data("this should be rejected");
+ ssn.headers(new DeliveryProperties().setRoutingKey("stocks"));
+ ssn.endData();
+ ssn.sync();
+
+ // topic subs
+ ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null);
+ ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null);
+ ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null);
+ ssn.sync();
+
+ ssn.queueDeclare("topic1", null, null);
+ ssn.queueBind("topic1", "amq.topic", "stock.*",null);
+ ssn.queueDeclare("topic2", null, null);
+ ssn.queueBind("topic2", "amq.topic", "stock.us.*",null);
+ ssn.queueDeclare("topic3", null, null);
+ ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null);
+ ssn.sync();
+
+ // topic
+ ssn.messageTransfer("amq.topic", (short) 0, (short) 1);
+ ssn.data("Topic message");
+ ssn.headers(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456"));
+ ssn.endData();
+ ssn.sync();
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java
index 84de268232..09595c8d0b 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java
@@ -20,7 +20,6 @@ package org.apache.qpidity.client;
import java.nio.ByteBuffer;
import java.util.Map;
-import java.util.UUID;
import org.apache.qpidity.Option;
import org.apache.qpidity.RangeSet;
@@ -67,13 +66,6 @@ public interface Session
*/
public void sessionSuspend();
- /**
- * This will resume an existing session
- * <p> Upon resume the session is attached with an underlying channel
- * hence making operation on this session available.
- */
- public void sessionResume(UUID sessionId);
-
//------------------------------------------------------
// Messaging methods
// Producer
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
new file mode 100644
index 0000000000..84f18dcca4
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
@@ -0,0 +1,99 @@
+package org.apache.qpidity.client.util;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+/**
+ * FileMessage provides pull style semantics for
+ * larges messages backed by a disk.
+ * Instead of loading all data into memeory it uses
+ * FileChannel to map regions of the file into memeory
+ * at a time.
+ *
+ * The write methods are not supported.
+ *
+ * From the standpoint of performance it is generally
+ * only worth mapping relatively large files into memory.
+ *
+ * FileMessage msg = new FileMessage(in,delProps,msgProps);
+ * session.messageTransfer(dest,msg,0,0);
+ *
+ * The messageTransfer method will read the file in chunks
+ * and stream it.
+ *
+ */
+public class FileMessage implements Message
+{
+ private MessageProperties _messageProperties;
+ private DeliveryProperties _deliveryProperties;
+ private FileChannel _fileChannel;
+ private int _chunkSize;
+ private long _fileSize;
+ private long _pos = 0;
+
+ public FileMessage(FileInputStream in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException
+ {
+ _messageProperties = messageProperties;
+ _deliveryProperties = deliveryProperties;
+
+ _fileChannel = in.getChannel();
+ _chunkSize = chunkSize;
+ _fileSize = _fileChannel.size();
+
+ if (_fileSize <= _chunkSize)
+ {
+ _chunkSize = (int)_fileSize;
+ }
+ }
+
+ public void appendData(byte[] src)
+ {
+ throw new UnsupportedOperationException("This Message is read only after the initial source");
+ }
+
+ public void appendData(ByteBuffer src)
+ {
+ throw new UnsupportedOperationException("This Message is read only after the initial source");
+ }
+
+ public DeliveryProperties getDeliveryProperties()
+ {
+ return _deliveryProperties;
+ }
+
+ public MessageProperties getMessageProperties()
+ {
+ return _messageProperties;
+ }
+
+ public void readData(byte[] target) throws IOException
+ {
+ int readLen = target.length <= _chunkSize ? target.length : _chunkSize;
+ if (_pos + readLen > _fileSize)
+ {
+ readLen = (int)(_fileSize - _pos);
+ }
+ MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, readLen);
+ _pos += readLen;
+ bb.get(target);
+ }
+
+ public ByteBuffer readData() throws IOException
+ {
+ if (_pos + _chunkSize > _fileSize)
+ {
+ _chunkSize = (int)(_fileSize - _pos);
+ }
+ MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, _chunkSize);
+ _pos += _chunkSize;
+ return bb;
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
index 5d3f6a6e3e..4ff83db939 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
@@ -1,8 +1,8 @@
-package org.apache.qpidity.impl;
+package org.apache.qpidity.client.util;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
-import java.util.List;
import java.util.Queue;
import org.apache.qpidity.DeliveryProperties;
@@ -13,11 +13,12 @@ import org.apache.qpidity.client.MessageListener;
import org.apache.qpidity.client.MessagePartListener;
/**
+ * This is a simple message assembler.
+ * Will call onMessage method of the adaptee
+ * when all message data is read.
*
- * Will call onMessage method as soon as data is avialable
- * The client can then start to process the data while
- * the rest of the data is read.
- *
+ * This is a good convinience utility for handling
+ * small messages
*/
public class MessagePartListenerAdapter implements MessagePartListener
{
@@ -25,7 +26,7 @@ public class MessagePartListenerAdapter implements MessagePartListener
Message _currentMsg;
DeliveryProperties _currentDeliveryProps;
MessageProperties _currentMessageProps;
-
+
public MessagePartListenerAdapter(MessageListener listener)
{
_adaptee = listener;
@@ -37,12 +38,12 @@ public class MessagePartListenerAdapter implements MessagePartListener
ByteBuffer _readBuffer;
private int dataSize;
- public void appendData(byte[] src)
+ public void appendData(byte[] src) throws IOException
{
appendData(ByteBuffer.wrap(src));
}
- public void appendData(ByteBuffer src)
+ public void appendData(ByteBuffer src) throws IOException
{
_data.offer(src);
dataSize += src.remaining();
@@ -61,9 +62,9 @@ public class MessagePartListenerAdapter implements MessagePartListener
// since we provide the message only after completion
// we can assume that when this method is called we have
// received all data.
- public void readData(byte[] target)
+ public void readData(byte[] target) throws IOException
{
- if (_readBuffer == null)
+ if (_data.size() >0 && _readBuffer == null)
{
buildReadBuffer();
}
@@ -71,20 +72,59 @@ public class MessagePartListenerAdapter implements MessagePartListener
_readBuffer.get(target);
}
+ public ByteBuffer readData() throws IOException
+ {
+ if (_data.size() >0 && _readBuffer == null)
+ {
+ buildReadBuffer();
+ }
+
+ return _readBuffer;
+ }
+
private void buildReadBuffer()
{
- _readBuffer = ByteBuffer.allocate(dataSize);
- for(ByteBuffer buf:_data)
+ //optimize for the simple cases
+ if(_data.size() == 1)
{
- _readBuffer.put(buf);
+ _readBuffer = _data.element().duplicate();
+ }
+ else
+ {
+ _readBuffer = ByteBuffer.allocate(dataSize);
+ for(ByteBuffer buf:_data)
+ {
+ _readBuffer.put(buf);
+ }
}
}
+
+ //hack for testing
+ @Override public String toString()
+ {
+ if (_data.size() >0 && _readBuffer == null)
+ {
+ buildReadBuffer();
+ }
+ byte[] b = new byte[_readBuffer.limit()];
+ _readBuffer.get(b);
+ return new String(b);
+ }
};
}
public void addData(ByteBuffer src)
{
- _currentMsg.appendData(src);
+ try
+ {
+ _currentMsg.appendData(src);
+ }
+ catch(IOException e)
+ {
+ // A chance for IO exception
+ // doesn't occur as we are using
+ // a ByteBuffer
+ }
}
public void messageHeaders(Struct... headers)
@@ -103,7 +143,7 @@ public class MessagePartListenerAdapter implements MessagePartListener
}
public void messageReceived()
- {
+ {
_adaptee.onMessage(_currentMsg);
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java
deleted file mode 100644
index d325054bee..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.qpidity.impl;
-
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.client.ExceptionListener;
-import org.apache.qpidity.client.Session;
-import org.apache.qpidity.client.Connection;
-
-public class DemoClient
-{
-
- public static final void main(String[] args)
- {
- Connection conn = Client.createConnection();
- try{
- conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
- }catch(Exception e){
- e.printStackTrace();
- }
-
- Session ssn = conn.createSession(50000);
- ssn.setExceptionListener(new ExceptionListener()
- {
- public void onException(QpidException e)
- {
- System.out.println(e);
- }
- });
- ssn.queueDeclare("Queue1", null, null);
- ssn.sync();
-
- ssn.messageTransfer("Queue1", (short) 0, (short) 1);
- ssn.headers(new DeliveryProperties(),
- new MessageProperties());
- ssn.data("this is the data");
- ssn.endData();
-
- ssn.messageTransfer("Queue2", (short) 0, (short) 1);
- ssn.data("this should be rejected");
- ssn.endData();
- ssn.sync();
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index 3869c2a793..c071280b37 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -18,13 +18,13 @@
package org.apache.qpidity.jms;
import org.apache.qpidity.jms.message.QpidMessage;
-import org.apache.qpidity.impl.MessagePartListenerAdapter;
import org.apache.qpidity.RangeSet;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.Option;
import org.apache.qpidity.filter.MessageFilter;
import org.apache.qpidity.filter.JMSSelectorFilter;
import org.apache.qpidity.client.MessagePartListener;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
import org.apache.qpidity.exchange.ExchangeDefaults;
import javax.jms.*;
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
index 8d707e4ccc..41c8bc02c6 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
@@ -26,9 +26,9 @@ import javax.jms.Queue;
import javax.jms.QueueBrowser;
import org.apache.qpidity.client.MessagePartListener;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
import org.apache.qpidity.filter.JMSSelectorFilter;
import org.apache.qpidity.filter.MessageFilter;
-import org.apache.qpidity.impl.MessagePartListenerAdapter;
/**
* Implementation of the JMS QueueBrowser interface
diff --git a/java/common/src/main/java/org/apache/qpidity/Connection.java b/java/common/src/main/java/org/apache/qpidity/Connection.java
index c387a38b17..b70c8fae18 100644
--- a/java/common/src/main/java/org/apache/qpidity/Connection.java
+++ b/java/common/src/main/java/org/apache/qpidity/Connection.java
@@ -111,6 +111,8 @@ public class Connection implements ProtocolActions
}
// not sure if this is the right place
+ System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n");
+
getChannel(0).connectionStart(header.getMajor(), header.getMinor(), null, "PLAIN", "utf8");
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
index 537a7ef586..ff89567cee 100644
--- a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
@@ -46,8 +46,8 @@ import javax.security.sasl.SaslServer;
*/
public abstract class ConnectionDelegate extends Delegate<Channel>
{
- private String _username;
- private String _password;
+ private String _username = "guest";
+ private String _password = "guest";;
private String _mechanism;
private String _virtualHost;
private SaslClient saslClient;
@@ -70,6 +70,7 @@ public abstract class ConnectionDelegate extends Delegate<Channel>
//-----------------------------------------------
@Override public void connectionStart(Channel context, ConnectionStart struct)
{
+ System.out.println("\n--------------------Client Start Connection Negotiation -----------------------\n");
System.out.println("The broker has sent connection-start");
String mechanism = null;
@@ -132,15 +133,19 @@ public abstract class ConnectionDelegate extends Delegate<Channel>
String knownHosts = struct.getKnownHosts();
System.out.println("The broker has opened the connection for use");
System.out.println("The broker supplied the following hosts for failover " + knownHosts);
- _negotiationCompleteLock.lock();
- try
- {
- _negotiationComplete.signalAll();
- }
- finally
+ if(_negotiationCompleteLock != null)
{
- _negotiationCompleteLock.unlock();
+ _negotiationCompleteLock.lock();
+ try
+ {
+ _negotiationComplete.signalAll();
+ }
+ finally
+ {
+ _negotiationCompleteLock.unlock();
+ }
}
+ System.out.println("\n-------------------- Client End Connection Negotiation -----------------------\n");
}
public void connectionRedirect(Channel context, ConnectionRedirect struct)
@@ -240,8 +245,9 @@ public abstract class ConnectionDelegate extends Delegate<Channel>
@Override public void connectionOpen(Channel context, ConnectionOpen struct)
{
String hosts = "amqp:1223243232325";
- System.out.println("The client has sent connection-open-ok");
+ System.out.println("The client has sent connection-open");
context.connectionOpenOk(hosts);
+ System.out.println("\n-------------------- Broker End Connection Negotiation -----------------------\n");
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
index 683008fe8a..4949568bbf 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
@@ -20,19 +20,16 @@
*/
package org.apache.qpidity;
-import java.io.IOException;
+import static org.apache.qpidity.Functions.str;
+import java.io.IOException;
import java.nio.ByteBuffer;
-
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import static org.apache.qpidity.Functions.*;
-
/**
* ToyBroker
@@ -43,21 +40,28 @@ import static org.apache.qpidity.Functions.*;
class ToyBroker extends SessionDelegate
{
- private Map<String,Queue<Message>> queues;
+ private ToyExchange exchange;
private MessageTransfer xfr = null;
private DeliveryProperties props = null;
private Struct[] headers = null;
private List<Frame> frames = null;
-
- public ToyBroker(Map<String,Queue<Message>> queues)
+ private Map<String,String> consumers = new HashMap<String,String>();
+
+ public ToyBroker(ToyExchange exchange)
{
- this.queues = queues;
+ this.exchange = exchange;
}
@Override public void queueDeclare(Session ssn, QueueDeclare qd)
{
- queues.put(qd.getQueue(), new LinkedList());
- System.out.println("declared queue: " + qd.getQueue());
+ exchange.createQueue(qd.getQueue());
+ System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n");
+ }
+
+ @Override public void queueBind(Session ssn, QueueBind qb)
+ {
+ exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue());
+ System.out.println("\n==================> bound queue: " + qb.getQueue() + " with routing key " + qb.getRoutingKey() + "\n");
}
@Override public void queueQuery(Session ssn, QueueQuery qq)
@@ -65,6 +69,12 @@ class ToyBroker extends SessionDelegate
QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue());
ssn.executionResult(qq.getId(), result);
}
+
+ @Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
+ {
+ consumers.put(ms.getDestination(),ms.getQueue());
+ System.out.println("\n==================> message subscribe : " + ms.getDestination() + "\n");
+ }
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
@@ -88,16 +98,7 @@ class ToyBroker extends SessionDelegate
props = (DeliveryProperties) hdr;
}
}
-
- if (props != null && !props.getDiscardUnroutable())
- {
- String dest = xfr.getDestination();
- if (!queues.containsKey(dest))
- {
- reject(ssn);
- }
- }
-
+
this.headers = headers;
}
@@ -115,16 +116,17 @@ class ToyBroker extends SessionDelegate
if (frame.isLastSegment() && frame.isLastFrame())
{
String dest = xfr.getDestination();
- Queue queue = queues.get(dest);
- if (queue == null)
+ Message m = new Message(headers, frames);
+
+ if (exchange.route(dest,props.getRoutingKey(),m))
{
- reject(ssn);
+ System.out.println("queued " + m);
+ dispatchMessages(ssn);
}
else
{
- Message m = new Message(headers, frames);
- queue.offer(m);
- System.out.println("queued " + m);
+
+ reject(ssn);
}
ssn.processed(xfr);
xfr = null;
@@ -145,8 +147,35 @@ class ToyBroker extends SessionDelegate
ssn.messageReject(ranges, 0, "no such destination");
}
}
+
+ private void transferMessage(Session ssn,String dest, Message m)
+ {
+ System.out.println("\n==================> Transfering message to: " +dest + "\n");
+ ssn.messageTransfer(dest, (short)0, (short)0);
+ ssn.headers(m.headers);
+ for (Frame f : m.frames)
+ {
+ for (ByteBuffer b : f)
+ {
+ ssn.data(b);
+ }
+ }
+ ssn.endData();
+ }
+
+ public void dispatchMessages(Session ssn)
+ {
+ for (String dest: consumers.keySet())
+ {
+ Message m = exchange.getQueue(consumers.get(dest)).poll();
+ if(m != null)
+ {
+ transferMessage(ssn,dest,m);
+ }
+ }
+ }
- private class Message
+ class Message
{
private final Struct[] headers;
private final List<Frame> frames;
@@ -188,14 +217,12 @@ class ToyBroker extends SessionDelegate
public static final void main(String[] args) throws IOException
{
- final Map<String,Queue<Message>> queues =
- new HashMap<String,Queue<Message>>();
-
+ final ToyExchange exchange = new ToyExchange();
ConnectionDelegate delegate = new ConnectionDelegate()
{
public SessionDelegate getSessionDelegate()
{
- return new ToyBroker(queues);
+ return new ToyBroker(exchange);
}
};
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
new file mode 100644
index 0000000000..6fabd22462
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
@@ -0,0 +1,132 @@
+package org.apache.qpidity;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.qpidity.ToyBroker.Message;
+
+public class ToyExchange
+{
+ final static String DIRECT = "amq.direct";
+ final static String TOPIC = "amq.topic";
+
+ private Map<String,List<Queue<Message>>> directEx = new HashMap<String,List<Queue<Message>>>();
+ private Map<String,List<Queue<Message>>> topicEx = new HashMap<String,List<Queue<Message>>>();
+ private Map<String,Queue<Message>> queues = new HashMap<String,Queue<Message>>();
+
+ public void createQueue(String name)
+ {
+ queues.put(name, new LinkedList<Message>());
+ }
+
+ public Queue<Message> getQueue(String name)
+ {
+ return queues.get(name);
+ }
+
+ public void bindQueue(String type,String binding,String queueName)
+ {
+ Queue<Message> queue = queues.get(queueName);
+ binding = normalizeKey(binding);
+ if(DIRECT.equals(type))
+ {
+
+ if (directEx.containsKey(binding))
+ {
+ List<Queue<Message>> list = directEx.get(binding);
+ list.add(queue);
+ }
+ else
+ {
+ List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+ list.add(queue);
+ directEx.put(binding,list);
+ }
+ }
+ else
+ {
+ if (topicEx.containsKey(binding))
+ {
+ List<Queue<Message>> list = topicEx.get(binding);
+ list.add(queue);
+ }
+ else
+ {
+ List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+ list.add(queue);
+ topicEx.put(binding,list);
+ }
+ }
+ }
+
+ public boolean route(String dest,String routingKey,Message msg)
+ {
+ List<Queue<Message>> queues;
+ if(DIRECT.equals(dest))
+ {
+ queues = directEx.get(routingKey);
+ }
+ else
+ {
+ queues = matchWildCard(routingKey);
+ }
+ if(queues != null && queues.size()>0)
+ {
+ System.out.println("Message stored in " + queues.size() + " queues");
+ storeMessage(msg,queues);
+ return true;
+ }
+ else
+ {
+ System.out.println("Message unroutable " + msg);
+ return false;
+ }
+ }
+
+ private String normalizeKey(String routingKey)
+ {
+ if(routingKey.indexOf(".*")>1)
+ {
+ return routingKey.substring(0,routingKey.indexOf(".*"));
+ }
+ else
+ {
+ return routingKey;
+ }
+ }
+
+ private List<Queue<Message>> matchWildCard(String routingKey)
+ {
+ List<Queue<Message>> selected = new ArrayList<Queue<Message>>();
+
+ for(String key: topicEx.keySet())
+ {
+ Pattern p = Pattern.compile(key);
+ Matcher m = p.matcher(routingKey);
+ if (m.find())
+ {
+ for(Queue<Message> queue : topicEx.get(key))
+ {
+ selected.add(queue);
+ }
+ }
+ }
+
+ return selected;
+ }
+
+ private void storeMessage(Message msg,List<Queue<Message>> selected)
+ {
+ for(Queue<Message> queue : selected)
+ {
+ queue.offer(msg);
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/api/Message.java b/java/common/src/main/java/org/apache/qpidity/api/Message.java
index ccad3577f0..4e4a070fb4 100644
--- a/java/common/src/main/java/org/apache/qpidity/api/Message.java
+++ b/java/common/src/main/java/org/apache/qpidity/api/Message.java
@@ -1,5 +1,6 @@
package org.apache.qpidity.api;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.qpidity.MessageProperties;
@@ -43,16 +44,16 @@ public interface Message
* </ul>
* @param src
*/
- public void appendData(byte[] src);
+ public void appendData(byte[] src) throws IOException;
- public void appendData(ByteBuffer src);
+ public void appendData(ByteBuffer src) throws IOException;
/**
* This will abstract the underlying message data.
* The Message implementation may not hold all message
* data in memory (especially in the case of large messages)
*
- * The read function might copy data from a
+ * The read function might copy data from
* <ul>
* <li> From memory (Ex: ByteBuffer)
* <li> From Disk
@@ -60,7 +61,8 @@ public interface Message
* </ul>
* @param target
*/
- public void readData(byte[] target);
+ public void readData(byte[] target) throws IOException;
+ public ByteBuffer readData() throws IOException;
}