summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-09-30 15:30:43 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-09-30 15:30:43 +0000
commit907a77ca345ae585038d502a059f9e0a42d1d50c (patch)
tree6a205097a89c60fe4daaba0c8ec1a5545220f800
parent65bd98e140d6a186721219cd2b94c6be82ef0dba (diff)
downloadqpid-python-907a77ca345ae585038d502a059f9e0a42d1d50c.tar.gz
Remove handlers
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6125-ProtocolRefactoring@1628473 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java53
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java32
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java4
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java36
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java43
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java76
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java66
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java75
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java219
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java224
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java118
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java58
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java72
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java66
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java131
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java76
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java55
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java68
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java75
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java72
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java63
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java116
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java139
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java164
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java89
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java238
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java196
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java108
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/OnCurrentThreadExecutor.java34
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java161
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java226
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java129
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java126
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java152
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java2049
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java80
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java79
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java71
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java85
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java85
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java61
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java10
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java56
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterRegistry.java90
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java35
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java6
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java30
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java7
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockMessagePublishInfo.java52
-rwxr-xr-xjava/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java2
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java61
-rw-r--r--java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java33
-rw-r--r--java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java44
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java14
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java50
55 files changed, 2004 insertions, 4556 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
index 262931f054..1d689090ae 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
@@ -30,7 +30,18 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.bind.tuple.TupleBase;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
import org.apache.log4j.Logger;
+
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQFrameDecodingException;
@@ -41,22 +52,11 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
-import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
-import com.sleepycat.bind.tuple.ByteBinding;
-import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.bind.tuple.TupleBase;
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.Transaction;
-
public class UpgradeFrom4To5 extends AbstractStoreUpgrade
{
private static final String OLD_DELIVERY_DB = "deliveryDb_v4";
@@ -662,34 +662,7 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade
final boolean mandatory = tupleInput.readBoolean();
final boolean immediate = tupleInput.readBoolean();
- return new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return exchange;
- }
-
- public void setExchange(AMQShortString exchange)
- {
-
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- };
+ return new MessagePublishInfo(exchange, immediate, mandatory, routingKey);
}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index 1c1c13ce07..745fb6186d 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -234,36 +234,8 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
private MessagePublishInfo createPublishInfoBody_0_8()
{
- return new MessagePublishInfo()
- {
- public AMQShortString getExchange()
- {
- return new AMQShortString("exchange12345");
- }
-
- @Override
- public void setExchange(AMQShortString exchange)
- {
- }
-
- @Override
- public boolean isImmediate()
- {
- return false;
- }
-
- @Override
- public boolean isMandatory()
- {
- return true;
- }
-
- @Override
- public AMQShortString getRoutingKey()
- {
- return new AMQShortString("routingKey12345");
- }
- };
+ return new MessagePublishInfo(new AMQShortString("exchange12345"), false, true,
+ new AMQShortString("routingKey12345"));
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index cc6b76957a..188c8b6879 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -74,7 +74,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry;
+import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterImpl;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
@@ -1229,7 +1229,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
_protocolVersion = pv;
_methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
- _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
+ _protocolOutputConverter = new ProtocolOutputConverterImpl(this);
_dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(this);
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index 7fca10b388..5469b9760d 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -135,38 +135,10 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
private MessageMetaData convertMetaData(final InternalMessage serverMsg, final String bodyMimeType, final int size)
{
- MessagePublishInfo publishInfo = new MessagePublishInfo()
- {
- @Override
- public AMQShortString getExchange()
- {
- return AMQShortString.EMPTY_STRING;
- }
-
- @Override
- public void setExchange(final AMQShortString amqShortString)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isImmediate()
- {
- return false;
- }
-
- @Override
- public boolean isMandatory()
- {
- return false;
- }
-
- @Override
- public AMQShortString getRoutingKey()
- {
- return AMQShortString.valueOf(serverMsg.getInitialRoutingAddress());
- }
- };
+ MessagePublishInfo publishInfo = new MessagePublishInfo(AMQShortString.EMPTY_STRING,
+ false,
+ false,
+ AMQShortString.valueOf(serverMsg.getInitialRoutingAddress()));
final BasicContentHeaderProperties props = new BasicContentHeaderProperties();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 7cc00e713e..6b2902d0fa 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -20,7 +20,12 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.Set;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.AMQProtocolVersionException;
@@ -37,12 +42,6 @@ import org.apache.qpid.server.util.ByteBufferOutputStream;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.util.ByteBufferInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Set;
-
/**
* Encapsulates a publish body and a content header. In the context of the message store these are treated as a
* single unit.
@@ -170,33 +169,11 @@ public class MessageMetaData implements StorableMessageMetaData
long arrivalTime = EncodingUtils.readLong(dais);
MessagePublishInfo publishBody =
- new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return exchange;
- }
-
- public void setExchange(AMQShortString exchange)
- {
- }
-
- public boolean isImmediate()
- {
- return (flags & IMMEDIATE_FLAG) != 0;
- }
-
- public boolean isMandatory()
- {
- return (flags & MANDATORY_FLAG) != 0;
- }
-
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- };
+ new MessagePublishInfo(exchange,
+ (flags & IMMEDIATE_FLAG) != 0,
+ (flags & MANDATORY_FLAG) != 0,
+ routingKey);
+
return new MessageMetaData(publishBody, chb, arrivalTime);
}
catch (IOException e)
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java
deleted file mode 100644
index df66120731..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AccessRequestBody;
-import org.apache.qpid.framing.AccessRequestOkBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-/**
- * @author Apache Software Foundation
- *
- *
- */
-public class AccessRequestHandler implements StateAwareMethodListener<AccessRequestBody>
-{
- private static final AccessRequestHandler _instance = new AccessRequestHandler();
-
-
- public static AccessRequestHandler getInstance()
- {
- return _instance;
- }
-
- private AccessRequestHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- AccessRequestBody body,
- int channelId) throws AMQException
- {
- final AMQChannel channel = connection.getChannel(channelId);
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
-
- MethodRegistry methodRegistry = connection.getMethodRegistry();
-
- if(ProtocolVersion.v0_91.equals(connection.getProtocolVersion()) )
- {
- throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9");
- }
-
- // We don't implement access control class, but to keep clients happy that expect it
- // always use the "0" ticket.
- AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
- channel.sync();
- connection.writeFrame(response.generateFrame(channelId));
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java
deleted file mode 100644
index efc91800a1..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckBody>
-{
- private static final Logger _log = Logger.getLogger(BasicAckMethodHandler.class);
-
- private static final BasicAckMethodHandler _instance = new BasicAckMethodHandler();
-
- public static BasicAckMethodHandler getInstance()
- {
- return _instance;
- }
-
- private BasicAckMethodHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- BasicAckBody body,
- int channelId) throws AMQException
- {
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Ack(Tag:" + body.getDeliveryTag() + ":Mult:" + body.getMultiple() + ") received on channel " + channelId);
- }
-
- final AMQChannel channel = connection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
-
- // this method throws an AMQException if the delivery tag is not known
- channel.acknowledgeMessage(body.getDeliveryTag(), body.getMultiple());
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java
deleted file mode 100644
index 16498b3e88..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody>
-{
- private static final Logger _log = Logger.getLogger(BasicCancelMethodHandler.class);
-
- private static final BasicCancelMethodHandler _instance = new BasicCancelMethodHandler();
-
- public static BasicCancelMethodHandler getInstance()
- {
- return _instance;
- }
-
- private BasicCancelMethodHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- BasicCancelBody body,
- int channelId) throws AMQException
- {
- final AMQChannel channel = connection.getChannel(channelId);
-
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
-
- if (_log.isDebugEnabled())
- {
- _log.debug("BasicCancel: for:" + body.getConsumerTag() +
- " nowait:" + body.getNowait());
- }
-
- channel.unsubscribeConsumer(body.getConsumerTag());
- if (!body.getNowait())
- {
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag());
- channel.sync();
- connection.writeFrame(cancelOkBody.generateFrame(channelId));
- }
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
deleted file mode 100644
index b4219fe29c..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import java.security.AccessControlException;
-import java.util.Collection;
-import java.util.HashSet;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.filter.AMQInvalidArgumentException;
-import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody>
-{
- private static final Logger _logger = Logger.getLogger(BasicConsumeMethodHandler.class);
-
- private static final BasicConsumeMethodHandler _instance = new BasicConsumeMethodHandler();
-
- public static BasicConsumeMethodHandler getInstance()
- {
- return _instance;
- }
-
- private BasicConsumeMethodHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- BasicConsumeBody body,
- int channelId) throws AMQException
- {
- AMQChannel channel = connection.getChannel(channelId);
- VirtualHostImpl<?,?,?> vHost = connection.getVirtualHost();
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
- else
- {
- channel.sync();
- String queueName = body.getQueue() == null ? null : body.getQueue().asString();
- if (_logger.isDebugEnabled())
- {
- _logger.debug("BasicConsume: from '" + queueName +
- "' for:" + body.getConsumerTag() +
- " nowait:" + body.getNowait() +
- " args:" + body.getArguments());
- }
-
- MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName);
- final Collection<MessageSource> sources = new HashSet<>();
- if(queue != null)
- {
- sources.add(queue);
- }
- else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
- && body.getArguments() != null
- && body.getArguments().get("x-multiqueue") instanceof Collection)
- {
- for(Object object : (Collection<Object>)body.getArguments().get("x-multiqueue"))
- {
- String sourceName = String.valueOf(object);
- sourceName = sourceName.trim();
- if(sourceName.length() != 0)
- {
- MessageSource source = vHost.getMessageSource(sourceName);
- if(source == null)
- {
- sources.clear();
- break;
- }
- else
- {
- sources.add(source);
- }
- }
- }
- queueName = body.getArguments().get("x-multiqueue").toString();
- }
-
- if (sources.isEmpty())
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("No queue for '" + queueName + "'");
- }
- if (queueName != null)
- {
- String msg = "No such queue, '" + queueName + "'";
- throw body.getChannelException(AMQConstant.NOT_FOUND, msg, connection.getMethodRegistry());
- }
- else
- {
- String msg = "No queue name provided, no default queue defined.";
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg, connection.getMethodRegistry());
- }
- }
- else
- {
- final AMQShortString consumerTagName;
-
- if (body.getConsumerTag() != null)
- {
- consumerTagName = body.getConsumerTag().intern(false);
- }
- else
- {
- consumerTagName = null;
- }
-
- try
- {
- if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
- {
-
- AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
- sources,
- !body.getNoAck(),
- body.getArguments(),
- body.getExclusive(),
- body.getNoLocal());
- if (!body.getNowait())
- {
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
- connection.writeFrame(responseBody.generateFrame(channelId));
-
- }
- }
- else
- {
- AMQShortString msg = AMQShortString.validValueOf("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
-
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- msg, // replytext
- body.getClazz(),
- body.getMethod());
- connection.writeFrame(responseBody.generateFrame(0));
- }
-
- }
- catch (AMQInvalidArgumentException ise)
- {
- _logger.debug("Closing connection due to invalid selector");
-
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(),
- AMQShortString.validValueOf(ise.getMessage()),
- body.getClazz(),
- body.getMethod());
- connection.writeFrame(responseBody.generateFrame(channelId));
-
-
- }
- catch (AMQQueue.ExistingExclusiveConsumer e)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
- "Cannot subscribe to queue "
- + queue.getName()
- + " as it already has an existing exclusive consumer", connection.getMethodRegistry());
- }
- catch (AMQQueue.ExistingConsumerPreventsExclusive e)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
- "Cannot subscribe to queue "
- + queue.getName()
- + " exclusively as it already has a consumer", connection.getMethodRegistry());
- }
- catch (AccessControlException e)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
- "Cannot subscribe to queue "
- + queue.getName()
- + " permission denied", connection.getMethodRegistry());
- }
- catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
- "Cannot subscribe to queue "
- + queue.getName()
- + " as it already has an incompatible exclusivity policy", connection.getMethodRegistry());
- }
-
- }
- }
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
deleted file mode 100644
index d650292546..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import java.security.AccessControlException;
-import java.util.EnumSet;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicGetBody;
-import org.apache.qpid.framing.BasicGetEmptyBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.flow.MessageOnlyCreditManager;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
-import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
-import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
-{
- private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
-
- private static final BasicGetMethodHandler _instance = new BasicGetMethodHandler();
-
- public static BasicGetMethodHandler getInstance()
- {
- return _instance;
- }
-
- private BasicGetMethodHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- BasicGetBody body,
- int channelId) throws AMQException
- {
-
- VirtualHostImpl vHost = connection.getVirtualHost();
-
- AMQChannel channel = connection.getChannel(channelId);
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
- else
- {
- channel.sync();
- AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().toString());
- if (queue == null)
- {
- _log.info("No queue for '" + body.getQueue() + "'");
- if(body.getQueue()!=null)
- {
- throw body.getConnectionException(AMQConstant.NOT_FOUND,
- "No such queue, '" + body.getQueue()+ "'", connection.getMethodRegistry());
- }
- else
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "No queue name provided, no default queue defined.", connection.getMethodRegistry());
- }
- }
- else
- {
-
- try
- {
- if (!performGet(queue,connection, channel, !body.getNoAck()))
- {
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- // TODO - set clusterId
- BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
-
-
- connection.writeFrame(responseBody.generateFrame(channelId));
- }
- }
- catch (AccessControlException e)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
- e.getMessage(), connection.getMethodRegistry());
- }
- catch (MessageSource.ExistingExclusiveConsumer e)
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue has an exclusive consumer", connection.getMethodRegistry());
- }
- catch (MessageSource.ExistingConsumerPreventsExclusive e)
- {
- throw body.getConnectionException(AMQConstant.INTERNAL_ERROR,
- "The GET request has been evaluated as an exclusive consumer, " +
- "this is likely due to a programming error in the Qpid broker",
- connection.getMethodRegistry());
- }
- catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue has an incompatible exclusivit policy", connection.getMethodRegistry());
- }
- }
- }
- }
-
- public static boolean performGet(final AMQQueue queue,
- final AMQProtocolSession session,
- final AMQChannel channel,
- final boolean acks)
- throws AMQException, MessageSource.ExistingConsumerPreventsExclusive,
- MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused
- {
-
- final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
-
- final GetDeliveryMethod getDeliveryMethod =
- new GetDeliveryMethod(singleMessageCredit, session, channel, queue);
- final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
- {
-
- public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag)
- {
- channel.addUnacknowledgedMessage(entry, deliveryTag, null);
- }
- };
-
- ConsumerTarget_0_8 target;
- EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES);
- if(acks)
- {
-
- target = ConsumerTarget_0_8.createAckTarget(channel,
- AMQShortString.EMPTY_STRING, null,
- singleMessageCredit, getDeliveryMethod, getRecordMethod);
- }
- else
- {
- target = ConsumerTarget_0_8.createGetNoAckTarget(channel,
- AMQShortString.EMPTY_STRING, null,
- singleMessageCredit, getDeliveryMethod, getRecordMethod);
- }
-
- ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
- sub.flush();
- sub.close();
- return(getDeliveryMethod.hasDeliveredMessage());
-
-
- }
-
-
- private static class GetDeliveryMethod implements ClientDeliveryMethod
- {
-
- private final FlowCreditManager _singleMessageCredit;
- private final AMQProtocolSession _session;
- private final AMQChannel _channel;
- private final AMQQueue _queue;
- private boolean _deliveredMessage;
-
- public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
- final AMQProtocolSession session,
- final AMQChannel channel, final AMQQueue queue)
- {
- _singleMessageCredit = singleMessageCredit;
- _session = session;
- _channel = channel;
- _queue = queue;
- }
-
- @Override
- public long deliverToClient(final ConsumerImpl sub, final ServerMessage message,
- final InstanceProperties props, final long deliveryTag)
- {
- _singleMessageCredit.useCreditForMessage(message.getSize());
- long size =_session.getProtocolOutputConverter().writeGetOk(message,
- props,
- _channel.getChannelId(),
- deliveryTag,
- _queue.getQueueDepthMessages());
-
- _deliveredMessage = true;
- return size;
- }
-
- public boolean hasDeliveredMessage()
- {
- return _deliveredMessage;
- }
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
deleted file mode 100644
index 41f727c7d4..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import java.security.AccessControlException;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.MessagePublishInfo;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody>
-{
- private static final Logger _logger = Logger.getLogger(BasicPublishMethodHandler.class);
-
- private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler();
-
-
- public static BasicPublishMethodHandler getInstance()
- {
- return _instance;
- }
-
- private BasicPublishMethodHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- BasicPublishBody body,
- int channelId) throws AMQException
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Publish received on channel " + channelId);
- }
-
- AMQShortString exchangeName = body.getExchange();
- VirtualHostImpl vHost = connection.getVirtualHost();
-
- // TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
-
- MessageDestination destination;
-
- if (exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName))
- {
- destination = vHost.getDefaultDestination();
- }
- else
- {
- destination = vHost.getMessageDestination(exchangeName.toString());
- }
-
- // if the exchange does not exist we raise a channel exception
- if (destination == null)
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name",
- connection.getMethodRegistry());
- }
- else
- {
- // The partially populated BasicDeliver frame plus the received route body
- // is stored in the channel. Once the final body frame has been received
- // it is routed to the exchange.
- AMQChannel channel = connection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
-
- MessagePublishInfo info = new MessagePublishInfo(body.getExchange(),
- body.getImmediate(),
- body.getMandatory(),
- body.getRoutingKey());
- info.setExchange(exchangeName);
- try
- {
- channel.setPublishFrame(info, destination);
- }
- catch (AccessControlException e)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
- e.getMessage(),
- connection.getMethodRegistry());
- }
- }
- }
-
-}
-
-
-
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java
deleted file mode 100644
index 9464be4c6e..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
-{
- private static final BasicQosHandler _instance = new BasicQosHandler();
-
- public static BasicQosHandler getInstance()
- {
- return _instance;
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- BasicQosBody body,
- int channelId) throws AMQException
- {
- AMQChannel channel = connection.getChannel(channelId);
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
- channel.sync();
- channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount());
-
-
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody();
- connection.writeFrame(responseBody.generateFrame(channelId));
-
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
deleted file mode 100644
index 29ddf4421a..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class BasicRecoverMethodHandler implements StateAwareMethodListener<BasicRecoverBody>
-{
- private static final Logger _logger = Logger.getLogger(BasicRecoverMethodHandler.class);
-
- private static final BasicRecoverMethodHandler _instance = new BasicRecoverMethodHandler();
-
- public static BasicRecoverMethodHandler getInstance()
- {
- return _instance;
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- BasicRecoverBody body,
- int channelId) throws AMQException
- {
- _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId);
- AMQChannel channel = connection.getChannel(channelId);
-
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
-
- channel.resend();
-
- // Qpid 0-8 hacks a synchronous -ok onto recover.
- // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
- if(connection.getProtocolVersion().equals(ProtocolVersion.v8_0))
- {
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
- channel.sync();
- connection.writeFrame(recoverOk.generateFrame(channelId));
-
- }
-
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
deleted file mode 100644
index b75492a65d..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicRecoverSyncBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<BasicRecoverSyncBody>
-{
- private static final Logger _logger = Logger.getLogger(BasicRecoverSyncMethodHandler.class);
-
- private static final BasicRecoverSyncMethodHandler _instance = new BasicRecoverSyncMethodHandler();
-
- public static BasicRecoverSyncMethodHandler getInstance()
- {
- return _instance;
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- BasicRecoverSyncBody body,
- int channelId) throws AMQException
- {
-
- _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId);
- AMQChannel channel = connection.getChannel(channelId);
-
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
- channel.sync();
- channel.resend();
-
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
- connection.writeFrame(recoverOk.generateFrame(channelId));
-
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
deleted file mode 100644
index 1f299893f9..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicRejectBody>
-{
- private static final Logger _logger = Logger.getLogger(BasicRejectMethodHandler.class);
-
- private static BasicRejectMethodHandler _instance = new BasicRejectMethodHandler();
-
- public static BasicRejectMethodHandler getInstance()
- {
- return _instance;
- }
-
- private BasicRejectMethodHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- BasicRejectBody body,
- int channelId) throws AMQException
- {
-
- AMQChannel channel = connection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Rejecting:" + body.getDeliveryTag() +
- ": Requeue:" + body.getRequeue() +
- " on channel:" + channel.debugIdentity());
- }
-
- long deliveryTag = body.getDeliveryTag();
-
- MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
-
- if (message == null)
- {
- _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
- }
- else
- {
-
- if (message.getMessage() == null)
- {
- _logger.warn("Message has already been purged, unable to Reject.");
- return;
- }
-
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
- ": Requeue:" + body.getRequeue() +
- " on channel:" + channel.debugIdentity());
- }
-
- if (body.getRequeue())
- {
- //this requeue represents a message rejected from the pre-dispatch queue
- //therefore we need to amend the delivery counter.
- message.decrementDeliveryCount();
-
- channel.requeue(deliveryTag);
- }
- else
- {
- // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here
- // as it would prevent redelivery
- // message.reject();
-
- final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
- _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
- if (maxDeliveryCountEnabled)
- {
- final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
- _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
- if (deliveredTooManyTimes)
- {
- channel.deadLetter(body.getDeliveryTag());
- }
- else
- {
- //this requeue represents a message rejected because of a recover/rollback that we
- //are not ready to DLQ. We rely on the reject command to resend from the unacked map
- //and therefore need to increment the delivery counter so we cancel out the effect
- //of the AMQChannel#resend() decrement.
- message.incrementDeliveryCount();
- }
- }
- else
- {
- channel.requeue(deliveryTag);
- }
- }
- }
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java
deleted file mode 100644
index a9e52c5240..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody>
-{
- private static final Logger _logger = Logger.getLogger(ChannelCloseHandler.class);
-
- private static ChannelCloseHandler _instance = new ChannelCloseHandler();
-
- public static ChannelCloseHandler getInstance()
- {
- return _instance;
- }
-
- private ChannelCloseHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- ChannelCloseBody body,
- int channelId) throws AMQException
- {
-
- if (_logger.isInfoEnabled())
- {
- _logger.info("Received channel close for id " + channelId + " citing class " + body.getClassId() +
- " and method " + body.getMethodId());
- }
-
-
- AMQChannel channel = connection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel", connection.getMethodRegistry());
- }
- channel.sync();
- connection.closeChannel(channelId);
- // Client requested closure so we don't wait for ok we send it
- connection.closeChannelOk(channelId);
-
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody();
- connection.writeFrame(responseBody.generateFrame(channelId));
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java
deleted file mode 100644
index fe9d20e151..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCloseOkBody>
-{
- private static final Logger _logger = Logger.getLogger(ChannelCloseOkHandler.class);
-
- private static ChannelCloseOkHandler _instance = new ChannelCloseOkHandler();
-
- public static ChannelCloseOkHandler getInstance()
- {
- return _instance;
- }
-
- private ChannelCloseOkHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- ChannelCloseOkBody body,
- int channelId) throws AMQException
- {
-
- _logger.info("Received channel-close-ok for channel-id " + channelId);
-
- // Let the Protocol Session know the channel is now closed.
- connection.closeChannelOk(channelId);
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java
deleted file mode 100644
index 99c0e3b2de..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowBody>
-{
- private static final Logger _logger = Logger.getLogger(ChannelFlowHandler.class);
-
- private static ChannelFlowHandler _instance = new ChannelFlowHandler();
-
- public static ChannelFlowHandler getInstance()
- {
- return _instance;
- }
-
- private ChannelFlowHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- ChannelFlowBody body,
- int channelId) throws AMQException
- {
-
-
- AMQChannel channel = connection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
- channel.sync();
- channel.setSuspended(!body.getActive());
- _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
-
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive());
- connection.writeFrame(responseBody.generateFrame(channelId));
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java
deleted file mode 100644
index cb1a59ba2a..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenBody>
-{
- private static final Logger _logger = Logger.getLogger(ChannelOpenHandler.class);
-
- private static ChannelOpenHandler _instance = new ChannelOpenHandler();
-
- public static ChannelOpenHandler getInstance()
- {
- return _instance;
- }
-
- private ChannelOpenHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- ChannelOpenBody body,
- int channelId) throws AMQException
- {
- VirtualHostImpl virtualHost = connection.getVirtualHost();
-
- // Protect the broker against out of order frame request.
- if (virtualHost == null)
- {
- throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null);
- }
- _logger.info("Connecting to: " + virtualHost.getName());
-
- final AMQChannel channel = new AMQChannel(connection,channelId, virtualHost.getMessageStore());
-
- connection.addChannel(channel);
-
- ChannelOpenOkBody response;
-
-
- response = connection.getMethodRegistry().createChannelOpenOkBody();
-
-
-
- connection.writeFrame(response.generateFrame(channelId));
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java
deleted file mode 100644
index c4a8eb4acb..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
-{
- private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class);
-
- private static ConnectionCloseMethodHandler _instance = new ConnectionCloseMethodHandler();
-
- public static ConnectionCloseMethodHandler getInstance()
- {
- return _instance;
- }
-
- private ConnectionCloseMethodHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- ConnectionCloseBody body,
- int channelId) throws AMQException
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
- body.getReplyText() + " for " + connection);
- }
- try
- {
- connection.closeSession();
- }
- catch (Exception e)
- {
- _logger.error("Error closing protocol session: " + e, e);
- }
-
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
- connection.writeFrame(responseBody.generateFrame(channelId));
-
- connection.closeProtocolSession();
-
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java
deleted file mode 100644
index 03c43cc80a..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener<ConnectionCloseOkBody>
-{
- private static final Logger _logger = Logger.getLogger(ConnectionCloseOkMethodHandler.class);
-
- private static ConnectionCloseOkMethodHandler _instance = new ConnectionCloseOkMethodHandler();
-
- public static ConnectionCloseOkMethodHandler getInstance()
- {
- return _instance;
- }
-
- private ConnectionCloseOkMethodHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- ConnectionCloseOkBody body,
- int channelId) throws AMQException
- {
- //todo should this not do more than just log the method?
- _logger.info("Received Connection-close-ok");
-
- try
- {
- connection.changeState(AMQState.CONNECTION_CLOSED);
- connection.closeSession();
- }
- catch (Exception e)
- {
- _logger.error("Error closing protocol session: " + e, e);
- }
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
deleted file mode 100644
index 20c5e90f5d..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import java.security.AccessControlException;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody>
-{
- private static final Logger _logger = Logger.getLogger(ConnectionOpenMethodHandler.class);
-
- private static ConnectionOpenMethodHandler _instance = new ConnectionOpenMethodHandler();
-
- public static ConnectionOpenMethodHandler getInstance()
- {
- return _instance;
- }
-
- private ConnectionOpenMethodHandler()
- {
- }
-
- private static AMQShortString generateClientID()
- {
- return new AMQShortString(Long.toString(System.currentTimeMillis()));
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- ConnectionOpenBody body,
- int channelId) throws AMQException
- {
-
- //ignore leading '/'
- String virtualHostName;
- if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/')
- {
- virtualHostName = new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString();
- }
- else
- {
- virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost());
- }
-
- VirtualHostImpl virtualHost = ((AmqpPort)connection.getPort()).getVirtualHost(virtualHostName);
-
- if (virtualHost == null)
- {
- throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'",
- connection.getMethodRegistry());
- }
- else
- {
- // Check virtualhost access
- if (virtualHost.getState() != State.ACTIVE)
- {
- throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active",
- connection.getMethodRegistry());
- }
-
- connection.setVirtualHost(virtualHost);
- try
- {
- virtualHost.getSecurityManager().authoriseCreateConnection(connection);
- }
- catch (AccessControlException e)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
- }
-
- // See Spec (0.8.2). Section 3.1.2 Virtual Hosts
- if (connection.getContextKey() == null)
- {
- connection.setContextKey(generateClientID());
- }
-
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
-
- connection.changeState(AMQState.CONNECTION_OPEN);
-
- connection.writeFrame(responseBody.generateFrame(channelId));
- }
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
deleted file mode 100644
index 001719759a..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
-
-public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener<ConnectionSecureOkBody>
-{
- private static final Logger _logger = Logger.getLogger(ConnectionSecureOkMethodHandler.class);
-
- private static ConnectionSecureOkMethodHandler _instance = new ConnectionSecureOkMethodHandler();
-
- public static ConnectionSecureOkMethodHandler getInstance()
- {
- return _instance;
- }
-
- private ConnectionSecureOkMethodHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- ConnectionSecureOkBody body,
- int channelId) throws AMQException
- {
- Broker<?> broker = connection.getBroker();
-
- SubjectCreator subjectCreator = connection.getSubjectCreator();
-
- SaslServer ss = connection.getSaslServer();
- if (ss == null)
- {
- throw new AMQException("No SASL context set up in session");
- }
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
- switch (authResult.getStatus())
- {
- case ERROR:
- Exception cause = authResult.getCause();
-
- _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
-
- // This should be abstracted
- connection.changeState(AMQState.CONNECTION_CLOSING);
-
- ConnectionCloseBody connectionCloseBody =
- methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
- AMQConstant.NOT_ALLOWED.getName(),
- body.getClazz(),
- body.getMethod());
-
- connection.writeFrame(connectionCloseBody.generateFrame(0));
- disposeSaslServer(connection);
- break;
- case SUCCESS:
- if (_logger.isInfoEnabled())
- {
- _logger.info("Connected as: " + authResult.getSubject());
- }
- connection.changeState(AMQState.CONNECTION_NOT_TUNED);
-
- int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
-
- if(frameMax <= 0)
- {
- frameMax = Integer.MAX_VALUE;
- }
-
- ConnectionTuneBody tuneBody =
- methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
- frameMax,
- broker.getConnection_heartBeatDelay());
- connection.writeFrame(tuneBody.generateFrame(0));
- connection.setAuthorizedSubject(authResult.getSubject());
- disposeSaslServer(connection);
- break;
- case CONTINUE:
- connection.changeState(AMQState.CONNECTION_NOT_AUTH);
-
- ConnectionSecureBody
- secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
- connection.writeFrame(secureBody.generateFrame(0));
- }
- }
-
- private void disposeSaslServer(AMQProtocolSession ps)
- {
- SaslServer ss = ps.getSaslServer();
- if (ss != null)
- {
- ps.setSaslServer(null);
- try
- {
- ss.dispose();
- }
- catch (SaslException e)
- {
- _logger.error("Error disposing of Sasl server: " + e);
- }
- }
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
deleted file mode 100644
index 328c03bf74..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
-
-
-public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<ConnectionStartOkBody>
-{
- private static final Logger _logger = Logger.getLogger(ConnectionStartOkMethodHandler.class);
-
- private static ConnectionStartOkMethodHandler _instance = new ConnectionStartOkMethodHandler();
-
- public static ConnectionStartOkMethodHandler getInstance()
- {
- return _instance;
- }
-
- private ConnectionStartOkMethodHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- ConnectionStartOkBody body,
- int channelId) throws AMQException
- {
- Broker<?> broker = connection.getBroker();
-
- _logger.info("SASL Mechanism selected: " + body.getMechanism());
- _logger.info("Locale selected: " + body.getLocale());
-
- SubjectCreator subjectCreator = connection.getSubjectCreator();
- SaslServer ss = null;
- try
- {
- ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()),
- connection.getLocalFQDN(),
- connection.getPeerPrincipal());
-
- if (ss == null)
- {
- throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism(),
- connection.getMethodRegistry());
- }
-
- connection.setSaslServer(ss);
-
- final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
- //save clientProperties
- connection.setClientProperties(body.getClientProperties());
-
- MethodRegistry methodRegistry = connection.getMethodRegistry();
-
- switch (authResult.getStatus())
- {
- case ERROR:
- Exception cause = authResult.getCause();
-
- _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
-
- connection.changeState(AMQState.CONNECTION_CLOSING);
-
- ConnectionCloseBody closeBody =
- methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- AMQConstant.NOT_ALLOWED.getName(),
- body.getClazz(),
- body.getMethod());
-
- connection.writeFrame(closeBody.generateFrame(0));
- disposeSaslServer(connection);
- break;
-
- case SUCCESS:
- if (_logger.isInfoEnabled())
- {
- _logger.info("Connected as: " + authResult.getSubject());
- }
- connection.setAuthorizedSubject(authResult.getSubject());
-
- connection.changeState(AMQState.CONNECTION_NOT_TUNED);
- int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
-
- if(frameMax <= 0)
- {
- frameMax = Integer.MAX_VALUE;
- }
-
- ConnectionTuneBody
- tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
- frameMax,
- broker.getConnection_heartBeatDelay());
- connection.writeFrame(tuneBody.generateFrame(0));
- break;
- case CONTINUE:
- connection.changeState(AMQState.CONNECTION_NOT_AUTH);
-
- ConnectionSecureBody
- secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
- connection.writeFrame(secureBody.generateFrame(0));
- }
- }
- catch (SaslException e)
- {
- disposeSaslServer(connection);
- throw new AMQException("SASL error: " + e, e);
- }
- }
-
- private void disposeSaslServer(AMQProtocolSession ps)
- {
- SaslServer ss = ps.getSaslServer();
- if (ss != null)
- {
- ps.setSaslServer(null);
- try
- {
- ss.dispose();
- }
- catch (SaslException e)
- {
- _logger.error("Error disposing of Sasl server: " + e);
- }
- }
- }
-
-}
-
-
-
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java
deleted file mode 100644
index d5f066063d..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQConnectionException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<ConnectionTuneOkBody>
-{
- private static final Logger _logger = Logger.getLogger(ConnectionTuneOkMethodHandler.class);
-
- private static ConnectionTuneOkMethodHandler _instance = new ConnectionTuneOkMethodHandler();
-
- public static ConnectionTuneOkMethodHandler getInstance()
- {
- return _instance;
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- ConnectionTuneOkBody body,
- int channelId) throws AMQException
- {
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug(body);
- }
- connection.changeState(AMQState.CONNECTION_NOT_OPENED);
-
- connection.initHeartbeats(body.getHeartbeat());
-
- int brokerFrameMax = connection.getBroker().getContextValue(Integer.class,Broker.BROKER_FRAME_SIZE);
- if(brokerFrameMax <= 0)
- {
- brokerFrameMax = Integer.MAX_VALUE;
- }
-
- if(body.getFrameMax() > (long) brokerFrameMax)
- {
- throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
- "Attempt to set max frame size to " + body.getFrameMax()
- + "greater than the broker will allow: "
- + brokerFrameMax,
- body.getClazz(), body.getMethod(),
- connection.getMethodRegistry(),null);
- }
- else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode())
- {
- throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
- "Attempt to set max frame size to " + body.getFrameMax()
- + "which is smaller than the specification definined minimum: "
- + AMQConstant.FRAME_MIN_SIZE.getCode(),
- body.getClazz(), body.getMethod(),
- connection.getMethodRegistry(),null);
- }
- int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax();
- connection.setMaxFrameSize(frameMax);
-
- long maxChannelNumber = body.getChannelMax();
- //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
- connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
deleted file mode 100644
index 22e377c219..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-/**
- * @author Apache Software Foundation
- *
- *
- */
-public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody>
-{
- private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler();
-
- public static final int OK = 0;
-
- public static final int EXCHANGE_NOT_FOUND = 1;
-
- public static final int QUEUE_NOT_FOUND = 2;
-
- public static final int NO_BINDINGS = 3;
-
- public static final int QUEUE_NOT_BOUND = 4;
-
- public static final int NO_QUEUE_BOUND_WITH_RK = 5;
-
- public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6;
-
- public static ExchangeBoundHandler getInstance()
- {
- return _instance;
- }
-
- private ExchangeBoundHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- ExchangeBoundBody body,
- int channelId) throws AMQException
- {
- VirtualHostImpl virtualHost = connection.getVirtualHost();
- MethodRegistry methodRegistry = connection.getMethodRegistry();
-
- final AMQChannel channel = connection.getChannel(channelId);
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
- channel.sync();
-
-
- AMQShortString exchangeName = body.getExchange();
- AMQShortString queueName = body.getQueue();
- AMQShortString routingKey = body.getRoutingKey();
- ExchangeBoundOkBody response;
-
- if(isDefaultExchange(exchangeName))
- {
- if(routingKey == null)
- {
- if(queueName == null)
- {
- response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueues().isEmpty() ? NO_BINDINGS : OK, null);
- }
- else
- {
- AMQQueue queue = virtualHost.getQueue(queueName.toString());
- if (queue == null)
- {
-
- response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
- AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
- }
- else
- {
- response = methodRegistry.createExchangeBoundOkBody(OK, null);
- }
- }
- }
- else
- {
- if(queueName == null)
- {
- response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueue(routingKey.toString()) == null ? NO_QUEUE_BOUND_WITH_RK : OK, null);
- }
- else
- {
- AMQQueue queue = virtualHost.getQueue(queueName.toString());
- if (queue == null)
- {
-
- response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
- AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
- }
- else
- {
- response = methodRegistry.createExchangeBoundOkBody(queueName.equals(routingKey) ? OK : SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, null);
- }
- }
- }
- }
- else
- {
- ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
- if (exchange == null)
- {
-
-
- response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
- AMQShortString.validValueOf("Exchange '" + exchangeName + "' not found"));
- }
- else if (routingKey == null)
- {
- if (queueName == null)
- {
- if (exchange.hasBindings())
- {
- response = methodRegistry.createExchangeBoundOkBody(OK, null);
- }
- else
- {
-
- response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode
- null); // replyText
- }
- }
- else
- {
-
- AMQQueue queue = virtualHost.getQueue(queueName.toString());
- if (queue == null)
- {
-
- response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
- AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
- }
- else
- {
- if (exchange.isBound(queue))
- {
-
- response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
- null); // replyText
- }
- else
- {
-
- response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode
- AMQShortString.validValueOf("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText
- }
- }
- }
- }
- else if (queueName != null)
- {
- AMQQueue queue = virtualHost.getQueue(queueName.toString());
- if (queue == null)
- {
-
- response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
- AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
- }
- else
- {
- String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString();
- if (exchange.isBound(bindingKey, queue))
- {
-
- response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
- null); // replyText
- }
- else
- {
-
- String message = "Queue '" + queueName + "' not bound with routing key '" +
- body.getRoutingKey() + "' to exchange '" + exchangeName + "'";
-
- response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
- AMQShortString.validValueOf(message)); // replyText
- }
- }
- }
- else
- {
- if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString()))
- {
-
- response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
- null); // replyText
- }
- else
- {
-
- response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode
- AMQShortString.validValueOf("No queue bound with routing key '" + body.getRoutingKey() +
- "' to exchange '" + exchangeName + "'")); // replyText
- }
- }
- }
- connection.writeFrame(response.generateFrame(channelId));
- }
-
- protected boolean isDefaultExchange(final AMQShortString exchangeName)
- {
- return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING);
- }
-
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
deleted file mode 100644
index f90f47d77c..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import java.security.AccessControlException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQConnectionException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.NoFactoryForTypeException;
-import org.apache.qpid.server.model.UnknownConfiguredObjectException;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.ExchangeExistsException;
-import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
-{
- private static final Logger _logger = Logger.getLogger(ExchangeDeclareHandler.class);
-
- private static final ExchangeDeclareHandler _instance = new ExchangeDeclareHandler();
-
- public static ExchangeDeclareHandler getInstance()
- {
- return _instance;
- }
-
- private ExchangeDeclareHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- ExchangeDeclareBody body,
- int channelId) throws AMQException
- {
- VirtualHostImpl virtualHost = connection.getVirtualHost();
- final AMQChannel channel = connection.getChannel(channelId);
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
-
- final AMQShortString exchangeName = body.getExchange();
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName);
- }
-
- ExchangeImpl exchange;
-
- if(isDefaultExchange(exchangeName))
- {
- if(!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(body.getType()))
- {
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: "
- + " of type "
- + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
- + " to " + body.getType() +".",
- body.getClazz(), body.getMethod(),
- connection.getMethodRegistry(),null);
- }
- }
- else
- {
- if (body.getPassive())
- {
- exchange = virtualHost.getExchange(exchangeName.toString());
- if(exchange == null)
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName,
- connection.getMethodRegistry());
- }
- else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getType().equals(body.getType().asString()))
- {
-
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
- exchangeName + " of type " + exchange.getType()
- + " to " + body.getType() +".",body.getClazz(), body.getMethod(),connection.getMethodRegistry(),null);
- }
-
- }
- else
- {
- try
- {
- String name = exchangeName == null ? null : exchangeName.intern().toString();
- String type = body.getType() == null ? null : body.getType().intern().toString();
-
- Map<String,Object> attributes = new HashMap<String, Object>();
- if(body.getArguments() != null)
- {
- attributes.putAll(FieldTable.convertToMap(body.getArguments()));
- }
- attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
- attributes.put(org.apache.qpid.server.model.Exchange.NAME,name);
- attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type);
- attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable());
- attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
- body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE))
- {
- attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
- }
- exchange = virtualHost.createExchange(attributes);
-
- }
- catch(ReservedExchangeNameException e)
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Attempt to declare exchange: " + exchangeName +
- " which begins with reserved prefix.", connection.getMethodRegistry());
-
- }
- catch(ExchangeExistsException e)
- {
- exchange = e.getExistingExchange();
- if(!new AMQShortString(exchange.getType()).equals(body.getType()))
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
- + exchangeName + " of type "
- + exchange.getType()
- + " to " + body.getType() + ".",
- connection.getMethodRegistry());
- }
- }
- catch(NoFactoryForTypeException e)
- {
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange type '"+e.getType()+"' for exchange '" + exchangeName + "'", connection.getMethodRegistry());
- }
- catch (AccessControlException e)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
- }
- catch (UnknownConfiguredObjectException e)
- {
- // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
- throw body.getConnectionException(AMQConstant.NOT_FOUND,
- "Unknown alternate exchange "
- + (e.getName() != null
- ? "name: \"" + e.getName() + "\""
- : "id: " + e.getId()),
- connection.getMethodRegistry());
- }
- catch (IllegalArgumentException e)
- {
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange '"+exchangeName+"': " + e.getMessage(),connection.getMethodRegistry());
- }
- }
- }
-
- if(!body.getNowait())
- {
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
- channel.sync();
- connection.writeFrame(responseBody.generateFrame(channelId));
- }
- }
-
- protected boolean isDefaultExchange(final AMQShortString exchangeName)
- {
- return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING);
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
deleted file mode 100644
index b5c10c190e..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import java.security.AccessControlException;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.ExchangeDeleteOkBody;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
-import org.apache.qpid.server.virtualhost.RequiredExchangeException;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody>
-{
- private static final ExchangeDeleteHandler _instance = new ExchangeDeleteHandler();
-
- public static ExchangeDeleteHandler getInstance()
- {
- return _instance;
- }
-
- private ExchangeDeleteHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- ExchangeDeleteBody body,
- int channelId) throws AMQException
- {
- VirtualHostImpl virtualHost = connection.getVirtualHost();
- final AMQChannel channel = connection.getChannel(channelId);
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
- channel.sync();
- try
- {
-
- if(isDefaultExchange(body.getExchange()))
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted", connection.getMethodRegistry());
- }
-
- final String exchangeName = body.getExchange().toString();
-
- final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
- if(exchange == null)
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange(),
- connection.getMethodRegistry());
- }
-
- virtualHost.removeExchange(exchange, !body.getIfUnused());
-
- ExchangeDeleteOkBody responseBody = connection.getMethodRegistry().createExchangeDeleteOkBody();
-
- connection.writeFrame(responseBody.generateFrame(channelId));
- }
-
- catch (ExchangeIsAlternateException e)
- {
- throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange",
- connection.getMethodRegistry());
-
- }
- catch (RequiredExchangeException e)
- {
- throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted",
- connection.getMethodRegistry());
- }
- catch (AccessControlException e)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
- }
- }
-
-
- protected boolean isDefaultExchange(final AMQShortString exchangeName)
- {
- return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING);
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/OnCurrentThreadExecutor.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/OnCurrentThreadExecutor.java
deleted file mode 100644
index 6ff511ea30..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/OnCurrentThreadExecutor.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import java.util.concurrent.Executor;
-
-/**
- * An executor that executes the task on the current thread.
- */
-public class OnCurrentThreadExecutor implements Executor
-{
- public void execute(Runnable command)
- {
- command.run();
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
deleted file mode 100644
index c47a4b528f..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import java.security.AccessControlException;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
-{
- private static final Logger _log = Logger.getLogger(QueueBindHandler.class);
-
- private static final QueueBindHandler _instance = new QueueBindHandler();
-
- public static QueueBindHandler getInstance()
- {
- return _instance;
- }
-
- private QueueBindHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- QueueBindBody body,
- int channelId) throws AMQException
- {
- VirtualHostImpl virtualHost = connection.getVirtualHost();
- AMQChannel channel = connection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
-
- final AMQQueue queue;
- final AMQShortString routingKey;
-
- final AMQShortString queueName = body.getQueue();
-
- if (queueName == null)
- {
-
- queue = channel.getDefaultQueue();
-
- if (queue == null)
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null",
- connection.getMethodRegistry());
- }
-
- if (body.getRoutingKey() == null)
- {
- routingKey = AMQShortString.valueOf(queue.getName());
- }
- else
- {
- routingKey = body.getRoutingKey().intern();
- }
- }
- else
- {
- queue = virtualHost.getQueue(queueName.toString());
- routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern();
- }
-
- if (queue == null)
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.",
- connection.getMethodRegistry());
- }
-
- if(isDefaultExchange(body.getExchange()))
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot bind the queue " + queueName + " to the default exchange",
- connection.getMethodRegistry());
- }
-
- final String exchangeName = body.getExchange().toString();
-
- final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
- if (exch == null)
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.",
- connection.getMethodRegistry());
- }
-
-
- try
- {
-
- Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments());
- String bindingKey = String.valueOf(routingKey);
-
- if (!exch.isBound(bindingKey, arguments, queue))
- {
-
- if(!exch.addBinding(bindingKey, queue, arguments) && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exch.getType()))
- {
- exch.replaceBinding(bindingKey, queue, arguments);
- }
- }
- }
- catch (AccessControlException e)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
- }
-
- if (_log.isInfoEnabled())
- {
- _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
- }
- if (!body.getNowait())
- {
- channel.sync();
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
- connection.writeFrame(responseBody.generateFrame(channelId));
-
- }
- }
-
- protected boolean isDefaultExchange(final AMQShortString exchangeName)
- {
- return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING);
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
deleted file mode 100644
index 0e1016c319..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import java.security.AccessControlException;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.model.ExclusivityPolicy;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.virtualhost.QueueExistsException;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
-{
- private static final Logger _logger = Logger.getLogger(QueueDeclareHandler.class);
-
- private static final QueueDeclareHandler _instance = new QueueDeclareHandler();
-
- public static QueueDeclareHandler getInstance()
- {
- return _instance;
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- QueueDeclareBody body,
- int channelId) throws AMQException
- {
- final AMQSessionModel session = connection.getChannel(channelId);
- VirtualHostImpl virtualHost = connection.getVirtualHost();
-
- final AMQShortString queueName;
-
- // if we aren't given a queue name, we create one which we return to the client
- if ((body.getQueue() == null) || (body.getQueue().length() == 0))
- {
- queueName = createName();
- }
- else
- {
- queueName = body.getQueue().intern();
- }
-
- AMQQueue queue;
-
- //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
-
- AMQChannel channel = connection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
-
- if(body.getPassive())
- {
- queue = virtualHost.getQueue(queueName.toString());
- if (queue == null)
- {
- String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
- throw body.getChannelException(AMQConstant.NOT_FOUND, msg, connection.getMethodRegistry());
- }
- else
- {
- if (!queue.verifySessionAccess(channel))
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getName() + " is exclusive, but not created on this Connection.",
- connection.getMethodRegistry());
- }
-
- //set this as the default queue on the channel:
- channel.setDefaultQueue(queue);
- }
- }
- else
- {
-
- try
- {
-
- queue = createQueue(channel, queueName, body, virtualHost, connection);
-
- }
- catch(QueueExistsException qe)
- {
-
- queue = qe.getExistingQueue();
-
- if (!queue.verifySessionAccess(channel))
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getName() + " is exclusive, but not created on this Connection.",
- connection.getMethodRegistry());
- }
- else if(queue.isExclusive() != body.getExclusive())
- {
-
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
- "Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: "
- + queue.isExclusive() + " requested " + body.getExclusive() + ")",
- connection.getMethodRegistry());
- }
- else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
- || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT)))
- {
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
- "Cannot re-declare queue '" + queue.getName() + "' with different lifetime policy (was: "
- + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")",
- connection.getMethodRegistry());
- }
- else if(queue.isDurable() != body.getDurable())
- {
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
- "Cannot re-declare queue '" + queue.getName() + "' with different durability (was: "
- + queue.isDurable() + " requested " + body.getDurable() + ")",
- connection.getMethodRegistry());
- }
-
- }
- catch (AccessControlException e)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
- }
-
- //set this as the default queue on the channel:
- channel.setDefaultQueue(queue);
- }
-
- if (!body.getNowait())
- {
- channel.sync();
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- QueueDeclareOkBody responseBody =
- methodRegistry.createQueueDeclareOkBody(queueName,
- queue.getQueueDepthMessages(),
- queue.getConsumerCount());
- connection.writeFrame(responseBody.generateFrame(channelId));
-
- _logger.info("Queue " + queueName + " declared successfully");
- }
- }
-
- protected AMQShortString createName()
- {
- return new AMQShortString("tmp_" + UUID.randomUUID());
- }
-
- protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName,
- QueueDeclareBody body,
- final VirtualHostImpl virtualHost,
- final AMQProtocolSession session)
- throws AMQException, QueueExistsException
- {
-
- final boolean durable = body.getDurable();
- final boolean autoDelete = body.getAutoDelete();
- final boolean exclusive = body.getExclusive();
-
-
- Map<String, Object> attributes =
- QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments()));
- final String queueNameString = AMQShortString.toString(queueName);
- attributes.put(Queue.NAME, queueNameString);
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.DURABLE, durable);
-
- LifetimePolicy lifetimePolicy;
- ExclusivityPolicy exclusivityPolicy;
-
- if(exclusive)
- {
- lifetimePolicy = autoDelete
- ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
- : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
- exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
- }
- else
- {
- lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
- exclusivityPolicy = ExclusivityPolicy.NONE;
- }
-
- attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
- attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
-
-
- final AMQQueue queue = virtualHost.createQueue(attributes);
-
- return queue;
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
deleted file mode 100644
index 123c076a25..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import java.security.AccessControlException;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
-{
- private static final QueueDeleteHandler _instance = new QueueDeleteHandler();
-
- public static QueueDeleteHandler getInstance()
- {
- return _instance;
- }
-
- private final boolean _failIfNotFound;
-
- public QueueDeleteHandler()
- {
- this(true);
- }
-
- public QueueDeleteHandler(boolean failIfNotFound)
- {
- _failIfNotFound = failIfNotFound;
-
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- QueueDeleteBody body,
- int channelId) throws AMQException
- {
- VirtualHostImpl virtualHost = connection.getVirtualHost();
-
- AMQChannel channel = connection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
- channel.sync();
- AMQQueue queue;
- if (body.getQueue() == null)
- {
-
- //get the default queue on the channel:
- queue = channel.getDefaultQueue();
- }
- else
- {
- queue = virtualHost.getQueue(body.getQueue().toString());
- }
-
- if (queue == null)
- {
- if (_failIfNotFound)
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.",
- connection.getMethodRegistry());
- }
- }
- else
- {
- if (body.getIfEmpty() && !queue.isEmpty())
- {
- throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.",
- connection.getMethodRegistry());
- }
- else if (body.getIfUnused() && !queue.isUnused())
- {
- // TODO - Error code
- throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.",
- connection.getMethodRegistry());
- }
- else
- {
- if (!queue.verifySessionAccess(channel))
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getName() + " is exclusive, but not created on this Connection.",
- connection.getMethodRegistry());
- }
-
- int purged = 0;
- try
- {
- purged = virtualHost.removeQueue(queue);
- }
- catch (AccessControlException e)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
- }
-
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
- connection.writeFrame(responseBody.generateFrame(channelId));
- }
- }
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
deleted file mode 100644
index 2c06fef1e2..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import java.security.AccessControlException;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.QueuePurgeBody;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
-{
- private static final QueuePurgeHandler _instance = new QueuePurgeHandler();
-
- public static QueuePurgeHandler getInstance()
- {
- return _instance;
- }
-
- private final boolean _failIfNotFound;
-
- public QueuePurgeHandler()
- {
- this(true);
- }
-
- public QueuePurgeHandler(boolean failIfNotFound)
- {
- _failIfNotFound = failIfNotFound;
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- QueuePurgeBody body,
- int channelId) throws AMQException
- {
- VirtualHostImpl virtualHost = connection.getVirtualHost();
-
- AMQChannel channel = connection.getChannel(channelId);
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
- AMQQueue queue;
- if(body.getQueue() == null)
- {
-
- //get the default queue on the channel:
- queue = channel.getDefaultQueue();
-
- if(queue == null)
- {
- if(_failIfNotFound)
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.", connection.getMethodRegistry());
- }
- }
- }
- else
- {
- queue = virtualHost.getQueue(body.getQueue().toString());
- }
-
- if(queue == null)
- {
- if(_failIfNotFound)
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.",
- connection.getMethodRegistry());
- }
- }
- else
- {
- if (!queue.verifySessionAccess(channel))
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue is exclusive, but not created on this Connection.", connection.getMethodRegistry());
- }
-
- long purged = 0;
- try
- {
- purged = queue.clearQueue();
- }
- catch (AccessControlException e)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
- }
-
-
- if(!body.getNowait())
- {
- channel.sync();
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
- connection.writeFrame(responseBody.generateFrame(channelId));
-
- }
- }
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
deleted file mode 100644
index 1b2d3c0653..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import java.security.AccessControlException;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.QueueUnbindBody;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
-{
- private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class);
-
- private static final QueueUnbindHandler _instance = new QueueUnbindHandler();
-
- public static QueueUnbindHandler getInstance()
- {
- return _instance;
- }
-
- private QueueUnbindHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- QueueUnbindBody body,
- int channelId) throws AMQException
- {
-
- if (ProtocolVersion.v8_0.equals(connection.getProtocolVersion()))
- {
- // 0-8 does not support QueueUnbind
- throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + connection.getProtocolVersion(), null);
- }
-
- VirtualHostImpl virtualHost = connection.getVirtualHost();
-
- final AMQQueue queue;
- final AMQShortString routingKey;
-
-
- AMQChannel channel = connection.getChannel(channelId);
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
-
- if (body.getQueue() == null)
- {
-
- queue = channel.getDefaultQueue();
-
- if (queue == null)
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null",
- connection.getMethodRegistry());
- }
-
- routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(false);
-
- }
- else
- {
- queue = virtualHost.getQueue(body.getQueue().toString());
- routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(false);
- }
-
- if (queue == null)
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.",
- connection.getMethodRegistry());
- }
-
- if(isDefaultExchange(body.getExchange()))
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Cannot unbind the queue "
- + queue.getName()
- + " from the default exchange", connection.getMethodRegistry());
- }
-
- final ExchangeImpl exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString());
- if (exch == null)
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.",
- connection.getMethodRegistry());
- }
-
- if(!exch.hasBinding(String.valueOf(routingKey), queue))
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND,"No such binding", connection.getMethodRegistry());
- }
- else
- {
- try
- {
- exch.deleteBinding(String.valueOf(routingKey), queue);
- }
- catch (AccessControlException e)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
- }
- }
-
-
- if (_log.isInfoEnabled())
- {
- _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
- }
-
-
- final AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody();
- channel.sync();
- connection.writeFrame(responseBody.generateFrame(channelId));
- }
-
- protected boolean isDefaultExchange(final AMQShortString exchangeName)
- {
- return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING);
- }
-
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java
index ce735306ee..ada1b43ad2 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java
@@ -20,92 +20,71 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+import java.util.Collection;
+import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.UUID;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.NoFactoryForTypeException;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.UnknownConfiguredObjectException;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.AMQChannel;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
+import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
+import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
+import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
+import org.apache.qpid.server.protocol.v0_8.state.AMQState;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class ServerMethodDispatcherImpl implements MethodDispatcher
{
- private final AMQProtocolSession<?> _connection;
-
- private static interface DispatcherFactory
- {
- public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection);
- }
-
- private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
- new HashMap<ProtocolVersion, DispatcherFactory>();
-
-
- static
- {
- _dispatcherFactories.put(ProtocolVersion.v8_0,
- new DispatcherFactory()
- {
- public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
- {
- return new ServerMethodDispatcherImpl_8_0(connection);
- }
- });
-
- _dispatcherFactories.put(ProtocolVersion.v0_9,
- new DispatcherFactory()
- {
- public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
- {
- return new ServerMethodDispatcherImpl_0_9(connection);
- }
- });
- _dispatcherFactories.put(ProtocolVersion.v0_91,
- new DispatcherFactory()
- {
- public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
- {
- return new ServerMethodDispatcherImpl_0_91(connection);
- }
- });
-
- }
-
-
- private static final AccessRequestHandler _accessRequestHandler = AccessRequestHandler.getInstance();
- private static final ChannelCloseHandler _channelCloseHandler = ChannelCloseHandler.getInstance();
- private static final ChannelOpenHandler _channelOpenHandler = ChannelOpenHandler.getInstance();
- private static final ChannelCloseOkHandler _channelCloseOkHandler = ChannelCloseOkHandler.getInstance();
- private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
- private static final ConnectionCloseOkMethodHandler _connectionCloseOkMethodHandler = ConnectionCloseOkMethodHandler.getInstance();
- private static final ConnectionOpenMethodHandler _connectionOpenMethodHandler = ConnectionOpenMethodHandler.getInstance();
- private static final ConnectionTuneOkMethodHandler _connectionTuneOkMethodHandler = ConnectionTuneOkMethodHandler.getInstance();
- private static final ConnectionSecureOkMethodHandler _connectionSecureOkMethodHandler = ConnectionSecureOkMethodHandler.getInstance();
- private static final ConnectionStartOkMethodHandler _connectionStartOkMethodHandler = ConnectionStartOkMethodHandler.getInstance();
- private static final ExchangeDeclareHandler _exchangeDeclareHandler = ExchangeDeclareHandler.getInstance();
- private static final ExchangeDeleteHandler _exchangeDeleteHandler = ExchangeDeleteHandler.getInstance();
- private static final ExchangeBoundHandler _exchangeBoundHandler = ExchangeBoundHandler.getInstance();
- private static final BasicAckMethodHandler _basicAckMethodHandler = BasicAckMethodHandler.getInstance();
- private static final BasicRecoverMethodHandler _basicRecoverMethodHandler = BasicRecoverMethodHandler.getInstance();
- private static final BasicConsumeMethodHandler _basicConsumeMethodHandler = BasicConsumeMethodHandler.getInstance();
- private static final BasicGetMethodHandler _basicGetMethodHandler = BasicGetMethodHandler.getInstance();
- private static final BasicCancelMethodHandler _basicCancelMethodHandler = BasicCancelMethodHandler.getInstance();
- private static final BasicPublishMethodHandler _basicPublishMethodHandler = BasicPublishMethodHandler.getInstance();
- private static final BasicQosHandler _basicQosHandler = BasicQosHandler.getInstance();
- private static final QueueBindHandler _queueBindHandler = QueueBindHandler.getInstance();
- private static final QueueDeclareHandler _queueDeclareHandler = QueueDeclareHandler.getInstance();
- private static final QueueDeleteHandler _queueDeleteHandler = QueueDeleteHandler.getInstance();
- private static final QueuePurgeHandler _queuePurgeHandler = QueuePurgeHandler.getInstance();
- private static final ChannelFlowHandler _channelFlowHandler = ChannelFlowHandler.getInstance();
- private static final TxSelectHandler _txSelectHandler = TxSelectHandler.getInstance();
- private static final TxCommitHandler _txCommitHandler = TxCommitHandler.getInstance();
- private static final TxRollbackHandler _txRollbackHandler = TxRollbackHandler.getInstance();
- private static final BasicRejectMethodHandler _basicRejectMethodHandler = BasicRejectMethodHandler.getInstance();
+ private static final Logger _logger = Logger.getLogger(ServerMethodDispatcherImpl.class);
+ private final AMQProtocolSession<?> _connection;
public static MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
{
- return _dispatcherFactories.get(connection.getProtocolVersion()).createMethodDispatcher(connection);
+ return new ServerMethodDispatcherImpl(connection);
}
@@ -122,61 +101,618 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
{
- _accessRequestHandler.methodReceived(getConnection(), body, channelId);
+ final AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ if(ProtocolVersion.v0_91.equals(_connection.getProtocolVersion()) )
+ {
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9");
+ }
+
+ // We don't implement access control class, but to keep clients happy that expect it
+ // always use the "0" ticket.
+ AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
+ channel.sync();
+ _connection.writeFrame(response.generateFrame(channelId));
return true;
}
public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
{
- _basicAckMethodHandler.methodReceived(getConnection(), body, channelId);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Ack(Tag:" + body.getDeliveryTag() + ":Mult:" + body.getMultiple() + ") received on channel " + channelId);
+ }
+
+ final AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ // this method throws an AMQException if the delivery tag is not known
+ channel.acknowledgeMessage(body.getDeliveryTag(), body.getMultiple());
return true;
}
public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
{
- _basicCancelMethodHandler.methodReceived(getConnection(), body, channelId);
+ final AMQChannel channel = _connection.getChannel(channelId);
+
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("BasicCancel: for:" + body.getConsumerTag() +
+ " nowait:" + body.getNowait());
+ }
+
+ channel.unsubscribeConsumer(body.getConsumerTag());
+ if (!body.getNowait())
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag());
+ channel.sync();
+ _connection.writeFrame(cancelOkBody.generateFrame(channelId));
+ }
return true;
}
public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
{
- _basicConsumeMethodHandler.methodReceived(getConnection(), body, channelId);
+ AMQChannel channel = _connection.getChannel(channelId);
+ VirtualHostImpl<?,?,?> vHost = _connection.getVirtualHost();
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ else
+ {
+ channel.sync();
+ String queueName = body.getQueue() == null ? null : body.getQueue().asString();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("BasicConsume: from '" + queueName +
+ "' for:" + body.getConsumerTag() +
+ " nowait:" + body.getNowait() +
+ " args:" + body.getArguments());
+ }
+
+ MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName);
+ final Collection<MessageSource> sources = new HashSet<>();
+ if(queue != null)
+ {
+ sources.add(queue);
+ }
+ else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
+ && body.getArguments() != null
+ && body.getArguments().get("x-multiqueue") instanceof Collection)
+ {
+ for(Object object : (Collection<Object>) body.getArguments().get("x-multiqueue"))
+ {
+ String sourceName = String.valueOf(object);
+ sourceName = sourceName.trim();
+ if(sourceName.length() != 0)
+ {
+ MessageSource source = vHost.getMessageSource(sourceName);
+ if(source == null)
+ {
+ sources.clear();
+ break;
+ }
+ else
+ {
+ sources.add(source);
+ }
+ }
+ }
+ queueName = body.getArguments().get("x-multiqueue").toString();
+ }
+
+ if (sources.isEmpty())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("No queue for '" + queueName + "'");
+ }
+ if (queueName != null)
+ {
+ String msg = "No such queue, '" + queueName + "'";
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry());
+ }
+ else
+ {
+ String msg = "No queue name provided, no default queue defined.";
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg, _connection.getMethodRegistry());
+ }
+ }
+ else
+ {
+ final AMQShortString consumerTagName;
+
+ if (body.getConsumerTag() != null)
+ {
+ consumerTagName = body.getConsumerTag().intern(false);
+ }
+ else
+ {
+ consumerTagName = null;
+ }
+
+ try
+ {
+ if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
+ {
+
+ AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
+ sources,
+ !body.getNoAck(),
+ body.getArguments(),
+ body.getExclusive(),
+ body.getNoLocal());
+ if (!body.getNowait())
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+ }
+ }
+ else
+ {
+ AMQShortString msg = AMQShortString.validValueOf("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg, // replytext
+ body.getClazz(),
+ body.getMethod());
+ _connection.writeFrame(responseBody.generateFrame(0));
+ }
+
+ }
+ catch (AMQInvalidArgumentException ise)
+ {
+ _logger.debug("Closing connection due to invalid selector");
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(),
+ AMQShortString.validValueOf(ise.getMessage()),
+ body.getClazz(),
+ body.getMethod());
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+
+ }
+ catch (AMQQueue.ExistingExclusiveConsumer e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " as it already has an existing exclusive consumer",
+ _connection.getMethodRegistry());
+ }
+ catch (AMQQueue.ExistingConsumerPreventsExclusive e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " exclusively as it already has a consumer",
+ _connection.getMethodRegistry());
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " permission denied", _connection.getMethodRegistry());
+ }
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " as it already has an incompatible exclusivity policy",
+ _connection.getMethodRegistry());
+ }
+
+ }
+ }
return true;
}
public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
{
- _basicGetMethodHandler.methodReceived(getConnection(), body, channelId);
+
+ VirtualHostImpl vHost = _connection.getVirtualHost();
+
+ AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ else
+ {
+ channel.sync();
+ AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().toString());
+ if (queue == null)
+ {
+ _logger.info("No queue for '" + body.getQueue() + "'");
+ if(body.getQueue()!=null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,
+ "No such queue, '" + body.getQueue() + "'",
+ _connection.getMethodRegistry());
+ }
+ else
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "No queue name provided, no default queue defined.",
+ _connection.getMethodRegistry());
+ }
+ }
+ else
+ {
+
+ try
+ {
+ if (!performGet(queue, _connection, channel, !body.getNoAck()))
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
+
+
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(), _connection.getMethodRegistry());
+ }
+ catch (MessageSource.ExistingExclusiveConsumer e)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue has an exclusive consumer",
+ _connection.getMethodRegistry());
+ }
+ catch (MessageSource.ExistingConsumerPreventsExclusive e)
+ {
+ throw body.getConnectionException(AMQConstant.INTERNAL_ERROR,
+ "The GET request has been evaluated as an exclusive consumer, " +
+ "this is likely due to a programming error in the Qpid broker",
+ _connection.getMethodRegistry());
+ }
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue has an incompatible exclusivit policy",
+ _connection.getMethodRegistry());
+ }
+ }
+ }
return true;
}
+ public static boolean performGet(final AMQQueue queue,
+ final AMQProtocolSession session,
+ final AMQChannel channel,
+ final boolean acks)
+ throws AMQException, MessageSource.ExistingConsumerPreventsExclusive,
+ MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused
+ {
+
+ final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
+
+ final GetDeliveryMethod getDeliveryMethod =
+ new GetDeliveryMethod(singleMessageCredit, session, channel, queue);
+ final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
+ {
+
+ public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag)
+ {
+ channel.addUnacknowledgedMessage(entry, deliveryTag, null);
+ }
+ };
+
+ ConsumerTarget_0_8 target;
+ EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES);
+ if(acks)
+ {
+
+ target = ConsumerTarget_0_8.createAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ }
+ else
+ {
+ target = ConsumerTarget_0_8.createGetNoAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ }
+
+ ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
+ sub.flush();
+ sub.close();
+ return(getDeliveryMethod.hasDeliveredMessage());
+
+
+ }
+
+
+ private static class GetDeliveryMethod implements ClientDeliveryMethod
+ {
+
+ private final FlowCreditManager _singleMessageCredit;
+ private final AMQProtocolSession _session;
+ private final AMQChannel _channel;
+ private final AMQQueue _queue;
+ private boolean _deliveredMessage;
+
+ public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
+ final AMQProtocolSession session,
+ final AMQChannel channel, final AMQQueue queue)
+ {
+ _singleMessageCredit = singleMessageCredit;
+ _session = session;
+ _channel = channel;
+ _queue = queue;
+ }
+
+ @Override
+ public long deliverToClient(final ConsumerImpl sub, final ServerMessage message,
+ final InstanceProperties props, final long deliveryTag)
+ {
+ _singleMessageCredit.useCreditForMessage(message.getSize());
+ long size =_session.getProtocolOutputConverter().writeGetOk(message,
+ props,
+ _channel.getChannelId(),
+ deliveryTag,
+ _queue.getQueueDepthMessages());
+
+ _deliveredMessage = true;
+ return size;
+ }
+
+ public boolean hasDeliveredMessage()
+ {
+ return _deliveredMessage;
+ }
+ }
+
public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
{
- _basicPublishMethodHandler.methodReceived(getConnection(), body, channelId);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Publish received on channel " + channelId);
+ }
+
+ AMQShortString exchangeName = body.getExchange();
+ VirtualHostImpl vHost = _connection.getVirtualHost();
+
+ // TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
+
+ MessageDestination destination;
+
+ if (exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName))
+ {
+ destination = vHost.getDefaultDestination();
+ }
+ else
+ {
+ destination = vHost.getMessageDestination(exchangeName.toString());
+ }
+
+ // if the exchange does not exist we raise a channel exception
+ if (destination == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name",
+ _connection.getMethodRegistry());
+ }
+ else
+ {
+ // The partially populated BasicDeliver frame plus the received route body
+ // is stored in the channel. Once the final body frame has been received
+ // it is routed to the exchange.
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ MessagePublishInfo info = new MessagePublishInfo(body.getExchange(),
+ body.getImmediate(),
+ body.getMandatory(),
+ body.getRoutingKey());
+ info.setExchange(exchangeName);
+ try
+ {
+ channel.setPublishFrame(info, destination);
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+ }
return true;
}
public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
{
- _basicQosHandler.methodReceived(getConnection(), body, channelId);
+ AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ channel.sync();
+ channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount());
+
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
return true;
}
public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
{
- _basicRecoverMethodHandler.methodReceived(getConnection(), body, channelId);
+ _logger.debug("Recover received on protocol session " + _connection
+ + " and channel " + channelId);
+ AMQChannel channel = _connection.getChannel(channelId);
+
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ channel.resend();
+
+ // Qpid 0-8 hacks a synchronous -ok onto recover.
+ // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
+ if(_connection.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
+ channel.sync();
+ _connection.writeFrame(recoverOk.generateFrame(channelId));
+
+ }
+
return true;
}
public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
{
- _basicRejectMethodHandler.methodReceived(getConnection(), body, channelId);
+
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting:" + body.getDeliveryTag() +
+ ": Requeue:" + body.getRequeue() +
+ " on channel:" + channel.debugIdentity());
+ }
+
+ long deliveryTag = body.getDeliveryTag();
+
+ MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+
+ if (message == null)
+ {
+ _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
+ }
+ else
+ {
+
+ if (message.getMessage() == null)
+ {
+ _logger.warn("Message has already been purged, unable to Reject.");
+ }
+ else
+ {
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
+ ": Requeue:" + body.getRequeue() +
+ " on channel:" + channel.debugIdentity());
+ }
+
+ if (body.getRequeue())
+ {
+ //this requeue represents a message rejected from the pre-dispatch queue
+ //therefore we need to amend the delivery counter.
+ message.decrementDeliveryCount();
+
+ channel.requeue(deliveryTag);
+ }
+ else
+ {
+ // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here
+ // as it would prevent redelivery
+ // message.reject();
+
+ final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
+ _logger.debug("maxDeliveryCountEnabled: "
+ + maxDeliveryCountEnabled
+ + " deliveryTag "
+ + deliveryTag);
+ if (maxDeliveryCountEnabled)
+ {
+ final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
+ _logger.debug("deliveredTooManyTimes: "
+ + deliveredTooManyTimes
+ + " deliveryTag "
+ + deliveryTag);
+ if (deliveredTooManyTimes)
+ {
+ channel.deadLetter(body.getDeliveryTag());
+ }
+ else
+ {
+ //this requeue represents a message rejected because of a recover/rollback that we
+ //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+ //and therefore need to increment the delivery counter so we cancel out the effect
+ //of the AMQChannel#resend() decrement.
+ message.incrementDeliveryCount();
+ }
+ }
+ else
+ {
+ channel.requeue(deliveryTag);
+ }
+ }
+ }
+ }
return true;
}
public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
{
- _channelOpenHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+ // Protect the broker against out of order frame request.
+ if (virtualHost == null)
+ {
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null);
+ }
+ _logger.info("Connecting to: " + virtualHost.getName());
+
+ final AMQChannel channel = new AMQChannel(_connection, channelId, virtualHost.getMessageStore());
+
+ _connection.addChannel(channel);
+
+ ChannelOpenOkBody response;
+
+
+ response = _connection.getMethodRegistry().createChannelOpenOkBody();
+
+
+ _connection.writeFrame(response.generateFrame(channelId));
return true;
}
@@ -186,20 +722,6 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
throw new UnexpectedMethodException(body);
}
- @Override
- public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody body, final int channelId)
- throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- @Override
- public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody body,
- final int channelId)
- throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
{
@@ -238,21 +760,64 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
{
- _channelCloseHandler.methodReceived(getConnection(), body, channelId);
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Received channel close for id " + channelId
+ + " citing class " + body.getClassId() +
+ " and method " + body.getMethodId());
+ }
+
+
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getConnectionException(AMQConstant.CHANNEL_ERROR,
+ "Trying to close unknown channel",
+ _connection.getMethodRegistry());
+ }
+ channel.sync();
+ _connection.closeChannel(channelId);
+ // Client requested closure so we don't wait for ok we send it
+ _connection.closeChannelOk(channelId);
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
return true;
}
public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
{
- _channelCloseOkHandler.methodReceived(getConnection(), body, channelId);
+
+ _logger.info("Received channel-close-ok for channel-id " + channelId);
+
+ // Let the Protocol Session know the channel is now closed.
+ _connection.closeChannelOk(channelId);
return true;
}
public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
{
- _channelFlowHandler.methodReceived(getConnection(), body, channelId);
+ final AMQProtocolSession<?> connection = getConnection();
+
+
+ AMQChannel channel = connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
+ }
+ channel.sync();
+ channel.setSuspended(!body.getActive());
+ _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
+
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive());
+ connection.writeFrame(responseBody.generateFrame(channelId));
return true;
}
@@ -269,23 +834,103 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
{
- _connectionOpenMethodHandler.methodReceived(getConnection(), body, channelId);
+
+ //ignore leading '/'
+ String virtualHostName;
+ if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/')
+ {
+ virtualHostName = new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString();
+ }
+ else
+ {
+ virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost());
+ }
+
+ VirtualHostImpl virtualHost = ((AmqpPort) _connection.getPort()).getVirtualHost(virtualHostName);
+
+ if (virtualHost == null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'",
+ _connection.getMethodRegistry());
+ }
+ else
+ {
+ // Check virtualhost access
+ if (virtualHost.getState() != State.ACTIVE)
+ {
+ throw body.getConnectionException(AMQConstant.CONNECTION_FORCED,
+ "Virtual host '" + virtualHost.getName() + "' is not active",
+ _connection.getMethodRegistry());
+ }
+
+ _connection.setVirtualHost(virtualHost);
+ try
+ {
+ virtualHost.getSecurityManager().authoriseCreateConnection(_connection);
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+
+ // See Spec (0.8.2). Section 3.1.2 Virtual Hosts
+ if (_connection.getContextKey() == null)
+ {
+ _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis())));
+ }
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
+
+ _connection.changeState(AMQState.CONNECTION_OPEN);
+
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
return true;
}
public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
{
- _connectionCloseMethodHandler.methodReceived(getConnection(), body, channelId);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
+ body.getReplyText() + " for " + _connection);
+ }
+ try
+ {
+ _connection.closeSession();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error closing protocol session: " + e, e);
+ }
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+ _connection.closeProtocolSession();
+
return true;
}
public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
{
- _connectionCloseOkMethodHandler.methodReceived(
- getConnection(),
- body, channelId);
+ _logger.info("Received Connection-close-ok");
+
+ try
+ {
+ _connection.changeState(AMQState.CONNECTION_CLOSED);
+ _connection.closeSession();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error closing protocol session: " + e, e);
+ }
return true;
}
@@ -368,92 +1013,1246 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
{
- _connectionSecureOkMethodHandler.methodReceived(
- getConnection(),
- body, channelId);
+ Broker<?> broker = _connection.getBroker();
+
+ SubjectCreator subjectCreator = _connection.getSubjectCreator();
+
+ SaslServer ss = _connection.getSaslServer();
+ if (ss == null)
+ {
+ throw new AMQException("No SASL context set up in session");
+ }
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
+ switch (authResult.getStatus())
+ {
+ case ERROR:
+ Exception cause = authResult.getCause();
+
+ _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
+ // This should be abstracted
+ _connection.changeState(AMQState.CONNECTION_CLOSING);
+
+ ConnectionCloseBody connectionCloseBody =
+ methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
+ AMQConstant.NOT_ALLOWED.getName(),
+ body.getClazz(),
+ body.getMethod());
+
+ _connection.writeFrame(connectionCloseBody.generateFrame(0));
+ disposeSaslServer(_connection);
+ break;
+ case SUCCESS:
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Connected as: " + authResult.getSubject());
+ }
+ _connection.changeState(AMQState.CONNECTION_NOT_TUNED);
+
+ int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+
+ if(frameMax <= 0)
+ {
+ frameMax = Integer.MAX_VALUE;
+ }
+
+ ConnectionTuneBody tuneBody =
+ methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+ frameMax,
+ broker.getConnection_heartBeatDelay());
+ _connection.writeFrame(tuneBody.generateFrame(0));
+ _connection.setAuthorizedSubject(authResult.getSubject());
+ disposeSaslServer(_connection);
+ break;
+ case CONTINUE:
+ _connection.changeState(AMQState.CONNECTION_NOT_AUTH);
+
+ ConnectionSecureBody
+ secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+ _connection.writeFrame(secureBody.generateFrame(0));
+ }
return true;
}
+ private void disposeSaslServer(AMQProtocolSession ps)
+ {
+ SaslServer ss = ps.getSaslServer();
+ if (ss != null)
+ {
+ ps.setSaslServer(null);
+ try
+ {
+ ss.dispose();
+ }
+ catch (SaslException e)
+ {
+ _logger.error("Error disposing of Sasl server: " + e);
+ }
+ }
+ }
+
public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
{
- _connectionStartOkMethodHandler.methodReceived(
- getConnection(),
- body, channelId);
+ Broker<?> broker = _connection.getBroker();
+
+ _logger.info("SASL Mechanism selected: " + body.getMechanism());
+ _logger.info("Locale selected: " + body.getLocale());
+
+ SubjectCreator subjectCreator = _connection.getSubjectCreator();
+ SaslServer ss = null;
+ try
+ {
+ ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()),
+ _connection.getLocalFQDN(),
+ _connection.getPeerPrincipal());
+
+ if (ss == null)
+ {
+ throw body.getConnectionException(AMQConstant.RESOURCE_ERROR,
+ "Unable to create SASL Server:" + body.getMechanism(),
+ _connection.getMethodRegistry());
+ }
+
+ _connection.setSaslServer(ss);
+
+ final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
+ //save clientProperties
+ _connection.setClientProperties(body.getClientProperties());
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ switch (authResult.getStatus())
+ {
+ case ERROR:
+ Exception cause = authResult.getCause();
+
+ _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
+ _connection.changeState(AMQState.CONNECTION_CLOSING);
+
+ ConnectionCloseBody closeBody =
+ methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ AMQConstant.NOT_ALLOWED.getName(),
+ body.getClazz(),
+ body.getMethod());
+
+ _connection.writeFrame(closeBody.generateFrame(0));
+ disposeSaslServer(_connection);
+ break;
+
+ case SUCCESS:
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Connected as: " + authResult.getSubject());
+ }
+ _connection.setAuthorizedSubject(authResult.getSubject());
+
+ _connection.changeState(AMQState.CONNECTION_NOT_TUNED);
+ int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+
+ if(frameMax <= 0)
+ {
+ frameMax = Integer.MAX_VALUE;
+ }
+
+ ConnectionTuneBody
+ tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+ frameMax,
+ broker.getConnection_heartBeatDelay());
+ _connection.writeFrame(tuneBody.generateFrame(0));
+ break;
+ case CONTINUE:
+ _connection.changeState(AMQState.CONNECTION_NOT_AUTH);
+
+ ConnectionSecureBody
+ secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+ _connection.writeFrame(secureBody.generateFrame(0));
+ }
+ }
+ catch (SaslException e)
+ {
+ disposeSaslServer(_connection);
+ throw new AMQException("SASL error: " + e, e);
+ }
return true;
}
public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
{
- _connectionTuneOkMethodHandler.methodReceived(getConnection(), body, channelId);
+ final AMQProtocolSession<?> connection = getConnection();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(body);
+ }
+ connection.changeState(AMQState.CONNECTION_NOT_OPENED);
+
+ connection.initHeartbeats(body.getHeartbeat());
+
+ int brokerFrameMax = connection.getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+ if(brokerFrameMax <= 0)
+ {
+ brokerFrameMax = Integer.MAX_VALUE;
+ }
+
+ if(body.getFrameMax() > (long) brokerFrameMax)
+ {
+ throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
+ "Attempt to set max frame size to " + body.getFrameMax()
+ + " greater than the broker will allow: "
+ + brokerFrameMax,
+ body.getClazz(), body.getMethod(),
+ connection.getMethodRegistry(),null);
+ }
+ else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode())
+ {
+ throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
+ "Attempt to set max frame size to " + body.getFrameMax()
+ + " which is smaller than the specification definined minimum: "
+ + AMQConstant.FRAME_MIN_SIZE.getCode(),
+ body.getClazz(), body.getMethod(),
+ connection.getMethodRegistry(),null);
+ }
+ int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax();
+ connection.setMaxFrameSize(frameMax);
+
+ long maxChannelNumber = body.getChannelMax();
+ //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
+ connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
return true;
}
+ public static final int OK = 0;
+ public static final int EXCHANGE_NOT_FOUND = 1;
+ public static final int QUEUE_NOT_FOUND = 2;
+ public static final int NO_BINDINGS = 3;
+ public static final int QUEUE_NOT_BOUND = 4;
+ public static final int NO_QUEUE_BOUND_WITH_RK = 5;
+ public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6;
+
public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
{
- _exchangeBoundHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ final AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ channel.sync();
+
+
+ AMQShortString exchangeName = body.getExchange();
+ AMQShortString queueName = body.getQueue();
+ AMQShortString routingKey = body.getRoutingKey();
+ ExchangeBoundOkBody response;
+
+ if(isDefaultExchange(exchangeName))
+ {
+ if(routingKey == null)
+ {
+ if(queueName == null)
+ {
+ response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueues().isEmpty() ? NO_BINDINGS : OK, null);
+ }
+ else
+ {
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ response = methodRegistry.createExchangeBoundOkBody(OK, null);
+ }
+ }
+ }
+ else
+ {
+ if(queueName == null)
+ {
+ response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueue(routingKey.toString()) == null ? NO_QUEUE_BOUND_WITH_RK : OK, null);
+ }
+ else
+ {
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ response = methodRegistry.createExchangeBoundOkBody(queueName.equals(routingKey) ? OK : SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, null);
+ }
+ }
+ }
+ }
+ else
+ {
+ ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
+ if (exchange == null)
+ {
+
+
+ response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
+ AMQShortString.validValueOf("Exchange '" + exchangeName + "' not found"));
+ }
+ else if (routingKey == null)
+ {
+ if (queueName == null)
+ {
+ if (exchange.hasBindings())
+ {
+ response = methodRegistry.createExchangeBoundOkBody(OK, null);
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode
+ null); // replyText
+ }
+ }
+ else
+ {
+
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ if (exchange.isBound(queue))
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode
+ AMQShortString.validValueOf("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText
+ }
+ }
+ }
+ }
+ else if (queueName != null)
+ {
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString();
+ if (exchange.isBound(bindingKey, queue))
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ String message = "Queue '" + queueName + "' not bound with routing key '" +
+ body.getRoutingKey() + "' to exchange '" + exchangeName + "'";
+
+ response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
+ AMQShortString.validValueOf(message)); // replyText
+ }
+ }
+ }
+ else
+ {
+ if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString()))
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode
+ AMQShortString.validValueOf("No queue bound with routing key '" + body.getRoutingKey() +
+ "' to exchange '" + exchangeName + "'")); // replyText
+ }
+ }
+ }
+ _connection.writeFrame(response.generateFrame(channelId));
return true;
}
public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
{
- _exchangeDeclareHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ final AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ final AMQShortString exchangeName = body.getExchange();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName);
+ }
+
+ ExchangeImpl exchange;
+
+ if(isDefaultExchange(exchangeName))
+ {
+ if(!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(body.getType()))
+ {
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: "
+ + " of type "
+ + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
+ + " to " + body.getType() +".",
+ body.getClazz(), body.getMethod(),
+ _connection.getMethodRegistry(),null);
+ }
+ }
+ else
+ {
+ if (body.getPassive())
+ {
+ exchange = virtualHost.getExchange(exchangeName.toString());
+ if(exchange == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName,
+ _connection.getMethodRegistry());
+ }
+ else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getType().equals(body.getType().asString()))
+ {
+
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
+ exchangeName + " of type " + exchange.getType()
+ + " to " + body.getType() +".",
+ body.getClazz(), body.getMethod(),
+ _connection.getMethodRegistry(),null);
+ }
+
+ }
+ else
+ {
+ try
+ {
+ String name = exchangeName == null ? null : exchangeName.intern().toString();
+ String type = body.getType() == null ? null : body.getType().intern().toString();
+
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ if(body.getArguments() != null)
+ {
+ attributes.putAll(FieldTable.convertToMap(body.getArguments()));
+ }
+ attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME,name);
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type);
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable());
+ attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+ body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE))
+ {
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ }
+ exchange = virtualHost.createExchange(attributes);
+
+ }
+ catch(ReservedExchangeNameException e)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Attempt to declare exchange: " + exchangeName +
+ " which begins with reserved prefix.",
+ _connection.getMethodRegistry());
+
+ }
+ catch(ExchangeExistsException e)
+ {
+ exchange = e.getExistingExchange();
+ if(!new AMQShortString(exchange.getType()).equals(body.getType()))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type "
+ + exchange.getType()
+ + " to " + body.getType() + ".",
+ _connection.getMethodRegistry());
+ }
+ }
+ catch(NoFactoryForTypeException e)
+ {
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID,
+ "Unknown exchange type '"
+ + e.getType()
+ + "' for exchange '"
+ + exchangeName
+ + "'",
+ _connection.getMethodRegistry());
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+ catch (UnknownConfiguredObjectException e)
+ {
+ // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,
+ "Unknown alternate exchange "
+ + (e.getName() != null
+ ? "name: \"" + e.getName() + "\""
+ : "id: " + e.getId()),
+ _connection.getMethodRegistry());
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID,
+ "Error creating exchange '"
+ + exchangeName
+ + "': "
+ + e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+ }
+ }
+
+ if(!body.getNowait())
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+ channel.sync();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
return true;
}
public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
{
- _exchangeDeleteHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ final AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ channel.sync();
+ try
+ {
+
+ if(isDefaultExchange(body.getExchange()))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Default Exchange cannot be deleted",
+ _connection.getMethodRegistry());
+ }
+
+ final String exchangeName = body.getExchange().toString();
+
+ final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
+ if(exchange == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange(),
+ _connection.getMethodRegistry());
+ }
+
+ virtualHost.removeExchange(exchange, !body.getIfUnused());
+
+ ExchangeDeleteOkBody responseBody = _connection.getMethodRegistry().createExchangeDeleteOkBody();
+
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
+
+ catch (ExchangeIsAlternateException e)
+ {
+ throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange",
+ _connection.getMethodRegistry());
+
+ }
+ catch (RequiredExchangeException e)
+ {
+ throw body.getChannelException(AMQConstant.NOT_ALLOWED,
+ "Exchange '" + body.getExchange() + "' cannot be deleted",
+ _connection.getMethodRegistry());
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
return true;
}
+ private boolean isDefaultExchange(final AMQShortString exchangeName)
+ {
+ return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING);
+ }
+
public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
{
- _queueBindHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ final AMQQueue queue;
+ final AMQShortString routingKey;
+
+ final AMQShortString queueName = body.getQueue();
+
+ if (queueName == null)
+ {
+
+ queue = channel.getDefaultQueue();
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND,
+ "No default queue defined on channel and queue was null",
+ _connection.getMethodRegistry());
+ }
+
+ if (body.getRoutingKey() == null)
+ {
+ routingKey = AMQShortString.valueOf(queue.getName());
+ }
+ else
+ {
+ routingKey = body.getRoutingKey().intern();
+ }
+ }
+ else
+ {
+ queue = virtualHost.getQueue(queueName.toString());
+ routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern();
+ }
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.",
+ _connection.getMethodRegistry());
+ }
+
+ if(isDefaultExchange(body.getExchange()))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Cannot bind the queue " + queueName + " to the default exchange",
+ _connection.getMethodRegistry());
+ }
+
+ final String exchangeName = body.getExchange().toString();
+
+ final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
+ if (exch == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.",
+ _connection.getMethodRegistry());
+ }
+
+
+ try
+ {
+
+ Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments());
+ String bindingKey = String.valueOf(routingKey);
+
+ if (!exch.isBound(bindingKey, arguments, queue))
+ {
+
+ if(!exch.addBinding(bindingKey, queue, arguments) && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exch.getType()))
+ {
+ exch.replaceBinding(bindingKey, queue, arguments);
+ }
+ }
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
+ }
+ if (!body.getNowait())
+ {
+ channel.sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+ }
return true;
}
public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
{
- _queueDeclareHandler.methodReceived(getConnection(), body, channelId);
+ final AMQSessionModel session = _connection.getChannel(channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+ final AMQShortString queueName;
+
+ // if we aren't given a queue name, we create one which we return to the client
+ if ((body.getQueue() == null) || (body.getQueue().length() == 0))
+ {
+ queueName = new AMQShortString("tmp_" + UUID.randomUUID());
+ }
+ else
+ {
+ queueName = body.getQueue().intern();
+ }
+
+ AMQQueue queue;
+
+ //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
+
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ if(body.getPassive())
+ {
+ queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+ String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry());
+ }
+ else
+ {
+ if (!queue.verifySessionAccess(channel))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this Connection.",
+ _connection.getMethodRegistry());
+ }
+
+ //set this as the default queue on the channel:
+ channel.setDefaultQueue(queue);
+ }
+ }
+ else
+ {
+
+ try
+ {
+
+ queue = createQueue(channel, queueName, body, virtualHost, _connection);
+
+ }
+ catch(QueueExistsException qe)
+ {
+
+ queue = qe.getExistingQueue();
+
+ if (!queue.verifySessionAccess(channel))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this Connection.",
+ _connection.getMethodRegistry());
+ }
+ else if(queue.isExclusive() != body.getExclusive())
+ {
+
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different exclusivity (was: "
+ + queue.isExclusive()
+ + " requested "
+ + body.getExclusive()
+ + ")",
+ _connection.getMethodRegistry());
+ }
+ else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
+ || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT)))
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different lifetime policy (was: "
+ + queue.getLifetimePolicy()
+ + " requested autodelete: "
+ + body.getAutoDelete()
+ + ")",
+ _connection.getMethodRegistry());
+ }
+ else if(queue.isDurable() != body.getDurable())
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different durability (was: "
+ + queue.isDurable()
+ + " requested "
+ + body.getDurable()
+ + ")",
+ _connection.getMethodRegistry());
+ }
+
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+
+ //set this as the default queue on the channel:
+ channel.setDefaultQueue(queue);
+ }
+
+ if (!body.getNowait())
+ {
+ channel.sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ QueueDeclareOkBody responseBody =
+ methodRegistry.createQueueDeclareOkBody(queueName,
+ queue.getQueueDepthMessages(),
+ queue.getConsumerCount());
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+ _logger.info("Queue " + queueName + " declared successfully");
+ }
return true;
}
+ protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName,
+ QueueDeclareBody body,
+ final VirtualHostImpl virtualHost,
+ final AMQProtocolSession session)
+ throws AMQException, QueueExistsException
+ {
+
+ final boolean durable = body.getDurable();
+ final boolean autoDelete = body.getAutoDelete();
+ final boolean exclusive = body.getExclusive();
+
+
+ Map<String, Object> attributes =
+ QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments()));
+ final String queueNameString = AMQShortString.toString(queueName);
+ attributes.put(Queue.NAME, queueNameString);
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.DURABLE, durable);
+
+ LifetimePolicy lifetimePolicy;
+ ExclusivityPolicy exclusivityPolicy;
+
+ if(exclusive)
+ {
+ lifetimePolicy = autoDelete
+ ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+ : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+ exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
+ }
+ else
+ {
+ lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
+ exclusivityPolicy = ExclusivityPolicy.NONE;
+ }
+
+ attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+ attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+
+
+ final AMQQueue queue = virtualHost.createQueue(attributes);
+
+ return queue;
+ }
+
public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
{
- _queueDeleteHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ channel.sync();
+ AMQQueue queue;
+ if (body.getQueue() == null)
+ {
+
+ //get the default queue on the channel:
+ queue = channel.getDefaultQueue();
+ }
+ else
+ {
+ queue = virtualHost.getQueue(body.getQueue().toString());
+ }
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.",
+ _connection.getMethodRegistry());
+
+ }
+ else
+ {
+ if (body.getIfEmpty() && !queue.isEmpty())
+ {
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.",
+ _connection.getMethodRegistry());
+ }
+ else if (body.getIfUnused() && !queue.isUnused())
+ {
+ // TODO - Error code
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.",
+ _connection.getMethodRegistry());
+ }
+ else
+ {
+ if (!queue.verifySessionAccess(channel))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this Connection.",
+ _connection.getMethodRegistry());
+ }
+
+ int purged = 0;
+ try
+ {
+ purged = virtualHost.removeQueue(queue);
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
+ }
return true;
}
public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException
{
- _queuePurgeHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+ AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ AMQQueue queue;
+ if(body.getQueue() == null)
+ {
+
+ //get the default queue on the channel:
+ queue = channel.getDefaultQueue();
+
+ if(queue == null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "No queue specified.",
+ _connection.getMethodRegistry());
+ }
+ }
+ else
+ {
+ queue = virtualHost.getQueue(body.getQueue().toString());
+ }
+
+ if(queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.",
+ _connection.getMethodRegistry());
+ }
+ else
+ {
+ if (!queue.verifySessionAccess(channel))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.",
+ _connection.getMethodRegistry());
+ }
+
+ long purged = 0;
+ try
+ {
+ purged = queue.clearQueue();
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+
+
+ if(!body.getNowait())
+ {
+ channel.sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+ }
+ }
return true;
}
- public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
+ public boolean dispatchTxCommit(TxCommitBody body, final int channelId) throws AMQException
{
- _txCommitHandler.methodReceived(getConnection(), body, channelId);
+ try
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Commit received on channel " + channelId);
+ }
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ channel.commit(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
+ }, true);
+
+
+
+ }
+ catch (AMQException e)
+ {
+ throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage(),
+ _connection.getMethodRegistry());
+ }
return true;
}
- public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException
+ public boolean dispatchTxRollback(TxRollbackBody body, final int channelId) throws AMQException
{
- _txRollbackHandler.methodReceived(getConnection(), body, channelId);
+ try
+ {
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+
+
+ final MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
+
+ Runnable task = new Runnable()
+ {
+
+ public void run()
+ {
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
+ };
+
+ channel.rollback(task);
+
+ //Now resend all the unacknowledged messages back to the original subscribers.
+ //(Must be done after the TxnRollback-ok response).
+ // Why, are we not allowed to send messages back to client before the ok method?
+ channel.resend();
+
+ }
+ catch (AMQException e)
+ {
+ throw body.getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage(),
+ _connection.getMethodRegistry());
+ }
return true;
}
public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
{
- _txSelectHandler.methodReceived(getConnection(), body, channelId);
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ channel.setLocalTransactional();
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ return true;
+ }
+
+ public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
+ {
+ final AMQProtocolSession<?> connection = getConnection();
+
+ _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId);
+ AMQChannel channel = connection.getChannel(channelId);
+
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
+ }
+ channel.sync();
+ channel.resend();
+
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
+ AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
+ connection.writeFrame(recoverOk.generateFrame(channelId));
+
return true;
}
+ public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
@Override
- public boolean dispatchQueueUnbind(final QueueUnbindBody queueUnbindBody, final int channelId) throws AMQException
+ public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId)
+ throws AMQException
{
- return false;
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
}
+ public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
+ {
+ final AMQProtocolSession<?> connection = getConnection();
+
+ if (ProtocolVersion.v8_0.equals(connection.getProtocolVersion()))
+ {
+ // 0-8 does not support QueueUnbind
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + connection.getProtocolVersion(), null);
+ }
+
+ VirtualHostImpl virtualHost = connection.getVirtualHost();
+
+ final AMQQueue queue;
+ final AMQShortString routingKey;
+
+
+ AMQChannel channel = connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
+ }
+
+ if (body.getQueue() == null)
+ {
+
+ queue = channel.getDefaultQueue();
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND,
+ "No default queue defined on channel and queue was null",
+ connection.getMethodRegistry());
+ }
+
+ routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(false);
+
+ }
+ else
+ {
+ queue = virtualHost.getQueue(body.getQueue().toString());
+ routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(false);
+ }
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.",
+ connection.getMethodRegistry());
+ }
+
+ if(isDefaultExchange(body.getExchange()))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Cannot unbind the queue "
+ + queue.getName()
+ + " from the default exchange", connection.getMethodRegistry());
+ }
+
+ final ExchangeImpl exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString());
+ if (exch == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.",
+ connection.getMethodRegistry());
+ }
+
+ if(!exch.hasBinding(String.valueOf(routingKey), queue))
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No such binding", connection.getMethodRegistry());
+ }
+ else
+ {
+ try
+ {
+ exch.deleteBinding(String.valueOf(routingKey), queue);
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ connection.getMethodRegistry());
+ }
+ }
+
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
+ }
+
+
+ final AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody();
+ channel.sync();
+ connection.writeFrame(responseBody.generateFrame(channelId));
+ return true;
+ }
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java
deleted file mode 100644
index d15d27fcd5..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicRecoverSyncBody;
-import org.apache.qpid.framing.BasicRecoverSyncOkBody;
-import org.apache.qpid.framing.ChannelAlertBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.framing.QueueUnbindBody;
-import org.apache.qpid.framing.QueueUnbindOkBody;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-
-
-
-public class ServerMethodDispatcherImpl_0_9
- extends ServerMethodDispatcherImpl
- implements MethodDispatcher
-
-{
-
- private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler =
- BasicRecoverSyncMethodHandler.getInstance();
- private static final QueueUnbindHandler _queueUnbindHandler =
- QueueUnbindHandler.getInstance();
-
-
- public ServerMethodDispatcherImpl_0_9(AMQProtocolSession<?> connection)
- {
- super(connection);
- }
-
- public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
- {
- _basicRecoverSyncMethodHandler.methodReceived(getConnection(), body, channelId);
- return true;
- }
-
- public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- @Override
- public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId)
- throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
- {
- _queueUnbindHandler.methodReceived(getConnection(), body,channelId);
- return true;
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java
deleted file mode 100644
index f0f387685a..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicRecoverSyncBody;
-import org.apache.qpid.framing.BasicRecoverSyncOkBody;
-import org.apache.qpid.framing.ChannelAlertBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.framing.QueueUnbindBody;
-import org.apache.qpid.framing.QueueUnbindOkBody;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-
-
-public class ServerMethodDispatcherImpl_0_91
- extends ServerMethodDispatcherImpl
- implements MethodDispatcher
-
-{
-
- private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler =
- BasicRecoverSyncMethodHandler.getInstance();
- private static final QueueUnbindHandler _queueUnbindHandler =
- QueueUnbindHandler.getInstance();
-
-
- public ServerMethodDispatcherImpl_0_91(AMQProtocolSession<?> connection)
- {
- super(connection);
- }
-
- public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
- {
- _basicRecoverSyncMethodHandler.methodReceived(getConnection(), body, channelId);
- return true;
- }
-
- public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- @Override
- public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId)
- throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
- {
- _queueUnbindHandler.methodReceived(getConnection(), body,channelId);
- return true;
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java
deleted file mode 100644
index 4b8531832b..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicRecoverSyncBody;
-import org.apache.qpid.framing.BasicRecoverSyncOkBody;
-import org.apache.qpid.framing.ChannelAlertBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.framing.QueueUnbindBody;
-import org.apache.qpid.framing.QueueUnbindOkBody;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-
-public class ServerMethodDispatcherImpl_8_0
- extends ServerMethodDispatcherImpl
- implements MethodDispatcher
-{
- public ServerMethodDispatcherImpl_8_0(AMQProtocolSession<?> connection)
- {
- super(connection);
- }
-
- public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- @Override
- public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody queueUnbindOkBody, final int channelId)
- {
- return false;
- }
-
- @Override
- public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody body,
- final int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- @Override
- public boolean dispatchQueueUnbind(final QueueUnbindBody queueUnbindBody, final int channelId) throws AMQException
- {
- return false;
- }
-
- @Override
- public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, final int channelId)
- throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java
deleted file mode 100644
index cb08b1fd4f..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
-{
- private static final Logger _log = Logger.getLogger(TxCommitHandler.class);
-
- private static TxCommitHandler _instance = new TxCommitHandler();
-
- public static TxCommitHandler getInstance()
- {
- return _instance;
- }
-
- private TxCommitHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- TxCommitBody body,
- final int channelId) throws AMQException
- {
- try
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Commit received on channel " + channelId);
- }
- AMQChannel channel = connection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
- channel.commit(new Runnable()
- {
-
- @Override
- public void run()
- {
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
- connection.writeFrame(responseBody.generateFrame(channelId));
- }
- }, true);
-
-
-
- }
- catch (AMQException e)
- {
- throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage(),
- connection.getMethodRegistry());
- }
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java
deleted file mode 100644
index 08c1c2378b..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBody>
-{
- private static TxRollbackHandler _instance = new TxRollbackHandler();
-
- public static TxRollbackHandler getInstance()
- {
- return _instance;
- }
-
- private TxRollbackHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- TxRollbackBody body,
- final int channelId) throws AMQException
- {
- try
- {
- AMQChannel channel = connection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
-
-
-
- final MethodRegistry methodRegistry = connection.getMethodRegistry();
- final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
-
- Runnable task = new Runnable()
- {
-
- public void run()
- {
- connection.writeFrame(responseBody.generateFrame(channelId));
- }
- };
-
- channel.rollback(task);
-
- //Now resend all the unacknowledged messages back to the original subscribers.
- //(Must be done after the TxnRollback-ok response).
- // Why, are we not allowed to send messages back to client before the ok method?
- channel.resend();
-
- }
- catch (AMQException e)
- {
- throw body.getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage(),
- connection.getMethodRegistry());
- }
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java
deleted file mode 100644
index d6ac194b09..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.TxSelectOkBody;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
-{
- private static TxSelectHandler _instance = new TxSelectHandler();
-
- public static TxSelectHandler getInstance()
- {
- return _instance;
- }
-
- private TxSelectHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- TxSelectBody body,
- int channelId) throws AMQException
- {
- AMQChannel channel = connection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
-
- channel.setLocalTransactional();
-
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody();
- connection.writeFrame(responseBody.generateFrame(channelId));
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
index 1e473a0a14..4225e12930 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
@@ -33,17 +33,11 @@ import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
public interface ProtocolOutputConverter
{
void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag);
- interface Factory
- {
- ProtocolOutputConverter newInstance(AMQProtocolSession session);
- }
-
long writeDeliver(final ServerMessage msg,
final InstanceProperties props, int channelId,
long deliveryTag,
@@ -55,10 +49,6 @@ public interface ProtocolOutputConverter
long deliveryTag,
int queueSize);
- byte getProtocolMinorVersion();
-
- byte getProtocolMajorVersion();
-
void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource msgContent, int channelId, int replyCode, AMQShortString replyText);
void writeFrame(AMQDataBlock block);
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
index fe475c1914..dc034f7bd2 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
@@ -35,7 +35,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicGetOkBody;
import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.message.InstanceProperties;
@@ -47,26 +46,19 @@ import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.util.GZIPUtils;
-class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final int BASIC_CLASS_ID = 60;
- private final MethodRegistry _methodRegistry;
private final AMQProtocolSession _protocolSession;
private static final AMQShortString GZIP_ENCODING = AMQShortString.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING);
- ProtocolOutputConverterImpl(AMQProtocolSession session, MethodRegistry methodRegistry)
+ public ProtocolOutputConverterImpl(AMQProtocolSession session)
{
_protocolSession = session;
- _methodRegistry = methodRegistry;
}
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
public long writeDeliver(final ServerMessage m,
final InstanceProperties props, int channelId,
long deliveryTag,
@@ -192,7 +184,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
else
{
- int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+ int maxBodySize = (int) _protocolSession.getMaxFrameSize() - AMQFrame.getFrameOverhead();
int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
@@ -326,11 +318,11 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public AMQBody createAMQBody()
{
- return _methodRegistry.createBasicDeliverBody(_consumerTag,
- _deliveryTag,
- _isRedelivered,
- _exchangeName,
- _routingKey);
+ return _protocolSession.getMethodRegistry().createBasicDeliverBody(_consumerTag,
+ _deliveryTag,
+ _isRedelivered,
+ _exchangeName,
+ _routingKey);
}
public byte getFrameType()
@@ -382,35 +374,25 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED));
BasicGetOkBody getOkBody =
- _methodRegistry.createBasicGetOkBody(deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey,
- queueSize);
+ _protocolSession.getMethodRegistry().createBasicGetOkBody(deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey,
+ queueSize);
return getOkBody;
}
- public byte getProtocolMinorVersion()
- {
- return _protocolSession.getProtocolMinorVersion();
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolSession().getProtocolMajorVersion();
- }
-
private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
int replyCode,
AMQShortString replyText)
{
BasicReturnBody basicReturnBody =
- _methodRegistry.createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
+ _protocolSession.getMethodRegistry().createBasicReturnBody(replyCode,
+ replyText,
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
return basicReturnBody;
@@ -427,14 +409,14 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void writeFrame(AMQDataBlock block)
{
- getProtocolSession().writeFrame(block);
+ _protocolSession.writeFrame(block);
}
public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
- BasicCancelOkBody basicCancelOkBody = _methodRegistry.createBasicCancelOkBody(consumerTag);
+ BasicCancelOkBody basicCancelOkBody = _protocolSession.getMethodRegistry().createBasicCancelOkBody(consumerTag);
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterRegistry.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterRegistry.java
deleted file mode 100644
index d4332b37ee..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterRegistry.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/*
- * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
- * Supported AMQP versions:
- * 8-0
- */
-package org.apache.qpid.server.protocol.v0_8.output;
-
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter.Factory;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class ProtocolOutputConverterRegistry
-{
-
- private static final Map<ProtocolVersion, Factory> _registry =
- new HashMap<ProtocolVersion, Factory>();
-
-
- static
- {
- register(ProtocolVersion.v8_0);
- register(ProtocolVersion.v0_9);
- register(ProtocolVersion.v0_91);
- }
-
- private ProtocolOutputConverterRegistry()
- {
- }
-
- private static void register(ProtocolVersion version)
- {
-
- _registry.put(version,new ConverterFactory(version));
- }
-
-
- public static ProtocolOutputConverter getConverter(AMQProtocolSession session)
- {
- return _registry.get(session.getProtocolVersion()).newInstance(session);
- }
-
- private static class ConverterFactory implements Factory
- {
- private ProtocolVersion _protocolVersion;
- private MethodRegistry _methodRegistry;
- private int _classId;
-
- public ConverterFactory(ProtocolVersion pv)
- {
- _protocolVersion = pv;
-
- }
-
- public synchronized ProtocolOutputConverter newInstance(AMQProtocolSession session)
- {
- if(_methodRegistry == null)
- {
-
- _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
-
- }
- return new ProtocolOutputConverterImpl(session, _methodRegistry);
- }
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java
deleted file mode 100644
index d767c7e326..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.state;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-
-/**
- * A frame listener that is informed of the protocol state when invoked and has
- * the opportunity to update state.
- *
- */
-public interface StateAwareMethodListener<B extends AMQMethodBody>
-{
- void methodReceived(final AMQProtocolSession<?> connection, B evt, int channelId) throws AMQException;
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
index 6c35077f8a..44a4c5a19e 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
@@ -98,13 +98,12 @@ public class AMQChannelTest extends QpidTestCase
AMQChannel channel = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore());
channel.setLocalTransactional();
- MessagePublishInfo info = mock(MessagePublishInfo.class);
+ MessagePublishInfo info = new MessagePublishInfo(new AMQShortString("test"), false, false, null);
ExchangeImpl e = mock(ExchangeImpl.class);
ContentHeaderBody contentHeaderBody= mock(ContentHeaderBody.class);
BasicContentHeaderProperties properties = mock(BasicContentHeaderProperties.class);
when(contentHeaderBody.getProperties()).thenReturn(properties);
- when(info.getExchange()).thenReturn(new AMQShortString("test"));
when(properties.getUserId()).thenReturn(new AMQShortString(_protocolSession.getAuthorizedPrincipal().getName() + "_incorrect"));
channel.setPublishFrame(info, e);
@@ -121,13 +120,12 @@ public class AMQChannelTest extends QpidTestCase
AMQChannel channel = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore());
channel.setLocalTransactional();
- MessagePublishInfo info = mock(MessagePublishInfo.class);
+ MessagePublishInfo info = new MessagePublishInfo(new AMQShortString("test"), false, false, null);
ExchangeImpl e = mock(ExchangeImpl.class);
ContentHeaderBody contentHeaderBody= mock(ContentHeaderBody.class);
BasicContentHeaderProperties properties = mock(BasicContentHeaderProperties.class);
when(contentHeaderBody.getProperties()).thenReturn(properties);
- when(info.getExchange()).thenReturn(new AMQShortString("test"));
when(properties.getUserId()).thenReturn(new AMQShortString(_protocolSession.getAuthorizedPrincipal().getName()));
channel.setPublishFrame(info, e);
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index c9f643eac6..564194668d 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -90,34 +90,8 @@ public class AckTest extends QpidTestCase
{
for (int i = 1; i <= count; i++)
{
- MessagePublishInfo publishBody = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return new AMQShortString("someExchange");
- }
-
- public void setExchange(AMQShortString exchange)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isImmediate()
- {
- return false;
- }
-
- public boolean isMandatory()
- {
- return false;
- }
-
- public AMQShortString getRoutingKey()
- {
- return new AMQShortString("rk");
- }
- };
+ MessagePublishInfo publishBody = new MessagePublishInfo(new AMQShortString("someExchange"), false, false,
+ new AMQShortString("rk"));
BasicContentHeaderProperties b = new BasicContentHeaderProperties();
ContentHeaderBody cb = new ContentHeaderBody();
cb.setProperties(b);
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
index ad9035259c..06b27f0149 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
@@ -29,9 +29,6 @@ import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public class BrokerTestHelper_0_8 extends BrokerTestHelper
{
@@ -69,9 +66,7 @@ public class BrokerTestHelper_0_8 extends BrokerTestHelper
{
AMQShortString routingKey = new AMQShortString(queueName);
AMQShortString exchangeNameAsShortString = new AMQShortString(exchangeName);
- MessagePublishInfo info = mock(MessagePublishInfo.class);
- when(info.getExchange()).thenReturn(exchangeNameAsShortString);
- when(info.getRoutingKey()).thenReturn(routingKey);
+ MessagePublishInfo info = new MessagePublishInfo(exchangeNameAsShortString, false, false, routingKey);
MessageDestination destination;
if(exchangeName == null || "".equals(exchangeName))
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockMessagePublishInfo.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockMessagePublishInfo.java
deleted file mode 100644
index ffcd01ba20..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockMessagePublishInfo.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.MessagePublishInfo;
-
-public class MockMessagePublishInfo implements MessagePublishInfo
-{
- public AMQShortString getExchange()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setExchange(AMQShortString exchange)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isImmediate()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isMandatory()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public AMQShortString getRoutingKey()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
index 8410e2cf5b..beba87d754 100755
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
@@ -41,7 +41,7 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData>
public MockStoredMessage(long messageId, String headerName, Object headerValue)
{
- this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60), headerName, headerValue);
+ this(messageId, new MessagePublishInfo(null, false, false, null), new ContentHeaderBody(new BasicContentHeaderProperties(), 60), headerName, headerValue);
}
public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb)
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
index 669028e6f4..92fe3d2e58 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v0_8;
import java.util.UUID;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MessagePublishInfo;
@@ -57,36 +56,7 @@ public class ReferenceCountingTest extends QpidTestCase
{
ContentHeaderBody chb = createPersistentContentHeader();
- MessagePublishInfo info = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return null;
- }
-
- public void setExchange(AMQShortString exchange)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isImmediate()
- {
- return false;
- }
-
- public boolean isMandatory()
- {
- return false;
- }
-
- public AMQShortString getRoutingKey()
- {
- return null;
- }
- };
-
-
+ MessagePublishInfo info = new MessagePublishInfo(null, false, false, null);
final MessageMetaData mmd = new MessageMetaData(info, chb);
@@ -124,34 +94,7 @@ public class ReferenceCountingTest extends QpidTestCase
public void testMessageRemains() throws AMQException
{
- MessagePublishInfo info = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return null;
- }
-
- public void setExchange(AMQShortString exchange)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isImmediate()
- {
- return false;
- }
-
- public boolean isMandatory()
- {
- return false;
- }
-
- public AMQShortString getRoutingKey()
- {
- return null;
- }
- };
+ MessagePublishInfo info = new MessagePublishInfo(null, false, false, null);
final ContentHeaderBody chb = createPersistentContentHeader();
diff --git a/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
index 7c7284ff96..b63ca11a40 100644
--- a/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
+++ b/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -256,38 +256,7 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra
final boolean immediate = delvProps != null && delvProps.getImmediate();
final boolean mandatory = delvProps != null && !delvProps.getDiscardUnroutable();
- return new MessagePublishInfo()
- {
- @Override
- public AMQShortString getExchange()
- {
- return exchangeName;
- }
-
- @Override
- public void setExchange(AMQShortString exchange)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isImmediate()
- {
- return immediate;
- }
-
- @Override
- public boolean isMandatory()
- {
- return mandatory;
- }
-
- @Override
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- };
+ return new MessagePublishInfo(exchangeName, immediate, mandatory, routingKey);
}
@Override
diff --git a/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index ea33a9e763..4789543003 100644
--- a/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -137,45 +137,13 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
{
final MessageMetaData_1_0.MessageHeader_1_0 header = serverMsg.getMessageHeader();
+ String key = header.getTo();
+ if(key == null)
+ {
+ key = header.getSubject();
+ }
- MessagePublishInfo publishInfo = new MessagePublishInfo()
- {
- @Override
- public AMQShortString getExchange()
- {
- return null;
- }
-
- @Override
- public void setExchange(final AMQShortString amqShortString)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isImmediate()
- {
- return false;
- }
-
- @Override
- public boolean isMandatory()
- {
- return false;
- }
-
- @Override
- public AMQShortString getRoutingKey()
- {
- String key = header.getTo();
- if(key == null)
- {
- key = header.getSubject();
- }
-
- return AMQShortString.valueOf(key);
- }
- };
+ MessagePublishInfo publishInfo = new MessagePublishInfo(null, false, false, AMQShortString.valueOf(key));
final BasicContentHeaderProperties props = new BasicContentHeaderProperties();
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
index 8144fd1258..e6eb2d814f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
@@ -140,6 +140,13 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
return false;
}
+ @Override
+ public boolean dispatchChannelAlert(final ChannelAlertBody channelAlertBody, final int channelId)
+ throws AMQException
+ {
+ return false;
+ }
+
public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
{
_basicCancelOkMethodHandler.methodReceived(_session, body, channelId);
@@ -386,6 +393,13 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
return false;
}
+ @Override
+ public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody basicRecoverSyncBody, final int channelId)
+ throws AMQException
+ {
+ return false;
+ }
+
public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
{
_exchangeBoundOkMethodHandler.methodReceived(_session, body, channelId);
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index 9626bb017f..ec17f224c1 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -597,7 +597,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
headers.setString("Test", "MST");
properties.setHeaders(headers);
- MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey);
+ MessagePublishInfo messageInfo = new MessagePublishInfo(new AMQShortString(exchange.getName()), false, false, new AMQShortString(routingKey));
ContentHeaderBody headerBody = new ContentHeaderBody(BasicConsumeBody.CLASS_ID,0,properties,0l);
@@ -824,52 +824,4 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
assertEquals("Incorrect Message count on queue:" + queueName, messageCount, queue.getQueueDepthMessages());
}
-
- private class TestMessagePublishInfo implements MessagePublishInfo
- {
-
- ExchangeImpl<?> _exchange;
- boolean _immediate;
- boolean _mandatory;
- String _routingKey;
-
- TestMessagePublishInfo(ExchangeImpl<?> exchange, boolean immediate, boolean mandatory, String routingKey)
- {
- _exchange = exchange;
- _immediate = immediate;
- _mandatory = mandatory;
- _routingKey = routingKey;
- }
-
- @Override
- public AMQShortString getExchange()
- {
- return new AMQShortString(_exchange.getName());
- }
-
- @Override
- public void setExchange(AMQShortString exchange)
- {
- //no-op
- }
-
- @Override
- public boolean isImmediate()
- {
- return _immediate;
- }
-
- @Override
- public boolean isMandatory()
- {
- return _mandatory;
- }
-
- @Override
- public AMQShortString getRoutingKey()
- {
- return new AMQShortString(_routingKey);
- }
- }
-
}