diff options
9 files changed, 147 insertions, 93 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index e71782b116..8c7b374791 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -105,11 +105,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { ExecutionException ex = new ExecutionException(); - ex.setErrorCode(ExecutionErrorCode.get(cause.getCode())); + ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; + try + { + code = ExecutionErrorCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore, already set to INTERNAL_ERROR + } + ex.setErrorCode(code); ex.setDescription(message); ((ServerSession)session).invoke(ex); ((ServerSession)session).close(); } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index d24ad46512..430a4bd9e9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -361,8 +361,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (!nowait) { // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } } @@ -382,9 +381,21 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic flushTask = null; } flushAcknowledgments(); - getQpidSession().sync(); - getQpidSession().close(); - getCurrentException(); + try + { + getQpidSession().sync(); + getQpidSession().close(); + } + catch (SessionException se) + { + setCurrentException(se); + } + + AMQException amqe = getCurrentException(); + if (amqe != null) + { + throw amqe; + } } @@ -403,7 +414,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().setAutoSync(false); } // We need to sync so that we get notify of an error. - getCurrentException(); + sync(); } /** @@ -426,8 +437,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic autoDelete ? Option.AUTO_DELETE : Option.NONE, exclusive ? Option.EXCLUSIVE : Option.NONE); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } /** @@ -451,8 +461,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } @@ -566,7 +575,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic try { boolean isTopic; - + if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) { isTopic = consumer.getDestination() instanceof AMQTopic || @@ -583,7 +592,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic (isTopic || consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")); } - + getQpidSession().messageSubscribe (queueName.toString(), String.valueOf(tag), getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, @@ -607,7 +616,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, Option.UNRELIABLE); - + if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch)) { // set the flow @@ -619,11 +628,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (!nowait) { - getQpidSession().sync(); - getCurrentException(); + sync(); } } - + private long getCapacity(AMQDestination destination) { long capacity = 0; @@ -677,8 +685,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // We need to sync so that we get notify of an error. if (!nowait) { - getQpidSession().sync(); - getCurrentException(); + sync(); } } @@ -710,7 +717,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { queueName = amqd.getAMQQueueName(); } - + if (amqd.getDestSyntax() == DestSyntax.BURL) { Map<String,Object> arguments = new HashMap<String,Object>(); @@ -718,7 +725,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { arguments.put("no-local", true); } - + getQpidSession().queueDeclare(queueName.toString(), "" , arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, amqd.isDurable() ? Option.DURABLE : Option.NONE, @@ -733,13 +740,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic node.isDurable() ? Option.DURABLE : Option.NONE, node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } - + // passive --> false if (!nowait) { // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } return queueName; } @@ -753,8 +759,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // ifEmpty --> false // ifUnused --> false // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } /** @@ -807,8 +812,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } @@ -816,8 +820,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { getQpidSession().txRollback(); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } //------ Private methods @@ -835,19 +838,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * Get the latest thrown exception. * - * @throws org.apache.qpid.AMQException get the latest thrown error. + * @throws SessionException get the latest thrown error. */ - public void getCurrentException() throws AMQException + public AMQException getCurrentException() { + AMQException amqe = null; synchronized (_currentExceptionLock) { if (_currentException != null) { - AMQException amqe = _currentException; + amqe = _currentException; _currentException = null; - throw amqe; } } + return amqe; } public void opened(Session ssn) {} @@ -872,22 +876,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void exception(Session ssn, SessionException exc) { - synchronized (_currentExceptionLock) - { - ExecutionException ee = exc.getException(); - int code; - if (ee == null) - { - code = AMQConstant.INTERNAL_ERROR.getCode(); - } - else - { - code = ee.getErrorCode().getValue(); - } - AMQException amqe = new AMQException(AMQConstant.getConstant(code), exc.getMessage(), exc.getCause()); - _connection.exceptionReceived(amqe); - _currentException = amqe; - } + setCurrentException(exc); } public void closed(Session ssn) {} @@ -1041,11 +1030,41 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { return Serial.lt((int) currentMark, (int) deliveryTag); } - + public void sync() throws AMQException { - _qpidSession.sync(); - getCurrentException(); + try + { + getQpidSession().sync(); + } + catch (SessionException se) + { + setCurrentException(se); + } + + AMQException amqe = getCurrentException(); + if (amqe != null) + { + throw amqe; + } + } + + public void setCurrentException(SessionException se) + { + synchronized (_currentExceptionLock) + { + ExecutionException ee = se.getException(); + int code = AMQConstant.INTERNAL_ERROR.getCode(); + if (ee != null) + { + code = ee.getErrorCode().getValue(); + } + AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause()); + + _connection.exceptionReceived(amqe); + + _currentException = amqe; + } } public AMQMessageDelegateFactory getMessageDelegateFactory() diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index a942d808a9..eddaa1a6bb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -168,16 +168,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ @Override void sendCancel() throws AMQException { - ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString()); - ((AMQSession_0_10) getSession()).getQpidSession().sync(); - // confirm cancel - getSession().confirmConsumerCancelled(getConsumerTag()); - ((AMQSession_0_10) getSession()).getCurrentException(); + _0_10session.getQpidSession().messageCancel(getConsumerTagString()); + try + { + _0_10session.getQpidSession().sync(); + getSession().confirmConsumerCancelled(getConsumerTag()); // confirm cancel + } + catch (SessionException se) + { + _0_10session.setCurrentException(se); + } + + AMQException amqe = _0_10session.getCurrentException(); + if (amqe != null) + { + throw amqe; + } } @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame) { - super.notifyMessage(messageFrame); } @@ -285,7 +295,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _0_10session.messageAcknowledge (ranges, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); - _0_10session.getCurrentException(); + + AMQException amqe = _0_10session.getCurrentException(); + if (amqe != null) + { + throw amqe; + } } } @@ -302,7 +317,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM RangeSet ranges = new RangeSet(); ranges.add((int) message.getDeliveryTag()); _0_10session.getQpidSession().messageRelease(ranges); - _0_10session.getCurrentException(); + _0_10session.sync(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index df59be25d0..14e1601993 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -266,7 +266,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac return _destination; } - public void close() throws JMSException + public void close() { _closed.set(true); _session.deregisterProducer(_producerId); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index ed6f00a51c..13b8e461d4 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -56,7 +56,7 @@ public class Connection extends ConnectionInvoker implements Receiver<ProtocolEvent>, Sender<ProtocolEvent> { - private static final Logger log = Logger.get(Connection.class); + protected static final Logger log = Logger.get(Connection.class); public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java index 29389df99a..88dd2d6afa 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java @@ -99,5 +99,4 @@ public abstract class ConnectionDelegate ssn.closed(); } } - } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 5e40527c2f..9b84ff422b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -280,7 +280,7 @@ public class Session extends SessionInvoker { if (m != null) { - System.out.println(m); + log.debug("%s", m); } } } @@ -732,8 +732,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { - log.debug("%s waiting for[%d]: %d, %s", this, point, - maxComplete, commands); + log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands); w.await(); } @@ -741,16 +740,23 @@ public class Session extends SessionInvoker { if (state != CLOSED) { - throw new SessionException - (String.format - ("timed out waiting for sync: complete = %s, point = %s", maxComplete, point)); + throw new SessionException( + String.format("timed out waiting for sync: complete = %s, point = %s", + maxComplete, point)); + } + else + { + ExecutionException ee = getException(); + if (ee != null) + { + throw new SessionException(ee); + } } } } } - private Map<Integer,ResultFuture<?>> results = - new HashMap<Integer,ResultFuture<?>>(); + private Map<Integer,ResultFuture<?>> results = new HashMap<Integer,ResultFuture<?>>(); private ExecutionException exception = null; void result(int command, Struct result) @@ -769,9 +775,8 @@ public class Session extends SessionInvoker { if (exception != null) { - throw new IllegalStateException - (String.format - ("too many exceptions: %s, %s", exception, exc)); + throw new IllegalStateException( + String.format("too many exceptions: %s, %s", exception, exc)); } exception = exc; } @@ -849,8 +854,8 @@ public class Session extends SessionInvoker } else { - throw new SessionException - (String.format("%s timed out waiting for result: %s", + throw new SessionException( + String.format("%s timed out waiting for result: %s", Session.this, this)); } } @@ -961,5 +966,4 @@ public class Session extends SessionInvoker { return String.format("ssn:%s", name); } - } diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index 15539c1d07..5d8e4d5565 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -33,11 +33,15 @@ public class SessionDelegate extends MethodDelegate<Session> implements ProtocolDelegate<Session> { - private static final Logger log = Logger.get(SessionDelegate.class); + protected static final Logger log = Logger.get(SessionDelegate.class); - public void init(Session ssn, ProtocolHeader hdr) { } + public void init(Session ssn, ProtocolHeader hdr) + { + log.warn("INIT: [%s] %s", ssn, hdr); + } - public void control(Session ssn, Method method) { + public void control(Session ssn, Method method) + { method.dispatch(ssn, this); } @@ -50,7 +54,10 @@ public class SessionDelegate } } - public void error(Session ssn, ProtocolError error) { } + public void error(Session ssn, ProtocolError error) + { + log.warn("ERROR: [%s] %s", ssn, error); + } public void handle(Session ssn, Method method) { @@ -195,9 +202,11 @@ public class SessionDelegate public void closed(Session session) { + log.warn("CLOSED: [%s]", session); } public void detached(Session session) - { + { + log.warn("DETACHED: [%s]", session); } } diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index bdd3a0c93b..375a326654 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -424,10 +424,6 @@ public class ConnectionTest extends QpidTestCase implements SessionListener } } - /** - * The 0-10 {@code executionSync} command should set the exception status in the session, - * so that the client session object can then throw it as an {@link AMQException}. - */ public void testExecutionExceptionSync() throws Exception { startServer(); @@ -436,11 +432,15 @@ public class ConnectionTest extends QpidTestCase implements SessionListener conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(); send(ssn, "EXCP 0", true); - ExecutionException before = ssn.getException(); - assertNull("There should not be an exception stored in the session", before); - ssn.sync(); - ExecutionException after = ssn.getException(); - assertNotNull("There should be an exception stored in the session", after); + try + { + ssn.sync(); + fail("this should have failed"); + } + catch (SessionException exc) + { + assertNotNull(exc.getException()); + } } } |