diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java')
-rw-r--r-- | qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java | 39 |
1 files changed, 39 insertions, 0 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java index b616aab126..4a84ccad37 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java @@ -42,6 +42,7 @@ import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.util.GZIPUtils; public class ProtocolOutputConverterImpl implements ProtocolOutputConverter @@ -255,6 +256,15 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter } } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + ByteBuffer buf = _message.getContent(_offset, _length); + long size = buf.remaining(); + sender.send(buf.duplicate()); + return size; + } + public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException { throw new UnsupportedOperationException(); @@ -346,6 +356,15 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter _underlyingBody.writePayload(buffer); } + public long writePayload(ByteBufferSender sender) throws IOException + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + return _underlyingBody.writePayload(sender); + } + public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException { @@ -449,6 +468,18 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter } @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender); + + size += (new AMQFrame(_channel, _headerBody)).writePayload(sender); + + size += (new AMQFrame(_channel, _contentBody)).writePayload(sender); + + return size; + } + + @Override public String toString() { StringBuilder builder = new StringBuilder(); @@ -490,6 +521,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter } @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender); + size += (new AMQFrame(_channel, _headerBody)).writePayload(sender); + return size; + } + + @Override public String toString() { StringBuilder builder = new StringBuilder(); |