diff options
7 files changed, 277 insertions, 129 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java index fcde60fa04..b5d8add9e2 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java @@ -1,5 +1,7 @@ package org.apache.qpidity.client; +import java.io.EOFException; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,15 +47,37 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options); } - public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) + public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode) throws IOException { - // need to break it down into small pieces - super.messageTransfer(exchange, confirmMode, acquireMode); + // The javadoc clearly says that this method is suitable for small messages + // therefore reading the content in one shot. + super.messageTransfer(destination, confirmMode, acquireMode); super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); - // super.data(bytes); * - // super.endData() + super.data(msg.readData()); + super.endData(); } + public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException + { + super.messageTransfer(destination, confirmMode, acquireMode); + super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); + boolean b = true; + int count = 0; + while(b) + { + try + { + System.out.println("count : " + count++); + super.data(msg.readData()); + } + catch(EOFException e) + { + b = false; + } + } + + super.endData(); + } public RangeSet getAccquiredMessages() { diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index 09595c8d0b..e4f2ae217c 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -18,6 +18,7 @@ */
package org.apache.qpidity.client;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
@@ -72,7 +73,47 @@ public interface Session //------------------------------------------------------
/**
* Transfer the given message to a specified exchange.
+ *
+ * <p>This is a convinience method for providing a complete message
+ * using a single method which internaly is mapped to messageTransfer(), headers() followed
+ * by data() and an endData().
+ * <b><i>This method should only be used by small messages</b></i></p>
*
+ * @param destination The exchange the message is being sent.
+ * @param msg The Message to be sent
+ * @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire
+ * mode (or once acquire has been sent in no-acquire mode) the message is considered
+ * transferred
+ * <p/>
+ * <li> on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or
+ * explicit as in no-acquire mode) is not considered transferred until the original
+ * transfer is complete (signaled via execution.complete)
+ * </ul>
+ * @param acquireMode <ul>
+ * <li> no-acquire (0): the message must be explicitly acquired
+ * <li> pre-acquire (1): the message is acquired when the transfer starts
+ * </ul>
+ */
+ public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode)throws IOException;
+
+ /**
+ * <p>This is a convinience method for streaming a message using pull semantics
+ * using a single method as opposed to doing a push using messageTransfer(), headers() followed
+ * by a series of data() and an endData().</p>
+ * <p>Internally data will be pulled from Message object(which wrap's a data stream) using it's read()
+ * and pushed using messageTransfer(), headers() followed by a series of data() and an endData().
+ * <br><b><i>This method should only be used by large messages</b></i><br>
+ * There are two convinience Message classes provided to facilitate this.
+ * <ul>
+ * <li> <code>FileMessage</code>
+ * <li> <code>StreamingMessage</code>
+ * </ul>
+ * You could also implement a the <code>Message</code> interface to and wrap any
+ * data stream you want.
+ * </p>
+ *
+ * @param destination The exchange the message is being sent.
+ * @param msg The Message to be sent
* @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire
* mode (or once acquire has been sent in no-acquire mode) the message is considered
* transferred
@@ -85,10 +126,8 @@ public interface Session * <p/>
* <li> pre-acquire (1): the message is acquired when the transfer starts
* </ul>
- * @param exchange The exchange the message is being sent.
- * @param msg The Message to be sent
*/
- public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode);
+ public void messageStream(String destination, Message msg, short confirmMode, short acquireMode)throws IOException;
/**
* Declare the beginning of a message transfer operation. This operation must
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java new file mode 100644 index 0000000000..d67b32d019 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java @@ -0,0 +1,111 @@ +package org.apache.qpidity.client.util; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.api.Message; + +/** + * <p>A Simple implementation of the message interface + * for small messages. When the readData methods are called + * we assume the message is complete. i.e there want be any + * appendData operations after that.</p> + * + * <p>If you need large message support please see + * <code>FileMessage</code> and <code>StreamingMessage</code> + * </p> + */ +public class ByteBufferMessage implements Message +{ + private Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>(); + private ByteBuffer _readBuffer; + private int dataSize; + private DeliveryProperties _currentDeliveryProps; + private MessageProperties _currentMessageProps; + + + public void appendData(byte[] src) throws IOException + { + appendData(ByteBuffer.wrap(src)); + } + + public void appendData(ByteBuffer src) throws IOException + { + _data.offer(src); + dataSize += src.remaining(); + } + + public DeliveryProperties getDeliveryProperties() + { + return _currentDeliveryProps; + } + + public MessageProperties getMessageProperties() + { + return _currentMessageProps; + } + + public void setDeliveryProperties(DeliveryProperties props) + { + _currentDeliveryProps = props; + } + + public void setMessageProperties(MessageProperties props) + { + _currentMessageProps = props; + } + + public void readData(byte[] target) throws IOException + { + if (_data.size() >0 && _readBuffer == null) + { + buildReadBuffer(); + } + + _readBuffer.get(target); + } + + public ByteBuffer readData() throws IOException + { + if (_data.size() >0 && _readBuffer == null) + { + buildReadBuffer(); + } + + return _readBuffer; + } + + private void buildReadBuffer() + { + //optimize for the simple cases + if(_data.size() == 1) + { + _readBuffer = _data.element().duplicate(); + } + else + { + _readBuffer = ByteBuffer.allocate(dataSize); + for(ByteBuffer buf:_data) + { + _readBuffer.put(buf); + } + } + } + + //hack for testing + @Override public String toString() + { + if (_data.size() >0 && _readBuffer == null) + { + buildReadBuffer(); + } + ByteBuffer temp = _readBuffer.duplicate(); + byte[] b = new byte[temp.limit()]; + temp.get(b); + return new String(b); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java index 84f18dcca4..8f62334fcd 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java @@ -1,5 +1,6 @@ package org.apache.qpidity.client.util; +import java.io.EOFException; import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -29,10 +30,8 @@ import org.apache.qpidity.api.Message; * and stream it. * */ -public class FileMessage implements Message +public class FileMessage extends ReadOnlyMessage implements Message { - private MessageProperties _messageProperties; - private DeliveryProperties _deliveryProperties; private FileChannel _fileChannel; private int _chunkSize; private long _fileSize; @@ -52,46 +51,24 @@ public class FileMessage implements Message _chunkSize = (int)_fileSize; } } - - public void appendData(byte[] src) - { - throw new UnsupportedOperationException("This Message is read only after the initial source"); - } - - public void appendData(ByteBuffer src) - { - throw new UnsupportedOperationException("This Message is read only after the initial source"); - } - - public DeliveryProperties getDeliveryProperties() - { - return _deliveryProperties; - } - - public MessageProperties getMessageProperties() - { - return _messageProperties; - } public void readData(byte[] target) throws IOException { - int readLen = target.length <= _chunkSize ? target.length : _chunkSize; - if (_pos + readLen > _fileSize) - { - readLen = (int)(_fileSize - _pos); - } - MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, readLen); - _pos += readLen; - bb.get(target); + throw new UnsupportedOperationException(); } public ByteBuffer readData() throws IOException { + if (_pos == _fileSize) + { + throw new EOFException(); + } + if (_pos + _chunkSize > _fileSize) { _chunkSize = (int)(_fileSize - _pos); } - MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, _chunkSize); + MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, _chunkSize); _pos += _chunkSize; return bb; } diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java index 4ff83db939..88f950eb5d 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java @@ -2,13 +2,10 @@ package org.apache.qpidity.client.util; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.Queue; import org.apache.qpidity.DeliveryProperties; import org.apache.qpidity.MessageProperties; import org.apache.qpidity.Struct; -import org.apache.qpidity.api.Message; import org.apache.qpidity.client.MessageListener; import org.apache.qpidity.client.MessagePartListener; @@ -23,94 +20,12 @@ import org.apache.qpidity.client.MessagePartListener; public class MessagePartListenerAdapter implements MessagePartListener { MessageListener _adaptee; - Message _currentMsg; - DeliveryProperties _currentDeliveryProps; - MessageProperties _currentMessageProps; + ByteBufferMessage _currentMsg; public MessagePartListenerAdapter(MessageListener listener) { _adaptee = listener; - - // temp solution. - _currentMsg = new Message() - { - Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>(); - ByteBuffer _readBuffer; - private int dataSize; - - public void appendData(byte[] src) throws IOException - { - appendData(ByteBuffer.wrap(src)); - } - - public void appendData(ByteBuffer src) throws IOException - { - _data.offer(src); - dataSize += src.remaining(); - } - - public DeliveryProperties getDeliveryProperties() - { - return _currentDeliveryProps; - } - - public MessageProperties getMessageProperties() - { - return _currentMessageProps; - } - - // since we provide the message only after completion - // we can assume that when this method is called we have - // received all data. - public void readData(byte[] target) throws IOException - { - if (_data.size() >0 && _readBuffer == null) - { - buildReadBuffer(); - } - - _readBuffer.get(target); - } - - public ByteBuffer readData() throws IOException - { - if (_data.size() >0 && _readBuffer == null) - { - buildReadBuffer(); - } - - return _readBuffer; - } - - private void buildReadBuffer() - { - //optimize for the simple cases - if(_data.size() == 1) - { - _readBuffer = _data.element().duplicate(); - } - else - { - _readBuffer = ByteBuffer.allocate(dataSize); - for(ByteBuffer buf:_data) - { - _readBuffer.put(buf); - } - } - } - - //hack for testing - @Override public String toString() - { - if (_data.size() >0 && _readBuffer == null) - { - buildReadBuffer(); - } - byte[] b = new byte[_readBuffer.limit()]; - _readBuffer.get(b); - return new String(b); - } - }; + _currentMsg = new ByteBufferMessage(); } public void addData(ByteBuffer src) @@ -133,11 +48,11 @@ public class MessagePartListenerAdapter implements MessagePartListener { if(struct instanceof DeliveryProperties) { - _currentDeliveryProps = (DeliveryProperties)struct; + _currentMsg.setDeliveryProperties((DeliveryProperties)struct); } else if (struct instanceof MessageProperties) { - _currentMessageProps = (MessageProperties)struct; + _currentMsg.setMessageProperties((MessageProperties)struct); } } } diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java new file mode 100644 index 0000000000..d6b4b65942 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java @@ -0,0 +1,34 @@ +package org.apache.qpidity.client.util; + +import java.nio.ByteBuffer; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.api.Message; + +public abstract class ReadOnlyMessage implements Message +{ + MessageProperties _messageProperties; + DeliveryProperties _deliveryProperties; + + public void appendData(byte[] src) + { + throw new UnsupportedOperationException("This Message is read only after the initial source"); + } + + public void appendData(ByteBuffer src) + { + throw new UnsupportedOperationException("This Message is read only after the initial source"); + } + + public DeliveryProperties getDeliveryProperties() + { + return _deliveryProperties; + } + + public MessageProperties getMessageProperties() + { + return _messageProperties; + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java new file mode 100644 index 0000000000..9dfab40721 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java @@ -0,0 +1,48 @@ +package org.apache.qpidity.client.util; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.api.Message; + +public class StreamingMessage extends ReadOnlyMessage implements Message +{ + SocketChannel _socChannel; + private int _chunkSize; + private ByteBuffer _readBuf; + + public StreamingMessage(SocketChannel in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException + { + _messageProperties = messageProperties; + _deliveryProperties = deliveryProperties; + + _socChannel = in; + _chunkSize = chunkSize; + _readBuf = ByteBuffer.allocate(_chunkSize); + } + + public void readData(byte[] target) throws IOException + { + throw new UnsupportedOperationException(); + } + + public ByteBuffer readData() throws IOException + { + if(_socChannel.isConnected() && _socChannel.isOpen()) + { + _readBuf.clear(); + _socChannel.read(_readBuf); + } + else + { + throw new EOFException("The underlying socket/channel has closed"); + } + + return _readBuf.duplicate(); + } + +} |