diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-30 16:44:48 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-30 16:44:48 +0000 |
commit | 0b536f6fa61cb192c9cba08c443a10c779aa2a91 (patch) | |
tree | a5d57f7e63220c12ca9bea901843292b468c769e /java | |
parent | 0e139c730f3ed193df52b6cdecce4f71cd92ab71 (diff) | |
download | qpid-python-0b536f6fa61cb192c9cba08c443a10c779aa2a91.tar.gz |
QPID-3789 : [Java] Remove duplication of output converters and optimise startup time
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1237772 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/Broker.java | 5 | ||||
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java (renamed from java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java) | 47 | ||||
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java | 35 | ||||
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java | 431 | ||||
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java | 429 | ||||
-rwxr-xr-x | java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java | 15 | ||||
-rwxr-xr-x | java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java | 8 | ||||
-rw-r--r-- | java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java | 2 |
8 files changed, 59 insertions, 913 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/Broker.java index 8bc95a32f2..cf5b57e803 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Broker.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Broker.java @@ -186,7 +186,6 @@ public class Broker { bindAddress = InetAddress.getByAddress(parseIP(bindAddr)); } - String hostName = bindAddress.getCanonicalHostName(); if (!serverConfig.getSSLOnly()) { @@ -199,7 +198,7 @@ public class Broker final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); final MultiVersionProtocolEngineFactory protocolEngineFactory = - new MultiVersionProtocolEngineFactory(hostName, supported); + new MultiVersionProtocolEngineFactory(supported); transport.accept(settings, protocolEngineFactory, null); ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), @@ -224,7 +223,7 @@ public class Broker final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); final MultiVersionProtocolEngineFactory protocolEngineFactory = - new MultiVersionProtocolEngineFactory(hostName, supported); + new MultiVersionProtocolEngineFactory(supported); transport.accept(settings, protocolEngineFactory, sslContext); ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, sslPort), diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java index 8d49481c25..cfdcf7fb43 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java @@ -1,4 +1,4 @@ -package org.apache.qpid.server.output.amqp0_9_1; +package org.apache.qpid.server.output; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -32,15 +32,11 @@ import org.apache.qpid.framing.BasicGetOkBody; import org.apache.qpid.framing.BasicReturnBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.MessageTransferMessage; -import org.apache.qpid.server.output.HeaderPropertiesConverter; -import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.transport.DeliveryProperties; @@ -49,27 +45,17 @@ import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; -public class ProtocolOutputConverterImpl implements ProtocolOutputConverter +class ProtocolOutputConverterImpl implements ProtocolOutputConverter { - private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91); - - public static Factory getInstanceFactory() - { - return new Factory() - { - - public ProtocolOutputConverter newInstance(AMQProtocolSession session) - { - return new ProtocolOutputConverterImpl(session); - } - }; - } + private static final int BASIC_CLASS_ID = 60; + private final MethodRegistry _methodRegistry; private final AMQProtocolSession _protocolSession; - private ProtocolOutputConverterImpl(AMQProtocolSession session) + ProtocolOutputConverterImpl(AMQProtocolSession session, MethodRegistry methodRegistry) { _protocolSession = session; + _methodRegistry = methodRegistry; } @@ -97,7 +83,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter { final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost()); - ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID); + ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID); chb.setBodySize(message.getSize()); return chb; } @@ -199,15 +185,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter } } - private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) - { - - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - contentHeaderBody); - return contentHeader; - } - - public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException { AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize); @@ -248,7 +225,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public AMQBody createAMQBody() { - return METHOD_REGISTRY.createBasicDeliverBody(consumerTag, + return _methodRegistry.createBasicDeliverBody(consumerTag, deliveryTag, isRedelivered, exchangeName, @@ -316,7 +293,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter final boolean isRedelivered = entry.isRedelivered(); BasicGetOkBody getOkBody = - METHOD_REGISTRY.createBasicGetOkBody(deliveryTag, + _methodRegistry.createBasicGetOkBody(deliveryTag, isRedelivered, exchangeName, routingKey, @@ -327,7 +304,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public byte getProtocolMinorVersion() { - return getProtocolSession().getProtocolMinorVersion(); + return _protocolSession.getProtocolMinorVersion(); } public byte getProtocolMajorVersion() @@ -341,7 +318,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter { BasicReturnBody basicReturnBody = - METHOD_REGISTRY.createBasicReturnBody(replyCode, + _methodRegistry.createBasicReturnBody(replyCode, replyText, messagePublishInfo.getExchange(), messagePublishInfo.getRoutingKey()); @@ -369,7 +346,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) { - BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag); + BasicCancelOkBody basicCancelOkBody = _methodRegistry.createBasicCancelOkBody(consumerTag); writeFrame(basicCancelOkBody.generateFrame(channelId)); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java index 5d1ae48535..dcbfd89298 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java @@ -26,6 +26,7 @@ */
package org.apache.qpid.server.output;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.server.output.ProtocolOutputConverter.Factory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -42,19 +43,19 @@ public class ProtocolOutputConverterRegistry static
{
- register(ProtocolVersion.v8_0, org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory());
- register(ProtocolVersion.v0_9, org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory());
- register(ProtocolVersion.v0_91, org.apache.qpid.server.output.amqp0_9_1.ProtocolOutputConverterImpl.getInstanceFactory());
+ register(ProtocolVersion.v8_0);
+ register(ProtocolVersion.v0_9);
+ register(ProtocolVersion.v0_91);
}
private ProtocolOutputConverterRegistry()
{
}
- private static void register(ProtocolVersion version, Factory converter)
+ private static void register(ProtocolVersion version)
{
- _registry.put(version,converter);
+ _registry.put(version,new ConverterFactory(version));
}
@@ -62,4 +63,28 @@ public class ProtocolOutputConverterRegistry {
return _registry.get(session.getProtocolVersion()).newInstance(session);
}
+
+ private static class ConverterFactory implements Factory
+ {
+ private ProtocolVersion _protocolVersion;
+ private MethodRegistry _methodRegistry;
+ private int _classId;
+
+ public ConverterFactory(ProtocolVersion pv)
+ {
+ _protocolVersion = pv;
+
+ }
+
+ public synchronized ProtocolOutputConverter newInstance(AMQProtocolSession session)
+ {
+ if(_methodRegistry == null)
+ {
+
+ _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
+
+ }
+ return new ProtocolOutputConverterImpl(session, _methodRegistry);
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java deleted file mode 100644 index f90afa0be9..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ /dev/null @@ -1,431 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/* - * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. - * Supported AMQP versions: - * 8-0 - */ -package org.apache.qpid.server.output.amqp0_8; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.BasicGetOkBody; -import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.server.message.AMQMessage; -import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.server.message.MessageTransferMessage; -import org.apache.qpid.server.output.HeaderPropertiesConverter; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.transport.DeliveryProperties; - -import java.io.DataOutput; -import java.io.IOException; - -public class ProtocolOutputConverterImpl implements ProtocolOutputConverter -{ - - private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); - - public static Factory getInstanceFactory() - { - return new Factory() - { - - public ProtocolOutputConverter newInstance(AMQProtocolSession session) - { - return new ProtocolOutputConverterImpl(session); - } - }; - } - - - private final AMQProtocolSession _protocolSession; - - private ProtocolOutputConverterImpl(AMQProtocolSession session) - { - _protocolSession = session; - } - - - public AMQProtocolSession getProtocolSession() - { - return _protocolSession; - } - - public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag); - writeMessageDelivery(entry, channelId, deliverBody); - } - - - private ContentHeaderBody getContentHeaderBody(QueueEntry entry) - throws AMQException - { - if(entry.getMessage() instanceof AMQMessage) - { - return ((AMQMessage)entry.getMessage()).getContentHeaderBody(); - } - else - { - final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost()); - ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID); - chb.setBodySize(message.getSize()); - return chb; - } - } - - - private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody) - throws AMQException - { - writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody); - } - - private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) - throws AMQException - { - - - int bodySize = (int) message.getSize(); - - if(bodySize == 0) - { - SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody, - contentHeaderBody); - - writeFrame(compositeBlock); - } - else - { - int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead(); - - - int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; - - int writtenSize = capacity; - - AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity); - - CompositeAMQBodyBlock - compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); - writeFrame(compositeBlock); - - while(writtenSize < bodySize) - { - capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize; - MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity); - writtenSize += capacity; - - writeFrame(new AMQFrame(channelId, body)); - } - } - } - - private class MessageContentSourceBody implements AMQBody - { - public static final byte TYPE = 3; - private int _length; - private MessageContentSource _message; - private int _offset; - - public MessageContentSourceBody(MessageContentSource message, int offset, int length) - { - _message = message; - _offset = offset; - _length = length; - } - - public byte getFrameType() - { - return TYPE; - } - - public int getSize() - { - return _length; - } - - public void writePayload(DataOutput buffer) throws IOException - { - byte[] data = new byte[_length]; - - _message.getContent(java.nio.ByteBuffer.wrap(data), _offset); - - buffer.write(data); - } - - public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException - { - throw new UnsupportedOperationException(); - } - } - - private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) - { - - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - contentHeaderBody); - return contentHeader; - } - - - public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException - { - AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize); - writeMessageDelivery(entry, channelId, deliver); - } - - - private AMQBody createEncodedDeliverBody(QueueEntry entry, - final long deliveryTag, - final AMQShortString consumerTag) - throws AMQException - { - - final AMQShortString exchangeName; - final AMQShortString routingKey; - - if(entry.getMessage() instanceof AMQMessage) - { - final AMQMessage message = (AMQMessage) entry.getMessage(); - final MessagePublishInfo pb = message.getMessagePublishInfo(); - exchangeName = pb.getExchange(); - routingKey = pb.getRoutingKey(); - } - else - { - MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); - exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); - routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); - } - - final boolean isRedelivered = entry.isRedelivered(); - - final AMQBody returnBlock = new AMQBody() - { - - private AMQBody _underlyingBody; - - public AMQBody createAMQBody() - { - return METHOD_REGISTRY.createBasicDeliverBody(consumerTag, - deliveryTag, - isRedelivered, - exchangeName, - routingKey); - - - - - - } - - public byte getFrameType() - { - return AMQMethodBody.TYPE; - } - - public int getSize() - { - if(_underlyingBody == null) - { - _underlyingBody = createAMQBody(); - } - return _underlyingBody.getSize(); - } - - public void writePayload(DataOutput buffer) throws IOException - { - if(_underlyingBody == null) - { - _underlyingBody = createAMQBody(); - } - _underlyingBody.writePayload(buffer); - } - - public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) - throws AMQException - { - throw new AMQException("This block should never be dispatched!"); - } - }; - return returnBlock; - } - - private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize) - throws AMQException - { - final AMQShortString exchangeName; - final AMQShortString routingKey; - - if(entry.getMessage() instanceof AMQMessage) - { - final AMQMessage message = (AMQMessage) entry.getMessage(); - final MessagePublishInfo pb = message.getMessagePublishInfo(); - exchangeName = pb.getExchange(); - routingKey = pb.getRoutingKey(); - } - else - { - MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); - exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); - routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); - } - - final boolean isRedelivered = entry.isRedelivered(); - - BasicGetOkBody getOkBody = - METHOD_REGISTRY.createBasicGetOkBody(deliveryTag, - isRedelivered, - exchangeName, - routingKey, - queueSize); - - return getOkBody; - } - - public byte getProtocolMinorVersion() - { - return getProtocolSession().getProtocolMinorVersion(); - } - - public byte getProtocolMajorVersion() - { - return getProtocolSession().getProtocolMajorVersion(); - } - - private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, - int replyCode, - AMQShortString replyText) throws AMQException - { - - BasicReturnBody basicReturnBody = - METHOD_REGISTRY.createBasicReturnBody(replyCode, - replyText, - messagePublishInfo.getExchange(), - messagePublishInfo.getRoutingKey()); - - - return basicReturnBody; - } - - public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText) - throws AMQException - { - - AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText); - - writeMessageDelivery(message, header, channelId, returnFrame); - } - - - public void writeFrame(AMQDataBlock block) - { - getProtocolSession().writeFrame(block); - } - - - public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) - { - - BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag); - writeFrame(basicCancelOkBody.generateFrame(channelId)); - - } - - - public static final class CompositeAMQBodyBlock extends AMQDataBlock - { - public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead(); - - private final AMQBody _methodBody; - private final AMQBody _headerBody; - private final AMQBody _contentBody; - private final int _channel; - - - public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody) - { - _channel = channel; - _methodBody = methodBody; - _headerBody = headerBody; - _contentBody = contentBody; - - } - - public long getSize() - { - return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); - } - - public void writePayload(DataOutput buffer) throws IOException - { - AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); - } - } - - public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock - { - public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead(); - - private final AMQBody _methodBody; - private final AMQBody _headerBody; - private final int _channel; - - - public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody) - { - _channel = channel; - _methodBody = methodBody; - _headerBody = headerBody; - - } - - public long getSize() - { - return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; - } - - public void writePayload(DataOutput buffer) throws IOException - { - AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); - } - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java deleted file mode 100644 index 8bee368b80..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ /dev/null @@ -1,429 +0,0 @@ -package org.apache.qpid.server.output.amqp0_9; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.BasicGetOkBody; -import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.amqp_0_9.BasicGetBodyImpl; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.server.message.AMQMessage; -import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.server.message.MessageTransferMessage; -import org.apache.qpid.server.output.HeaderPropertiesConverter; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.transport.DeliveryProperties; - -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; - -public class ProtocolOutputConverterImpl implements ProtocolOutputConverter -{ - private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); - - - public static Factory getInstanceFactory() - { - return new Factory() - { - - public ProtocolOutputConverter newInstance(AMQProtocolSession session) - { - return new ProtocolOutputConverterImpl(session); - } - }; - } - - private final AMQProtocolSession _protocolSession; - - private ProtocolOutputConverterImpl(AMQProtocolSession session) - { - _protocolSession = session; - } - - - public AMQProtocolSession getProtocolSession() - { - return _protocolSession; - } - - public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag); - writeMessageDelivery(entry, channelId, deliverBody); - } - - - private ContentHeaderBody getContentHeaderBody(QueueEntry entry) - throws AMQException - { - if(entry.getMessage() instanceof AMQMessage) - { - return ((AMQMessage)entry.getMessage()).getContentHeaderBody(); - } - else - { - final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost()); - ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID); - chb.setBodySize(message.getSize()); - return chb; - } - } - - - private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody) - throws AMQException - { - writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody); - } - - private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) - throws AMQException - { - - - int bodySize = (int) message.getSize(); - - if(bodySize == 0) - { - SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody, - contentHeaderBody); - - writeFrame(compositeBlock); - } - else - { - int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead(); - - - int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; - - int writtenSize = capacity; - - AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity); - - - CompositeAMQBodyBlock - compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); - writeFrame(compositeBlock); - - while(writtenSize < bodySize) - { - capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize; - MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity); - writtenSize += capacity; - - writeFrame(new AMQFrame(channelId, body)); - } - } - } - - private class MessageContentSourceBody implements AMQBody - { - public static final byte TYPE = 3; - private int _length; - private MessageContentSource _message; - private int _offset; - - public MessageContentSourceBody(MessageContentSource message, int offset, int length) - { - _message = message; - _offset = offset; - _length = length; - } - - public byte getFrameType() - { - return TYPE; - } - - public int getSize() - { - return _length; - } - - public void writePayload(DataOutput buffer) throws IOException - { - byte[] data = new byte[_length]; - - _message.getContent(ByteBuffer.wrap(data), _offset); - - buffer.write(data); - } - - public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException - { - throw new UnsupportedOperationException(); - } - } - - - private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) - { - - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - contentHeaderBody); - return contentHeader; - } - - - public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException - { - AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize); - writeMessageDelivery(entry, channelId, deliver); - } - - - private AMQBody createEncodedDeliverBody(QueueEntry entry, - final long deliveryTag, - final AMQShortString consumerTag) - throws AMQException - { - - final AMQShortString exchangeName; - final AMQShortString routingKey; - - if(entry.getMessage() instanceof AMQMessage) - { - final AMQMessage message = (AMQMessage) entry.getMessage(); - final MessagePublishInfo pb = message.getMessagePublishInfo(); - exchangeName = pb.getExchange(); - routingKey = pb.getRoutingKey(); - } - else - { - MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); - exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); - routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); - } - - final boolean isRedelivered = entry.isRedelivered(); - - final AMQBody returnBlock = new AMQBody() - { - - private AMQBody _underlyingBody; - - public AMQBody createAMQBody() - { - return METHOD_REGISTRY.createBasicDeliverBody(consumerTag, - deliveryTag, - isRedelivered, - exchangeName, - routingKey); - - - - - - } - - public byte getFrameType() - { - return AMQMethodBody.TYPE; - } - - public int getSize() - { - if(_underlyingBody == null) - { - _underlyingBody = createAMQBody(); - } - return _underlyingBody.getSize(); - } - - public void writePayload(DataOutput buffer) throws IOException - { - if(_underlyingBody == null) - { - _underlyingBody = createAMQBody(); - } - _underlyingBody.writePayload(buffer); - } - - public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) - throws AMQException - { - throw new AMQException("This block should never be dispatched!"); - } - }; - return returnBlock; - } - - private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize) - throws AMQException - { - final AMQShortString exchangeName; - final AMQShortString routingKey; - - if(entry.getMessage() instanceof AMQMessage) - { - final AMQMessage message = (AMQMessage) entry.getMessage(); - final MessagePublishInfo pb = message.getMessagePublishInfo(); - exchangeName = pb.getExchange(); - routingKey = pb.getRoutingKey(); - } - else - { - MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); - exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); - routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); - } - - final boolean isRedelivered = entry.isRedelivered(); - - BasicGetOkBody getOkBody = - METHOD_REGISTRY.createBasicGetOkBody(deliveryTag, - isRedelivered, - exchangeName, - routingKey, - queueSize); - - return getOkBody; - } - - public byte getProtocolMinorVersion() - { - return getProtocolSession().getProtocolMinorVersion(); - } - - public byte getProtocolMajorVersion() - { - return getProtocolSession().getProtocolMajorVersion(); - } - - private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, - int replyCode, - AMQShortString replyText) throws AMQException - { - - BasicReturnBody basicReturnBody = - METHOD_REGISTRY.createBasicReturnBody(replyCode, - replyText, - messagePublishInfo.getExchange(), - messagePublishInfo.getRoutingKey()); - - - return basicReturnBody; - } - - public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText) - throws AMQException - { - - AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText); - - writeMessageDelivery(message, header, channelId, returnFrame); - } - - - public void writeFrame(AMQDataBlock block) - { - getProtocolSession().writeFrame(block); - } - - - public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) - { - - BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag); - writeFrame(basicCancelOkBody.generateFrame(channelId)); - - } - - - public static final class CompositeAMQBodyBlock extends AMQDataBlock - { - public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead(); - - private final AMQBody _methodBody; - private final AMQBody _headerBody; - private final AMQBody _contentBody; - private final int _channel; - - - public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody) - { - _channel = channel; - _methodBody = methodBody; - _headerBody = headerBody; - _contentBody = contentBody; - - } - - public long getSize() - { - return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); - } - - public void writePayload(DataOutput buffer) throws IOException - { - AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); - } - } - - public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock - { - public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead(); - - private final AMQBody _methodBody; - private final AMQBody _headerBody; - private final int _channel; - - - public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody) - { - _channel = channel; - _methodBody = methodBody; - _headerBody = headerBody; - - } - - public long getSize() - { - return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; - } - - public void writePayload(DataOutput buffer) throws IOException - { - AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); - } - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index b734a06d5f..bba8e83e14 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -30,6 +30,7 @@ import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.Set; @@ -49,23 +50,20 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, - String fqdn, Set<AmqpProtocolVersion> supported, NetworkConnection network, long id) { - this(appRegistry,fqdn,supported,id); + this(appRegistry, supported,id); setNetworkConnection(network); } public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, - String fqdn, Set<AmqpProtocolVersion> supported, long id) { _id = id; _appRegistry = appRegistry; - _fqdn = fqdn; _supported = supported; } @@ -177,6 +175,15 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) { _network = network; + SocketAddress address = _network.getLocalAddress(); + if (address instanceof InetSocketAddress) + { + _fqdn = ((InetSocketAddress) address).getHostName(); + } + else + { + throw new IllegalArgumentException("Unsupported socket address class: " + address); + } _sender = sender; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index 69dec79267..ce5095dd2b 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -34,24 +34,22 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory private static final AtomicLong ID_GENERATOR = new AtomicLong(0); private final IApplicationRegistry _appRegistry; - private final String _fqdn; private final Set<AmqpProtocolVersion> _supported; - public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions) + public MultiVersionProtocolEngineFactory(Set<AmqpProtocolVersion> supportedVersions) { _appRegistry = ApplicationRegistry.getInstance(); - _fqdn = fqdn; _supported = supportedVersions; } public ServerProtocolEngine newProtocolEngine(NetworkConnection network) { - return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement()); + return new MultiVersionProtocolEngine(_appRegistry, _supported, network, ID_GENERATOR.getAndIncrement()); } public ServerProtocolEngine newProtocolEngine() { - return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, ID_GENERATOR.getAndIncrement()); + return new MultiVersionProtocolEngine(_appRegistry, _supported, ID_GENERATOR.getAndIncrement()); } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java index 31b293216b..24a735c274 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java @@ -121,7 +121,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class); MultiVersionProtocolEngineFactory factory = - new MultiVersionProtocolEngineFactory("localhost", versions); + new MultiVersionProtocolEngineFactory(versions); //create a dummy to retrieve the 'current' ID number long previousId = factory.newProtocolEngine(new TestNetworkConnection()).getConnectionId(); |