summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/ClientSession.java34
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Session.java45
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java111
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java41
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java93
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java34
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java48
7 files changed, 277 insertions, 129 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
index fcde60fa04..b5d8add9e2 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
@@ -1,5 +1,7 @@
package org.apache.qpidity.client;
+import java.io.EOFException;
+import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -45,15 +47,37 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa
super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options);
}
- public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode)
+ public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
{
- // need to break it down into small pieces
- super.messageTransfer(exchange, confirmMode, acquireMode);
+ // The javadoc clearly says that this method is suitable for small messages
+ // therefore reading the content in one shot.
+ super.messageTransfer(destination, confirmMode, acquireMode);
super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
- // super.data(bytes); *
- // super.endData()
+ super.data(msg.readData());
+ super.endData();
}
+ public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
+ {
+ super.messageTransfer(destination, confirmMode, acquireMode);
+ super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
+ boolean b = true;
+ int count = 0;
+ while(b)
+ {
+ try
+ {
+ System.out.println("count : " + count++);
+ super.data(msg.readData());
+ }
+ catch(EOFException e)
+ {
+ b = false;
+ }
+ }
+
+ super.endData();
+ }
public RangeSet getAccquiredMessages()
{
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java
index 09595c8d0b..e4f2ae217c 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java
@@ -18,6 +18,7 @@
*/
package org.apache.qpidity.client;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
@@ -72,7 +73,47 @@ public interface Session
//------------------------------------------------------
/**
* Transfer the given message to a specified exchange.
+ *
+ * <p>This is a convinience method for providing a complete message
+ * using a single method which internaly is mapped to messageTransfer(), headers() followed
+ * by data() and an endData().
+ * <b><i>This method should only be used by small messages</b></i></p>
*
+ * @param destination The exchange the message is being sent.
+ * @param msg The Message to be sent
+ * @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire
+ * mode (or once acquire has been sent in no-acquire mode) the message is considered
+ * transferred
+ * <p/>
+ * <li> on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or
+ * explicit as in no-acquire mode) is not considered transferred until the original
+ * transfer is complete (signaled via execution.complete)
+ * </ul>
+ * @param acquireMode <ul>
+ * <li> no-acquire (0): the message must be explicitly acquired
+ * <li> pre-acquire (1): the message is acquired when the transfer starts
+ * </ul>
+ */
+ public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode)throws IOException;
+
+ /**
+ * <p>This is a convinience method for streaming a message using pull semantics
+ * using a single method as opposed to doing a push using messageTransfer(), headers() followed
+ * by a series of data() and an endData().</p>
+ * <p>Internally data will be pulled from Message object(which wrap's a data stream) using it's read()
+ * and pushed using messageTransfer(), headers() followed by a series of data() and an endData().
+ * <br><b><i>This method should only be used by large messages</b></i><br>
+ * There are two convinience Message classes provided to facilitate this.
+ * <ul>
+ * <li> <code>FileMessage</code>
+ * <li> <code>StreamingMessage</code>
+ * </ul>
+ * You could also implement a the <code>Message</code> interface to and wrap any
+ * data stream you want.
+ * </p>
+ *
+ * @param destination The exchange the message is being sent.
+ * @param msg The Message to be sent
* @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire
* mode (or once acquire has been sent in no-acquire mode) the message is considered
* transferred
@@ -85,10 +126,8 @@ public interface Session
* <p/>
* <li> pre-acquire (1): the message is acquired when the transfer starts
* </ul>
- * @param exchange The exchange the message is being sent.
- * @param msg The Message to be sent
*/
- public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode);
+ public void messageStream(String destination, Message msg, short confirmMode, short acquireMode)throws IOException;
/**
* Declare the beginning of a message transfer operation. This operation must
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
new file mode 100644
index 0000000000..d67b32d019
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
@@ -0,0 +1,111 @@
+package org.apache.qpidity.client.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+/**
+ * <p>A Simple implementation of the message interface
+ * for small messages. When the readData methods are called
+ * we assume the message is complete. i.e there want be any
+ * appendData operations after that.</p>
+ *
+ * <p>If you need large message support please see
+ * <code>FileMessage</code> and <code>StreamingMessage</code>
+ * </p>
+ */
+public class ByteBufferMessage implements Message
+{
+ private Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>();
+ private ByteBuffer _readBuffer;
+ private int dataSize;
+ private DeliveryProperties _currentDeliveryProps;
+ private MessageProperties _currentMessageProps;
+
+
+ public void appendData(byte[] src) throws IOException
+ {
+ appendData(ByteBuffer.wrap(src));
+ }
+
+ public void appendData(ByteBuffer src) throws IOException
+ {
+ _data.offer(src);
+ dataSize += src.remaining();
+ }
+
+ public DeliveryProperties getDeliveryProperties()
+ {
+ return _currentDeliveryProps;
+ }
+
+ public MessageProperties getMessageProperties()
+ {
+ return _currentMessageProps;
+ }
+
+ public void setDeliveryProperties(DeliveryProperties props)
+ {
+ _currentDeliveryProps = props;
+ }
+
+ public void setMessageProperties(MessageProperties props)
+ {
+ _currentMessageProps = props;
+ }
+
+ public void readData(byte[] target) throws IOException
+ {
+ if (_data.size() >0 && _readBuffer == null)
+ {
+ buildReadBuffer();
+ }
+
+ _readBuffer.get(target);
+ }
+
+ public ByteBuffer readData() throws IOException
+ {
+ if (_data.size() >0 && _readBuffer == null)
+ {
+ buildReadBuffer();
+ }
+
+ return _readBuffer;
+ }
+
+ private void buildReadBuffer()
+ {
+ //optimize for the simple cases
+ if(_data.size() == 1)
+ {
+ _readBuffer = _data.element().duplicate();
+ }
+ else
+ {
+ _readBuffer = ByteBuffer.allocate(dataSize);
+ for(ByteBuffer buf:_data)
+ {
+ _readBuffer.put(buf);
+ }
+ }
+ }
+
+ //hack for testing
+ @Override public String toString()
+ {
+ if (_data.size() >0 && _readBuffer == null)
+ {
+ buildReadBuffer();
+ }
+ ByteBuffer temp = _readBuffer.duplicate();
+ byte[] b = new byte[temp.limit()];
+ temp.get(b);
+ return new String(b);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
index 84f18dcca4..8f62334fcd 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
@@ -1,5 +1,6 @@
package org.apache.qpidity.client.util;
+import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -29,10 +30,8 @@ import org.apache.qpidity.api.Message;
* and stream it.
*
*/
-public class FileMessage implements Message
+public class FileMessage extends ReadOnlyMessage implements Message
{
- private MessageProperties _messageProperties;
- private DeliveryProperties _deliveryProperties;
private FileChannel _fileChannel;
private int _chunkSize;
private long _fileSize;
@@ -52,46 +51,24 @@ public class FileMessage implements Message
_chunkSize = (int)_fileSize;
}
}
-
- public void appendData(byte[] src)
- {
- throw new UnsupportedOperationException("This Message is read only after the initial source");
- }
-
- public void appendData(ByteBuffer src)
- {
- throw new UnsupportedOperationException("This Message is read only after the initial source");
- }
-
- public DeliveryProperties getDeliveryProperties()
- {
- return _deliveryProperties;
- }
-
- public MessageProperties getMessageProperties()
- {
- return _messageProperties;
- }
public void readData(byte[] target) throws IOException
{
- int readLen = target.length <= _chunkSize ? target.length : _chunkSize;
- if (_pos + readLen > _fileSize)
- {
- readLen = (int)(_fileSize - _pos);
- }
- MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, readLen);
- _pos += readLen;
- bb.get(target);
+ throw new UnsupportedOperationException();
}
public ByteBuffer readData() throws IOException
{
+ if (_pos == _fileSize)
+ {
+ throw new EOFException();
+ }
+
if (_pos + _chunkSize > _fileSize)
{
_chunkSize = (int)(_fileSize - _pos);
}
- MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, _chunkSize);
+ MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, _chunkSize);
_pos += _chunkSize;
return bb;
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
index 4ff83db939..88f950eb5d 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
@@ -2,13 +2,10 @@ package org.apache.qpidity.client.util;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.Queue;
import org.apache.qpidity.DeliveryProperties;
import org.apache.qpidity.MessageProperties;
import org.apache.qpidity.Struct;
-import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.MessageListener;
import org.apache.qpidity.client.MessagePartListener;
@@ -23,94 +20,12 @@ import org.apache.qpidity.client.MessagePartListener;
public class MessagePartListenerAdapter implements MessagePartListener
{
MessageListener _adaptee;
- Message _currentMsg;
- DeliveryProperties _currentDeliveryProps;
- MessageProperties _currentMessageProps;
+ ByteBufferMessage _currentMsg;
public MessagePartListenerAdapter(MessageListener listener)
{
_adaptee = listener;
-
- // temp solution.
- _currentMsg = new Message()
- {
- Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>();
- ByteBuffer _readBuffer;
- private int dataSize;
-
- public void appendData(byte[] src) throws IOException
- {
- appendData(ByteBuffer.wrap(src));
- }
-
- public void appendData(ByteBuffer src) throws IOException
- {
- _data.offer(src);
- dataSize += src.remaining();
- }
-
- public DeliveryProperties getDeliveryProperties()
- {
- return _currentDeliveryProps;
- }
-
- public MessageProperties getMessageProperties()
- {
- return _currentMessageProps;
- }
-
- // since we provide the message only after completion
- // we can assume that when this method is called we have
- // received all data.
- public void readData(byte[] target) throws IOException
- {
- if (_data.size() >0 && _readBuffer == null)
- {
- buildReadBuffer();
- }
-
- _readBuffer.get(target);
- }
-
- public ByteBuffer readData() throws IOException
- {
- if (_data.size() >0 && _readBuffer == null)
- {
- buildReadBuffer();
- }
-
- return _readBuffer;
- }
-
- private void buildReadBuffer()
- {
- //optimize for the simple cases
- if(_data.size() == 1)
- {
- _readBuffer = _data.element().duplicate();
- }
- else
- {
- _readBuffer = ByteBuffer.allocate(dataSize);
- for(ByteBuffer buf:_data)
- {
- _readBuffer.put(buf);
- }
- }
- }
-
- //hack for testing
- @Override public String toString()
- {
- if (_data.size() >0 && _readBuffer == null)
- {
- buildReadBuffer();
- }
- byte[] b = new byte[_readBuffer.limit()];
- _readBuffer.get(b);
- return new String(b);
- }
- };
+ _currentMsg = new ByteBufferMessage();
}
public void addData(ByteBuffer src)
@@ -133,11 +48,11 @@ public class MessagePartListenerAdapter implements MessagePartListener
{
if(struct instanceof DeliveryProperties)
{
- _currentDeliveryProps = (DeliveryProperties)struct;
+ _currentMsg.setDeliveryProperties((DeliveryProperties)struct);
}
else if (struct instanceof MessageProperties)
{
- _currentMessageProps = (MessageProperties)struct;
+ _currentMsg.setMessageProperties((MessageProperties)struct);
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
new file mode 100644
index 0000000000..d6b4b65942
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
@@ -0,0 +1,34 @@
+package org.apache.qpidity.client.util;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+public abstract class ReadOnlyMessage implements Message
+{
+ MessageProperties _messageProperties;
+ DeliveryProperties _deliveryProperties;
+
+ public void appendData(byte[] src)
+ {
+ throw new UnsupportedOperationException("This Message is read only after the initial source");
+ }
+
+ public void appendData(ByteBuffer src)
+ {
+ throw new UnsupportedOperationException("This Message is read only after the initial source");
+ }
+
+ public DeliveryProperties getDeliveryProperties()
+ {
+ return _deliveryProperties;
+ }
+
+ public MessageProperties getMessageProperties()
+ {
+ return _messageProperties;
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
new file mode 100644
index 0000000000..9dfab40721
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
@@ -0,0 +1,48 @@
+package org.apache.qpidity.client.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+public class StreamingMessage extends ReadOnlyMessage implements Message
+{
+ SocketChannel _socChannel;
+ private int _chunkSize;
+ private ByteBuffer _readBuf;
+
+ public StreamingMessage(SocketChannel in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException
+ {
+ _messageProperties = messageProperties;
+ _deliveryProperties = deliveryProperties;
+
+ _socChannel = in;
+ _chunkSize = chunkSize;
+ _readBuf = ByteBuffer.allocate(_chunkSize);
+ }
+
+ public void readData(byte[] target) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public ByteBuffer readData() throws IOException
+ {
+ if(_socChannel.isConnected() && _socChannel.isOpen())
+ {
+ _readBuf.clear();
+ _socChannel.read(_readBuf);
+ }
+ else
+ {
+ throw new EOFException("The underlying socket/channel has closed");
+ }
+
+ return _readBuf.duplicate();
+ }
+
+}