diff options
16 files changed, 462 insertions, 128 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java index 13acff1ea6..8465475282 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/Client.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java @@ -1,7 +1,6 @@ -package org.apache.qpidity.impl; +package org.apache.qpidity.client; import java.net.URL; -import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -14,9 +13,6 @@ import org.apache.qpidity.ConnectionDelegate; import org.apache.qpidity.MinaHandler; import org.apache.qpidity.QpidException; import org.apache.qpidity.SessionDelegate; -import org.apache.qpidity.client.DtxSession; -import org.apache.qpidity.client.ExceptionListener; -import org.apache.qpidity.client.Session; public class Client implements org.apache.qpidity.client.Connection diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java index 627829556c..fcde60fa04 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java @@ -1,4 +1,4 @@ -package org.apache.qpidity.impl; +package org.apache.qpidity.client; import java.util.HashMap; import java.util.List; @@ -9,8 +9,6 @@ import org.apache.qpidity.QpidException; import org.apache.qpidity.Range; import org.apache.qpidity.RangeSet; import org.apache.qpidity.api.Message; -import org.apache.qpidity.client.ExceptionListener; -import org.apache.qpidity.client.MessagePartListener; /** * Implements a Qpid Sesion. diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java index 70d1019a06..975dcb6d8b 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java @@ -1,4 +1,4 @@ -package org.apache.qpidity.impl; +package org.apache.qpidity.client; import java.nio.ByteBuffer; @@ -12,7 +12,6 @@ import org.apache.qpidity.RangeSet; import org.apache.qpidity.Session; import org.apache.qpidity.SessionDelegate; import org.apache.qpidity.Struct; -import org.apache.qpidity.client.MessagePartListener; public class ClientSessionDelegate extends SessionDelegate diff --git a/java/client/src/main/java/org/apache/qpidity/client/Connection.java b/java/client/src/main/java/org/apache/qpidity/client/Connection.java index 2ea6db8943..9bc17b14a6 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Connection.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Connection.java @@ -20,6 +20,7 @@ package org.apache.qpidity.client; import java.net.URL; +import java.util.UUID; import org.apache.qpidity.QpidException; @@ -46,7 +47,7 @@ public interface Connection * @throws QpidException If the communication layer fails to connect with the broker. */ public void connect(URL url) throws QpidException; - + /** * Close this connection. * @@ -83,5 +84,6 @@ public interface Connection * * @param exceptionListner The execptionListener */ + public void setExceptionListener(ExceptionListener exceptionListner); } diff --git a/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java new file mode 100644 index 0000000000..e46065e0a0 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java @@ -0,0 +1,84 @@ +package org.apache.qpidity.client; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.api.Message; +import org.apache.qpidity.client.util.MessagePartListenerAdapter; + +public class DemoClient +{ + public static MessagePartListenerAdapter createAdapter() + { + return new MessagePartListenerAdapter(new MessageListener() + { + public void onMessage(Message m) + { + System.out.println("\n================== Received Msg =================="); + System.out.println("Message Id : " + m.getMessageProperties().getMessageId()); + System.out.println(m.toString()); + System.out.println("================== End Msg ==================\n"); + } + + }); + } + + public static final void main(String[] args) + { + Connection conn = Client.createConnection(); + try{ + conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); + }catch(Exception e){ + e.printStackTrace(); + } + + Session ssn = conn.createSession(50000); + ssn.setExceptionListener(new ExceptionListener() + { + public void onException(QpidException e) + { + System.out.println(e); + } + }); + ssn.queueDeclare("queue1", null, null); + ssn.queueBind("queue1", "amq.direct", "queue1",null); + ssn.sync(); + + ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); + + // queue + ssn.messageTransfer("amq.direct", (short) 0, (short) 1); + ssn.headers(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123")); + ssn.data("this is the data"); + ssn.endData(); + + //reject + ssn.messageTransfer("amq.direct", (short) 0, (short) 1); + ssn.data("this should be rejected"); + ssn.headers(new DeliveryProperties().setRoutingKey("stocks")); + ssn.endData(); + ssn.sync(); + + // topic subs + ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null); + ssn.sync(); + + ssn.queueDeclare("topic1", null, null); + ssn.queueBind("topic1", "amq.topic", "stock.*",null); + ssn.queueDeclare("topic2", null, null); + ssn.queueBind("topic2", "amq.topic", "stock.us.*",null); + ssn.queueDeclare("topic3", null, null); + ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null); + ssn.sync(); + + // topic + ssn.messageTransfer("amq.topic", (short) 0, (short) 1); + ssn.data("Topic message"); + ssn.headers(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456")); + ssn.endData(); + ssn.sync(); + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index 84de268232..09595c8d0b 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -20,7 +20,6 @@ package org.apache.qpidity.client; import java.nio.ByteBuffer;
import java.util.Map;
-import java.util.UUID;
import org.apache.qpidity.Option;
import org.apache.qpidity.RangeSet;
@@ -67,13 +66,6 @@ public interface Session */
public void sessionSuspend();
- /**
- * This will resume an existing session
- * <p> Upon resume the session is attached with an underlying channel
- * hence making operation on this session available.
- */
- public void sessionResume(UUID sessionId);
-
//------------------------------------------------------
// Messaging methods
// Producer
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java new file mode 100644 index 0000000000..84f18dcca4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java @@ -0,0 +1,99 @@ +package org.apache.qpidity.client.util; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.api.Message; + +/** + * FileMessage provides pull style semantics for + * larges messages backed by a disk. + * Instead of loading all data into memeory it uses + * FileChannel to map regions of the file into memeory + * at a time. + * + * The write methods are not supported. + * + * From the standpoint of performance it is generally + * only worth mapping relatively large files into memory. + * + * FileMessage msg = new FileMessage(in,delProps,msgProps); + * session.messageTransfer(dest,msg,0,0); + * + * The messageTransfer method will read the file in chunks + * and stream it. + * + */ +public class FileMessage implements Message +{ + private MessageProperties _messageProperties; + private DeliveryProperties _deliveryProperties; + private FileChannel _fileChannel; + private int _chunkSize; + private long _fileSize; + private long _pos = 0; + + public FileMessage(FileInputStream in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException + { + _messageProperties = messageProperties; + _deliveryProperties = deliveryProperties; + + _fileChannel = in.getChannel(); + _chunkSize = chunkSize; + _fileSize = _fileChannel.size(); + + if (_fileSize <= _chunkSize) + { + _chunkSize = (int)_fileSize; + } + } + + public void appendData(byte[] src) + { + throw new UnsupportedOperationException("This Message is read only after the initial source"); + } + + public void appendData(ByteBuffer src) + { + throw new UnsupportedOperationException("This Message is read only after the initial source"); + } + + public DeliveryProperties getDeliveryProperties() + { + return _deliveryProperties; + } + + public MessageProperties getMessageProperties() + { + return _messageProperties; + } + + public void readData(byte[] target) throws IOException + { + int readLen = target.length <= _chunkSize ? target.length : _chunkSize; + if (_pos + readLen > _fileSize) + { + readLen = (int)(_fileSize - _pos); + } + MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, readLen); + _pos += readLen; + bb.get(target); + } + + public ByteBuffer readData() throws IOException + { + if (_pos + _chunkSize > _fileSize) + { + _chunkSize = (int)(_fileSize - _pos); + } + MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, _chunkSize); + _pos += _chunkSize; + return bb; + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java index 5d3f6a6e3e..4ff83db939 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java @@ -1,8 +1,8 @@ -package org.apache.qpidity.impl; +package org.apache.qpidity.client.util; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.LinkedList; -import java.util.List; import java.util.Queue; import org.apache.qpidity.DeliveryProperties; @@ -13,11 +13,12 @@ import org.apache.qpidity.client.MessageListener; import org.apache.qpidity.client.MessagePartListener; /** + * This is a simple message assembler. + * Will call onMessage method of the adaptee + * when all message data is read. * - * Will call onMessage method as soon as data is avialable - * The client can then start to process the data while - * the rest of the data is read. - * + * This is a good convinience utility for handling + * small messages */ public class MessagePartListenerAdapter implements MessagePartListener { @@ -25,7 +26,7 @@ public class MessagePartListenerAdapter implements MessagePartListener Message _currentMsg; DeliveryProperties _currentDeliveryProps; MessageProperties _currentMessageProps; - + public MessagePartListenerAdapter(MessageListener listener) { _adaptee = listener; @@ -37,12 +38,12 @@ public class MessagePartListenerAdapter implements MessagePartListener ByteBuffer _readBuffer; private int dataSize; - public void appendData(byte[] src) + public void appendData(byte[] src) throws IOException { appendData(ByteBuffer.wrap(src)); } - public void appendData(ByteBuffer src) + public void appendData(ByteBuffer src) throws IOException { _data.offer(src); dataSize += src.remaining(); @@ -61,9 +62,9 @@ public class MessagePartListenerAdapter implements MessagePartListener // since we provide the message only after completion // we can assume that when this method is called we have // received all data. - public void readData(byte[] target) + public void readData(byte[] target) throws IOException { - if (_readBuffer == null) + if (_data.size() >0 && _readBuffer == null) { buildReadBuffer(); } @@ -71,20 +72,59 @@ public class MessagePartListenerAdapter implements MessagePartListener _readBuffer.get(target); } + public ByteBuffer readData() throws IOException + { + if (_data.size() >0 && _readBuffer == null) + { + buildReadBuffer(); + } + + return _readBuffer; + } + private void buildReadBuffer() { - _readBuffer = ByteBuffer.allocate(dataSize); - for(ByteBuffer buf:_data) + //optimize for the simple cases + if(_data.size() == 1) { - _readBuffer.put(buf); + _readBuffer = _data.element().duplicate(); + } + else + { + _readBuffer = ByteBuffer.allocate(dataSize); + for(ByteBuffer buf:_data) + { + _readBuffer.put(buf); + } } } + + //hack for testing + @Override public String toString() + { + if (_data.size() >0 && _readBuffer == null) + { + buildReadBuffer(); + } + byte[] b = new byte[_readBuffer.limit()]; + _readBuffer.get(b); + return new String(b); + } }; } public void addData(ByteBuffer src) { - _currentMsg.appendData(src); + try + { + _currentMsg.appendData(src); + } + catch(IOException e) + { + // A chance for IO exception + // doesn't occur as we are using + // a ByteBuffer + } } public void messageHeaders(Struct... headers) @@ -103,7 +143,7 @@ public class MessagePartListenerAdapter implements MessagePartListener } public void messageReceived() - { + { _adaptee.onMessage(_currentMsg); } } diff --git a/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java deleted file mode 100644 index d325054bee..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.apache.qpidity.impl; - -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.client.ExceptionListener; -import org.apache.qpidity.client.Session; -import org.apache.qpidity.client.Connection; - -public class DemoClient -{ - - public static final void main(String[] args) - { - Connection conn = Client.createConnection(); - try{ - conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); - }catch(Exception e){ - e.printStackTrace(); - } - - Session ssn = conn.createSession(50000); - ssn.setExceptionListener(new ExceptionListener() - { - public void onException(QpidException e) - { - System.out.println(e); - } - }); - ssn.queueDeclare("Queue1", null, null); - ssn.sync(); - - ssn.messageTransfer("Queue1", (short) 0, (short) 1); - ssn.headers(new DeliveryProperties(), - new MessageProperties()); - ssn.data("this is the data"); - ssn.endData(); - - ssn.messageTransfer("Queue2", (short) 0, (short) 1); - ssn.data("this should be rejected"); - ssn.endData(); - ssn.sync(); - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index 3869c2a793..c071280b37 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -18,13 +18,13 @@ package org.apache.qpidity.jms; import org.apache.qpidity.jms.message.QpidMessage; -import org.apache.qpidity.impl.MessagePartListenerAdapter; import org.apache.qpidity.RangeSet; import org.apache.qpidity.QpidException; import org.apache.qpidity.Option; import org.apache.qpidity.filter.MessageFilter; import org.apache.qpidity.filter.JMSSelectorFilter; import org.apache.qpidity.client.MessagePartListener; +import org.apache.qpidity.client.util.MessagePartListenerAdapter; import org.apache.qpidity.exchange.ExchangeDefaults; import javax.jms.*; diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java index 8d707e4ccc..41c8bc02c6 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java @@ -26,9 +26,9 @@ import javax.jms.Queue; import javax.jms.QueueBrowser; import org.apache.qpidity.client.MessagePartListener; +import org.apache.qpidity.client.util.MessagePartListenerAdapter; import org.apache.qpidity.filter.JMSSelectorFilter; import org.apache.qpidity.filter.MessageFilter; -import org.apache.qpidity.impl.MessagePartListenerAdapter; /** * Implementation of the JMS QueueBrowser interface diff --git a/java/common/src/main/java/org/apache/qpidity/Connection.java b/java/common/src/main/java/org/apache/qpidity/Connection.java index c387a38b17..b70c8fae18 100644 --- a/java/common/src/main/java/org/apache/qpidity/Connection.java +++ b/java/common/src/main/java/org/apache/qpidity/Connection.java @@ -111,6 +111,8 @@ public class Connection implements ProtocolActions } // not sure if this is the right place + System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n"); + getChannel(0).connectionStart(header.getMajor(), header.getMinor(), null, "PLAIN", "utf8"); } diff --git a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java index 537a7ef586..ff89567cee 100644 --- a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java @@ -46,8 +46,8 @@ import javax.security.sasl.SaslServer; */ public abstract class ConnectionDelegate extends Delegate<Channel> { - private String _username; - private String _password; + private String _username = "guest"; + private String _password = "guest";; private String _mechanism; private String _virtualHost; private SaslClient saslClient; @@ -70,6 +70,7 @@ public abstract class ConnectionDelegate extends Delegate<Channel> //----------------------------------------------- @Override public void connectionStart(Channel context, ConnectionStart struct) { + System.out.println("\n--------------------Client Start Connection Negotiation -----------------------\n"); System.out.println("The broker has sent connection-start"); String mechanism = null; @@ -132,15 +133,19 @@ public abstract class ConnectionDelegate extends Delegate<Channel> String knownHosts = struct.getKnownHosts(); System.out.println("The broker has opened the connection for use"); System.out.println("The broker supplied the following hosts for failover " + knownHosts); - _negotiationCompleteLock.lock(); - try - { - _negotiationComplete.signalAll(); - } - finally + if(_negotiationCompleteLock != null) { - _negotiationCompleteLock.unlock(); + _negotiationCompleteLock.lock(); + try + { + _negotiationComplete.signalAll(); + } + finally + { + _negotiationCompleteLock.unlock(); + } } + System.out.println("\n-------------------- Client End Connection Negotiation -----------------------\n"); } public void connectionRedirect(Channel context, ConnectionRedirect struct) @@ -240,8 +245,9 @@ public abstract class ConnectionDelegate extends Delegate<Channel> @Override public void connectionOpen(Channel context, ConnectionOpen struct) { String hosts = "amqp:1223243232325"; - System.out.println("The client has sent connection-open-ok"); + System.out.println("The client has sent connection-open"); context.connectionOpenOk(hosts); + System.out.println("\n-------------------- Broker End Connection Negotiation -----------------------\n"); } diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 683008fe8a..4949568bbf 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -20,19 +20,16 @@ */ package org.apache.qpidity; -import java.io.IOException; +import static org.apache.qpidity.Functions.str; +import java.io.IOException; import java.nio.ByteBuffer; - import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; -import static org.apache.qpidity.Functions.*; - /** * ToyBroker @@ -43,21 +40,28 @@ import static org.apache.qpidity.Functions.*; class ToyBroker extends SessionDelegate { - private Map<String,Queue<Message>> queues; + private ToyExchange exchange; private MessageTransfer xfr = null; private DeliveryProperties props = null; private Struct[] headers = null; private List<Frame> frames = null; - - public ToyBroker(Map<String,Queue<Message>> queues) + private Map<String,String> consumers = new HashMap<String,String>(); + + public ToyBroker(ToyExchange exchange) { - this.queues = queues; + this.exchange = exchange; } @Override public void queueDeclare(Session ssn, QueueDeclare qd) { - queues.put(qd.getQueue(), new LinkedList()); - System.out.println("declared queue: " + qd.getQueue()); + exchange.createQueue(qd.getQueue()); + System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n"); + } + + @Override public void queueBind(Session ssn, QueueBind qb) + { + exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue()); + System.out.println("\n==================> bound queue: " + qb.getQueue() + " with routing key " + qb.getRoutingKey() + "\n"); } @Override public void queueQuery(Session ssn, QueueQuery qq) @@ -65,6 +69,12 @@ class ToyBroker extends SessionDelegate QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue()); ssn.executionResult(qq.getId(), result); } + + @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) + { + consumers.put(ms.getDestination(),ms.getQueue()); + System.out.println("\n==================> message subscribe : " + ms.getDestination() + "\n"); + } @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { @@ -88,16 +98,7 @@ class ToyBroker extends SessionDelegate props = (DeliveryProperties) hdr; } } - - if (props != null && !props.getDiscardUnroutable()) - { - String dest = xfr.getDestination(); - if (!queues.containsKey(dest)) - { - reject(ssn); - } - } - + this.headers = headers; } @@ -115,16 +116,17 @@ class ToyBroker extends SessionDelegate if (frame.isLastSegment() && frame.isLastFrame()) { String dest = xfr.getDestination(); - Queue queue = queues.get(dest); - if (queue == null) + Message m = new Message(headers, frames); + + if (exchange.route(dest,props.getRoutingKey(),m)) { - reject(ssn); + System.out.println("queued " + m); + dispatchMessages(ssn); } else { - Message m = new Message(headers, frames); - queue.offer(m); - System.out.println("queued " + m); + + reject(ssn); } ssn.processed(xfr); xfr = null; @@ -145,8 +147,35 @@ class ToyBroker extends SessionDelegate ssn.messageReject(ranges, 0, "no such destination"); } } + + private void transferMessage(Session ssn,String dest, Message m) + { + System.out.println("\n==================> Transfering message to: " +dest + "\n"); + ssn.messageTransfer(dest, (short)0, (short)0); + ssn.headers(m.headers); + for (Frame f : m.frames) + { + for (ByteBuffer b : f) + { + ssn.data(b); + } + } + ssn.endData(); + } + + public void dispatchMessages(Session ssn) + { + for (String dest: consumers.keySet()) + { + Message m = exchange.getQueue(consumers.get(dest)).poll(); + if(m != null) + { + transferMessage(ssn,dest,m); + } + } + } - private class Message + class Message { private final Struct[] headers; private final List<Frame> frames; @@ -188,14 +217,12 @@ class ToyBroker extends SessionDelegate public static final void main(String[] args) throws IOException { - final Map<String,Queue<Message>> queues = - new HashMap<String,Queue<Message>>(); - + final ToyExchange exchange = new ToyExchange(); ConnectionDelegate delegate = new ConnectionDelegate() { public SessionDelegate getSessionDelegate() { - return new ToyBroker(queues); + return new ToyBroker(exchange); } }; diff --git a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java new file mode 100644 index 0000000000..6fabd22462 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/ToyExchange.java @@ -0,0 +1,132 @@ +package org.apache.qpidity; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.qpidity.ToyBroker.Message; + +public class ToyExchange +{ + final static String DIRECT = "amq.direct"; + final static String TOPIC = "amq.topic"; + + private Map<String,List<Queue<Message>>> directEx = new HashMap<String,List<Queue<Message>>>(); + private Map<String,List<Queue<Message>>> topicEx = new HashMap<String,List<Queue<Message>>>(); + private Map<String,Queue<Message>> queues = new HashMap<String,Queue<Message>>(); + + public void createQueue(String name) + { + queues.put(name, new LinkedList<Message>()); + } + + public Queue<Message> getQueue(String name) + { + return queues.get(name); + } + + public void bindQueue(String type,String binding,String queueName) + { + Queue<Message> queue = queues.get(queueName); + binding = normalizeKey(binding); + if(DIRECT.equals(type)) + { + + if (directEx.containsKey(binding)) + { + List<Queue<Message>> list = directEx.get(binding); + list.add(queue); + } + else + { + List<Queue<Message>> list = new LinkedList<Queue<Message>>(); + list.add(queue); + directEx.put(binding,list); + } + } + else + { + if (topicEx.containsKey(binding)) + { + List<Queue<Message>> list = topicEx.get(binding); + list.add(queue); + } + else + { + List<Queue<Message>> list = new LinkedList<Queue<Message>>(); + list.add(queue); + topicEx.put(binding,list); + } + } + } + + public boolean route(String dest,String routingKey,Message msg) + { + List<Queue<Message>> queues; + if(DIRECT.equals(dest)) + { + queues = directEx.get(routingKey); + } + else + { + queues = matchWildCard(routingKey); + } + if(queues != null && queues.size()>0) + { + System.out.println("Message stored in " + queues.size() + " queues"); + storeMessage(msg,queues); + return true; + } + else + { + System.out.println("Message unroutable " + msg); + return false; + } + } + + private String normalizeKey(String routingKey) + { + if(routingKey.indexOf(".*")>1) + { + return routingKey.substring(0,routingKey.indexOf(".*")); + } + else + { + return routingKey; + } + } + + private List<Queue<Message>> matchWildCard(String routingKey) + { + List<Queue<Message>> selected = new ArrayList<Queue<Message>>(); + + for(String key: topicEx.keySet()) + { + Pattern p = Pattern.compile(key); + Matcher m = p.matcher(routingKey); + if (m.find()) + { + for(Queue<Message> queue : topicEx.get(key)) + { + selected.add(queue); + } + } + } + + return selected; + } + + private void storeMessage(Message msg,List<Queue<Message>> selected) + { + for(Queue<Message> queue : selected) + { + queue.offer(msg); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/api/Message.java b/java/common/src/main/java/org/apache/qpidity/api/Message.java index ccad3577f0..4e4a070fb4 100644 --- a/java/common/src/main/java/org/apache/qpidity/api/Message.java +++ b/java/common/src/main/java/org/apache/qpidity/api/Message.java @@ -1,5 +1,6 @@ package org.apache.qpidity.api; +import java.io.IOException; import java.nio.ByteBuffer; import org.apache.qpidity.MessageProperties; @@ -43,16 +44,16 @@ public interface Message * </ul> * @param src */ - public void appendData(byte[] src); + public void appendData(byte[] src) throws IOException; - public void appendData(ByteBuffer src); + public void appendData(ByteBuffer src) throws IOException; /** * This will abstract the underlying message data. * The Message implementation may not hold all message * data in memory (especially in the case of large messages) * - * The read function might copy data from a + * The read function might copy data from * <ul> * <li> From memory (Ex: ByteBuffer) * <li> From Disk @@ -60,7 +61,8 @@ public interface Message * </ul> * @param target */ - public void readData(byte[] target); + public void readData(byte[] target) throws IOException; + public ByteBuffer readData() throws IOException; } |