summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-08-27 14:39:51 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-08-27 14:39:51 +0000
commit9f72bbe223c7b0cbb587cf4208d50fa6c7081499 (patch)
treed42f5c17bf81e0961fc1606b556f18a2ee2891bc
parenta6b385bef5206a45d22e227c3a6576b25939025c (diff)
downloadqpid-python-9f72bbe223c7b0cbb587cf4208d50fa6c7081499.tar.gz
More updates
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@808436 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java100
2 files changed, 79 insertions, 25 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index e14dc936fd..6b0cf89b95 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -62,7 +62,7 @@ public class DefaultExchangeFactory implements ExchangeFactory
public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete)
throws AMQException
{
- ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(type);
+ ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(new AMQShortString(type));
if (exchType == null)
{
@@ -106,7 +106,7 @@ public class DefaultExchangeFactory implements ExchangeFactory
return;
}
Class<? extends ExchangeType> exchangeTypeClass = exchangeType.getClass();
- ExchangeType type = exchangeTypeClass.newInstance();
+ ExchangeType<? extends ExchangeType> type = exchangeTypeClass.newInstance();
registerExchangeType(type);
}
catch (ClassCastException classCastEx)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index edd419ef9e..9f08434fe8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -102,33 +102,65 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageSubscribe(Session session, MessageSubscribe method)
{
- String destination = method.getDestination();
- String queueName = method.getQueue();
- QueueRegistry queueRegistry = getQueueRegistry(session);
- AMQQueue queue = queueRegistry.getQueue(queueName);
+ //TODO - work around broken Python tests
+ if(!method.hasAcceptMode())
+ {
+ method.setAcceptMode(MessageAcceptMode.EXPLICIT);
+ }
+ if(!method.hasAcquireMode())
+ {
+ method.setAcquireMode(MessageAcquireMode.PRE_ACQUIRED);
- //TODO null check
+ }
+ /* if(!method.hasAcceptMode())
+ {
+ exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "Accept-mode not supplied");
+ }
+ else if(!method.hasAcquireMode())
+ {
+ exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "Acquire-mode not supplied");
+ }
+ else */if(!method.hasQueue())
+ {
+ exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied");
+ }
+ else
+ {
+ String destination = method.getDestination();
+ String queueName = method.getQueue();
+ QueueRegistry queueRegistry = getQueueRegistry(session);
- FlowCreditManager_0_10 creditManager = new CreditCreditManager(0L,0L);
- // TODO filters
+ AMQQueue queue = queueRegistry.getQueue(queueName);
- Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null);
+ if(queue == null)
+ {
+ exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
+ }
+ else
+ {
- ((ServerSession)session).register(destination, sub);
- try
- {
- queue.registerSubscription(sub, method.getExclusive());
- }
- catch (AMQException e)
- {
- // TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- throw new RuntimeException(e);
- }
+ FlowCreditManager_0_10 creditManager = new CreditCreditManager(0L,0L);
+
+ // TODO filters
+ Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null);
+
+ ((ServerSession)session).register(destination, sub);
+ try
+ {
+ queue.registerSubscription(sub, method.getExclusive());
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
@@ -266,11 +298,13 @@ public class ServerSessionDelegate extends SessionDelegate
}
- else
+ else if(exchange == null)
{
ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+
+
try
{
@@ -293,6 +327,10 @@ public class ServerSessionDelegate extends SessionDelegate
}
}
+ else
+ {
+ // TODO check same as declared
+ }
}
}
@@ -371,7 +409,23 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void exchangeQuery(Session session, ExchangeQuery method)
{
- super.exchangeQuery(session, method);
+
+ ExchangeQueryResult result = new ExchangeQueryResult();
+
+ Exchange exchange = getExchange(session, method.getName());
+
+ if(exchange != null)
+ {
+ result.setDurable(exchange.isDurable());
+ result.setType(exchange.getType().toString());
+ result.setNotFound(false);
+ }
+ else
+ {
+ result.setNotFound(true);
+ }
+
+ session.executionResult((int) method.getId(), result);
}
@Override
@@ -462,7 +516,7 @@ public class ServerSessionDelegate extends SessionDelegate
session.executionResult((int) method.getId(), result);
- super.exchangeBound(session, method);
+
}
private AMQQueue getQueue(Session session, String queue)
@@ -551,7 +605,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
}
}
- else if (queue.getOwner() != null && !((ServerSession)session).getPrincipal().getName().equals(queue.getOwner()))
+ else if (method.getExclusive() && (queue.getOwner() != null && !queue.getOwner().equals(((ServerSession)session).getPrincipal().getName())))
{
String description = "Cannot declare queue('" + queueName + "'),"