summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java123
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java32
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java19
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java18
9 files changed, 147 insertions, 93 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index e71782b116..8c7b374791 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -105,11 +105,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
{
ExecutionException ex = new ExecutionException();
- ex.setErrorCode(ExecutionErrorCode.get(cause.getCode()));
+ ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
+ try
+ {
+ code = ExecutionErrorCode.get(cause.getCode());
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // Ignore, already set to INTERNAL_ERROR
+ }
+ ex.setErrorCode(code);
ex.setDescription(message);
((ServerSession)session).invoke(ex);
((ServerSession)session).close();
}
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index d24ad46512..430a4bd9e9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -361,8 +361,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (!nowait)
{
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
}
@@ -382,9 +381,21 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
flushTask = null;
}
flushAcknowledgments();
- getQpidSession().sync();
- getQpidSession().close();
- getCurrentException();
+ try
+ {
+ getQpidSession().sync();
+ getQpidSession().close();
+ }
+ catch (SessionException se)
+ {
+ setCurrentException(se);
+ }
+
+ AMQException amqe = getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
}
@@ -403,7 +414,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().setAutoSync(false);
}
// We need to sync so that we get notify of an error.
- getCurrentException();
+ sync();
}
/**
@@ -426,8 +437,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
autoDelete ? Option.AUTO_DELETE : Option.NONE,
exclusive ? Option.EXCLUSIVE : Option.NONE);
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
/**
@@ -451,8 +461,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
@@ -566,7 +575,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
try
{
boolean isTopic;
-
+
if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
{
isTopic = consumer.getDestination() instanceof AMQTopic ||
@@ -583,7 +592,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
(isTopic || consumer.getMessageSelector() == null ||
consumer.getMessageSelector().equals(""));
}
-
+
getQpidSession().messageSubscribe
(queueName.toString(), String.valueOf(tag),
getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
@@ -607,7 +616,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
Option.UNRELIABLE);
-
+
if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch))
{
// set the flow
@@ -619,11 +628,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (!nowait)
{
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
}
-
+
private long getCapacity(AMQDestination destination)
{
long capacity = 0;
@@ -677,8 +685,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// We need to sync so that we get notify of an error.
if (!nowait)
{
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
}
@@ -710,7 +717,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
queueName = amqd.getAMQQueueName();
}
-
+
if (amqd.getDestSyntax() == DestSyntax.BURL)
{
Map<String,Object> arguments = new HashMap<String,Object>();
@@ -718,7 +725,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
arguments.put("no-local", true);
}
-
+
getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
amqd.isDurable() ? Option.DURABLE : Option.NONE,
@@ -733,13 +740,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
node.isDurable() ? Option.DURABLE : Option.NONE,
node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
-
+
// passive --> false
if (!nowait)
{
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
return queueName;
}
@@ -753,8 +759,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// ifEmpty --> false
// ifUnused --> false
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
/**
@@ -807,8 +812,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
@@ -816,8 +820,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
getQpidSession().txRollback();
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
//------ Private methods
@@ -835,19 +838,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* Get the latest thrown exception.
*
- * @throws org.apache.qpid.AMQException get the latest thrown error.
+ * @throws SessionException get the latest thrown error.
*/
- public void getCurrentException() throws AMQException
+ public AMQException getCurrentException()
{
+ AMQException amqe = null;
synchronized (_currentExceptionLock)
{
if (_currentException != null)
{
- AMQException amqe = _currentException;
+ amqe = _currentException;
_currentException = null;
- throw amqe;
}
}
+ return amqe;
}
public void opened(Session ssn) {}
@@ -872,22 +876,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void exception(Session ssn, SessionException exc)
{
- synchronized (_currentExceptionLock)
- {
- ExecutionException ee = exc.getException();
- int code;
- if (ee == null)
- {
- code = AMQConstant.INTERNAL_ERROR.getCode();
- }
- else
- {
- code = ee.getErrorCode().getValue();
- }
- AMQException amqe = new AMQException(AMQConstant.getConstant(code), exc.getMessage(), exc.getCause());
- _connection.exceptionReceived(amqe);
- _currentException = amqe;
- }
+ setCurrentException(exc);
}
public void closed(Session ssn) {}
@@ -1041,11 +1030,41 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
return Serial.lt((int) currentMark, (int) deliveryTag);
}
-
+
public void sync() throws AMQException
{
- _qpidSession.sync();
- getCurrentException();
+ try
+ {
+ getQpidSession().sync();
+ }
+ catch (SessionException se)
+ {
+ setCurrentException(se);
+ }
+
+ AMQException amqe = getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
+ }
+
+ public void setCurrentException(SessionException se)
+ {
+ synchronized (_currentExceptionLock)
+ {
+ ExecutionException ee = se.getException();
+ int code = AMQConstant.INTERNAL_ERROR.getCode();
+ if (ee != null)
+ {
+ code = ee.getErrorCode().getValue();
+ }
+ AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
+
+ _connection.exceptionReceived(amqe);
+
+ _currentException = amqe;
+ }
}
public AMQMessageDelegateFactory getMessageDelegateFactory()
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index a942d808a9..eddaa1a6bb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -168,16 +168,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
@Override void sendCancel() throws AMQException
{
- ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString());
- ((AMQSession_0_10) getSession()).getQpidSession().sync();
- // confirm cancel
- getSession().confirmConsumerCancelled(getConsumerTag());
- ((AMQSession_0_10) getSession()).getCurrentException();
+ _0_10session.getQpidSession().messageCancel(getConsumerTagString());
+ try
+ {
+ _0_10session.getQpidSession().sync();
+ getSession().confirmConsumerCancelled(getConsumerTag()); // confirm cancel
+ }
+ catch (SessionException se)
+ {
+ _0_10session.setCurrentException(se);
+ }
+
+ AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
}
@Override void notifyMessage(UnprocessedMessage_0_10 messageFrame)
{
-
super.notifyMessage(messageFrame);
}
@@ -285,7 +295,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_0_10session.messageAcknowledge
(ranges,
_acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
- _0_10session.getCurrentException();
+
+ AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
}
}
@@ -302,7 +317,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
RangeSet ranges = new RangeSet();
ranges.add((int) message.getDeliveryTag());
_0_10session.getQpidSession().messageRelease(ranges);
- _0_10session.getCurrentException();
+ _0_10session.sync();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index df59be25d0..14e1601993 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -266,7 +266,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
return _destination;
}
- public void close() throws JMSException
+ public void close()
{
_closed.set(true);
_session.deregisterProducer(_producerId);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index ed6f00a51c..13b8e461d4 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -56,7 +56,7 @@ public class Connection extends ConnectionInvoker
implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
{
- private static final Logger log = Logger.get(Connection.class);
+ protected static final Logger log = Logger.get(Connection.class);
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
index 29389df99a..88dd2d6afa 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
@@ -99,5 +99,4 @@ public abstract class ConnectionDelegate
ssn.closed();
}
}
-
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 5e40527c2f..9b84ff422b 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -280,7 +280,7 @@ public class Session extends SessionInvoker
{
if (m != null)
{
- System.out.println(m);
+ log.debug("%s", m);
}
}
}
@@ -732,8 +732,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
- log.debug("%s waiting for[%d]: %d, %s", this, point,
- maxComplete, commands);
+ log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands);
w.await();
}
@@ -741,16 +740,23 @@ public class Session extends SessionInvoker
{
if (state != CLOSED)
{
- throw new SessionException
- (String.format
- ("timed out waiting for sync: complete = %s, point = %s", maxComplete, point));
+ throw new SessionException(
+ String.format("timed out waiting for sync: complete = %s, point = %s",
+ maxComplete, point));
+ }
+ else
+ {
+ ExecutionException ee = getException();
+ if (ee != null)
+ {
+ throw new SessionException(ee);
+ }
}
}
}
}
- private Map<Integer,ResultFuture<?>> results =
- new HashMap<Integer,ResultFuture<?>>();
+ private Map<Integer,ResultFuture<?>> results = new HashMap<Integer,ResultFuture<?>>();
private ExecutionException exception = null;
void result(int command, Struct result)
@@ -769,9 +775,8 @@ public class Session extends SessionInvoker
{
if (exception != null)
{
- throw new IllegalStateException
- (String.format
- ("too many exceptions: %s, %s", exception, exc));
+ throw new IllegalStateException(
+ String.format("too many exceptions: %s, %s", exception, exc));
}
exception = exc;
}
@@ -849,8 +854,8 @@ public class Session extends SessionInvoker
}
else
{
- throw new SessionException
- (String.format("%s timed out waiting for result: %s",
+ throw new SessionException(
+ String.format("%s timed out waiting for result: %s",
Session.this, this));
}
}
@@ -961,5 +966,4 @@ public class Session extends SessionInvoker
{
return String.format("ssn:%s", name);
}
-
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index 15539c1d07..5d8e4d5565 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -33,11 +33,15 @@ public class SessionDelegate
extends MethodDelegate<Session>
implements ProtocolDelegate<Session>
{
- private static final Logger log = Logger.get(SessionDelegate.class);
+ protected static final Logger log = Logger.get(SessionDelegate.class);
- public void init(Session ssn, ProtocolHeader hdr) { }
+ public void init(Session ssn, ProtocolHeader hdr)
+ {
+ log.warn("INIT: [%s] %s", ssn, hdr);
+ }
- public void control(Session ssn, Method method) {
+ public void control(Session ssn, Method method)
+ {
method.dispatch(ssn, this);
}
@@ -50,7 +54,10 @@ public class SessionDelegate
}
}
- public void error(Session ssn, ProtocolError error) { }
+ public void error(Session ssn, ProtocolError error)
+ {
+ log.warn("ERROR: [%s] %s", ssn, error);
+ }
public void handle(Session ssn, Method method)
{
@@ -195,9 +202,11 @@ public class SessionDelegate
public void closed(Session session)
{
+ log.warn("CLOSED: [%s]", session);
}
public void detached(Session session)
- {
+ {
+ log.warn("DETACHED: [%s]", session);
}
}
diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index bdd3a0c93b..375a326654 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -424,10 +424,6 @@ public class ConnectionTest extends QpidTestCase implements SessionListener
}
}
- /**
- * The 0-10 {@code executionSync} command should set the exception status in the session,
- * so that the client session object can then throw it as an {@link AMQException}.
- */
public void testExecutionExceptionSync() throws Exception
{
startServer();
@@ -436,11 +432,15 @@ public class ConnectionTest extends QpidTestCase implements SessionListener
conn.connect("localhost", port, null, "guest", "guest");
Session ssn = conn.createSession();
send(ssn, "EXCP 0", true);
- ExecutionException before = ssn.getException();
- assertNull("There should not be an exception stored in the session", before);
- ssn.sync();
- ExecutionException after = ssn.getException();
- assertNotNull("There should be an exception stored in the session", after);
+ try
+ {
+ ssn.sync();
+ fail("this should have failed");
+ }
+ catch (SessionException exc)
+ {
+ assertNotNull(exc.getException());
+ }
}
}