diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-08-27 14:39:51 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-08-27 14:39:51 +0000 |
commit | 9f72bbe223c7b0cbb587cf4208d50fa6c7081499 (patch) | |
tree | d42f5c17bf81e0961fc1606b556f18a2ee2891bc | |
parent | a6b385bef5206a45d22e227c3a6576b25939025c (diff) | |
download | qpid-python-9f72bbe223c7b0cbb587cf4208d50fa6c7081499.tar.gz |
More updates
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@808436 13f79535-47bb-0310-9956-ffa450edef68
2 files changed, 79 insertions, 25 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index e14dc936fd..6b0cf89b95 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -62,7 +62,7 @@ public class DefaultExchangeFactory implements ExchangeFactory public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) throws AMQException { - ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(type); + ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(new AMQShortString(type)); if (exchType == null) { @@ -106,7 +106,7 @@ public class DefaultExchangeFactory implements ExchangeFactory return; } Class<? extends ExchangeType> exchangeTypeClass = exchangeType.getClass(); - ExchangeType type = exchangeTypeClass.newInstance(); + ExchangeType<? extends ExchangeType> type = exchangeTypeClass.newInstance(); registerExchangeType(type); } catch (ClassCastException classCastEx) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index edd419ef9e..9f08434fe8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -102,33 +102,65 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageSubscribe(Session session, MessageSubscribe method) { - String destination = method.getDestination(); - String queueName = method.getQueue(); - QueueRegistry queueRegistry = getQueueRegistry(session); - AMQQueue queue = queueRegistry.getQueue(queueName); + //TODO - work around broken Python tests + if(!method.hasAcceptMode()) + { + method.setAcceptMode(MessageAcceptMode.EXPLICIT); + } + if(!method.hasAcquireMode()) + { + method.setAcquireMode(MessageAcquireMode.PRE_ACQUIRED); - //TODO null check + } + /* if(!method.hasAcceptMode()) + { + exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "Accept-mode not supplied"); + } + else if(!method.hasAcquireMode()) + { + exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "Acquire-mode not supplied"); + } + else */if(!method.hasQueue()) + { + exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied"); + } + else + { + String destination = method.getDestination(); + String queueName = method.getQueue(); + QueueRegistry queueRegistry = getQueueRegistry(session); - FlowCreditManager_0_10 creditManager = new CreditCreditManager(0L,0L); - // TODO filters + AMQQueue queue = queueRegistry.getQueue(queueName); - Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null); + if(queue == null) + { + exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); + } + else + { - ((ServerSession)session).register(destination, sub); - try - { - queue.registerSubscription(sub, method.getExclusive()); - } - catch (AMQException e) - { - // TODO - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - throw new RuntimeException(e); - } + FlowCreditManager_0_10 creditManager = new CreditCreditManager(0L,0L); + + // TODO filters + Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null); + + ((ServerSession)session).register(destination, sub); + try + { + queue.registerSubscription(sub, method.getExclusive()); + } + catch (AMQException e) + { + // TODO + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + throw new RuntimeException(e); + } + } + } } @@ -266,11 +298,13 @@ public class ServerSessionDelegate extends SessionDelegate } - else + else if(exchange == null) { ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); + + try { @@ -293,6 +327,10 @@ public class ServerSessionDelegate extends SessionDelegate } } + else + { + // TODO check same as declared + } } } @@ -371,7 +409,23 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void exchangeQuery(Session session, ExchangeQuery method) { - super.exchangeQuery(session, method); + + ExchangeQueryResult result = new ExchangeQueryResult(); + + Exchange exchange = getExchange(session, method.getName()); + + if(exchange != null) + { + result.setDurable(exchange.isDurable()); + result.setType(exchange.getType().toString()); + result.setNotFound(false); + } + else + { + result.setNotFound(true); + } + + session.executionResult((int) method.getId(), result); } @Override @@ -462,7 +516,7 @@ public class ServerSessionDelegate extends SessionDelegate session.executionResult((int) method.getId(), result); - super.exchangeBound(session, method); + } private AMQQueue getQueue(Session session, String queue) @@ -551,7 +605,7 @@ public class ServerSessionDelegate extends SessionDelegate } } } - else if (queue.getOwner() != null && !((ServerSession)session).getPrincipal().getName().equals(queue.getOwner())) + else if (method.getExclusive() && (queue.getOwner() != null && !queue.getOwner().equals(((ServerSession)session).getPrincipal().getName()))) { String description = "Cannot declare queue('" + queueName + "')," |