summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java')
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java39
1 files changed, 39 insertions, 0 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
index b616aab126..4a84ccad37 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
@@ -42,6 +42,7 @@ import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.util.GZIPUtils;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@@ -255,6 +256,15 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ ByteBuffer buf = _message.getContent(_offset, _length);
+ long size = buf.remaining();
+ sender.send(buf.duplicate());
+ return size;
+ }
+
public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
{
throw new UnsupportedOperationException();
@@ -346,6 +356,15 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
_underlyingBody.writePayload(buffer);
}
+ public long writePayload(ByteBufferSender sender) throws IOException
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.writePayload(sender);
+ }
+
public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
throws AMQException
{
@@ -449,6 +468,18 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
@Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender);
+
+ size += (new AMQFrame(_channel, _headerBody)).writePayload(sender);
+
+ size += (new AMQFrame(_channel, _contentBody)).writePayload(sender);
+
+ return size;
+ }
+
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder();
@@ -490,6 +521,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
@Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender);
+ size += (new AMQFrame(_channel, _headerBody)).writePayload(sender);
+ return size;
+ }
+
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder();