summaryrefslogtreecommitdiff
path: root/java/client/src/test
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2007-06-01 14:33:07 +0000
committerRupert Smith <rupertlssmith@apache.org>2007-06-01 14:33:07 +0000
commit3b5d4734b777b54b52ce2710f404143aca8c5c6e (patch)
treed436e7a5239ec6be725852c12e7ccae975892745 /java/client/src/test
parent566e08caa331629a15bedca1d8cfc896886b0497 (diff)
downloadqpid-python-3b5d4734b777b54b52ce2710f404143aca8c5c6e.tar.gz
QPID-402: FailoverException falling through to client. All blocking operations now wrapped in failover support wrappers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@543496 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/test')
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java136
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java74
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java31
-rw-r--r--java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java28
4 files changed, 126 insertions, 143 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
index 51bbe7d0e6..c201e88104 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
@@ -14,42 +14,43 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.test.unit.client.channelclose;
import junit.framework.TestCase;
-import javax.jms.Connection;
-import javax.jms.Session;
-
-import javax.jms.JMSException;
-import javax.jms.ExceptionListener;
-import javax.jms.MessageProducer;
-import javax.jms.MessageConsumer;
-import javax.jms.Message;
-import javax.jms.TextMessage;
-import javax.jms.Queue;
+import org.apache.log4j.Logger;
-import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQTimeoutException;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.jms.ConnectionListener;
-import org.apache.log4j.Logger;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
public class ChannelCloseTest extends TestCase implements ExceptionListener, ConnectionListener
{
@@ -73,15 +74,14 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
TransportConnection.killAllVMBrokers();
}
-
/*
close channel, use chanel with same id ensure error.
- */
- public void testReusingChannelAfterFullClosure()
+ */
+ public void testReusingChannelAfterFullClosure() throws Exception
{
_connection = newConnection();
- //Create Producer
+ // Create Producer
try
{
_connection.start();
@@ -113,6 +113,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
{
_logger.info("Exception occured was:" + e.getErrorCode());
}
+
assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
_connection = newConnection();
@@ -134,29 +135,27 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
/*
close channel and send guff then send ok no errors
*/
- public void testSendingMethodsAfterClose()
+ public void testSendingMethodsAfterClose() throws Exception
{
try
{
- _connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='"
- + _brokerlist + "'");
+ _connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'");
((AMQConnection) _connection).setConnectionListener(this);
-
_connection.setExceptionListener(this);
- //Change the StateManager for one that doesn't respond with Close-OKs
+ // Change the StateManager for one that doesn't respond with Close-OKs
AMQStateManager oldStateManager = ((AMQConnection) _connection).getProtocolHandler().getStateManager();
_session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
_connection.start();
- //Test connection
+ // Test connection
checkSendingMessage();
- //Set StateManager to manager that ignores Close-oks
+ // Set StateManager to manager that ignores Close-oks
AMQProtocolSession protocolSession = ((AMQConnection) _connection).getProtocolHandler().getProtocolSession();
AMQStateManager newStateManager = new NoCloseOKStateManager(protocolSession);
newStateManager.changeState(oldStateManager.getCurrentState());
@@ -214,7 +213,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
createChannelAndTest(TEST_CHANNEL);
- //Test connection is still ok
+ // Test connection is still ok
checkSendingMessage();
@@ -248,9 +247,9 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
}
}
- private void createChannelAndTest(int channel)
+ private void createChannelAndTest(int channel) throws FailoverException
{
- //Create A channel
+ // Create A channel
try
{
createChannel(channel);
@@ -274,14 +273,14 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
private void sendClose(int channel)
{
- AMQFrame frame = ChannelCloseOkBody.createAMQFrame(channel,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion());
+ AMQFrame frame =
+ ChannelCloseOkBody.createAMQFrame(channel,
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion());
((AMQConnection) _connection).getProtocolHandler().writeFrame(frame);
}
-
private void checkSendingMessage() throws JMSException
{
TEST++;
@@ -307,8 +306,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
AMQConnection connection = null;
try
{
- connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='"
- + _brokerlist + "'");
+ connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'");
connection.setConnectionListener(this);
@@ -330,24 +328,24 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
fail("Creating new connection when:" + e.getMessage());
}
-
return connection;
}
- private void declareExchange(int channelId, String _type, String _name, boolean nowait) throws AMQException
+ private void declareExchange(int channelId, String _type, String _name, boolean nowait)
+ throws AMQException, FailoverException
{
- AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(channelId,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(),
- null, // arguments
- false, // autoDelete
- false, // durable
- new AMQShortString(_name), // exchange
- false, // internal
- nowait, // nowait
- true, // passive
- 0, // ticket
- new AMQShortString(_type)); // type
+ AMQFrame exchangeDeclare =
+ ExchangeDeclareBody.createAMQFrame(channelId,
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null, // arguments
+ false, // autoDelete
+ false, // durable
+ new AMQShortString(_name), // exchange
+ false, // internal
+ nowait, // nowait
+ true, // passive
+ 0, // ticket
+ new AMQShortString(_type)); // type
if (nowait)
{
@@ -355,36 +353,31 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
}
else
{
- ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT);
+ ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class,
+ SYNC_TIMEOUT);
}
}
- private void createChannel(int channelId) throws AMQException
+ private void createChannel(int channelId) throws AMQException, FailoverException
{
- ((AMQConnection) _connection).getProtocolHandler().syncWrite(
- ChannelOpenBody.createAMQFrame(channelId,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(),
- null), // outOfBand
- ChannelOpenOkBody.class);
+ ((AMQConnection) _connection).getProtocolHandler().syncWrite(ChannelOpenBody.createAMQFrame(channelId,
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null), // outOfBand
+ ChannelOpenOkBody.class);
}
-
public void onException(JMSException jmsException)
{
- //_logger.info("CCT" + jmsException);
+ // _logger.info("CCT" + jmsException);
fail(jmsException.getMessage());
}
public void bytesSent(long count)
- {
- }
+ { }
public void bytesReceived(long count)
- {
-
- }
+ { }
public boolean preFailover(boolean redirect)
{
@@ -397,6 +390,5 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
}
public void failoverComplete()
- {
- }
+ { }
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
index d52707d965..58ac8294f2 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -22,29 +22,29 @@ package org.apache.qpid.test.unit.close;
import junit.framework.TestCase;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.testutil.QpidClientConnection;
+import org.apache.qpid.url.URLSyntaxException;
-import javax.jms.ExceptionListener;
-import javax.jms.Session;
import javax.jms.Connection;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.MessageProducer;
import javax.jms.Message;
-import javax.jms.TextMessage;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.testutil.QpidClientConnection;
-import org.apache.log4j.Logger;
-import org.apache.log4j.Level;
+import java.util.concurrent.atomic.AtomicInteger;
public class MessageRequeueTest extends TestCase
{
@@ -86,7 +86,7 @@ public class MessageRequeueTest extends TestCase
{
super.tearDown();
- if (!passed) // clean up
+ if (!passed) // clean up
{
QpidClientConnection conn = new QpidClientConnection(BROKER);
@@ -96,6 +96,7 @@ public class MessageRequeueTest extends TestCase
conn.disconnect();
}
+
TransportConnection.killVMBroker(1);
}
@@ -117,7 +118,7 @@ public class MessageRequeueTest extends TestCase
final MessageConsumer consumer = conn.getSession().createConsumer(q);
int messagesReceived = 0;
- long messageLog[] = new long[numTestMessages + 1];
+ long[] messageLog = new long[numTestMessages + 1];
_logger.info("consuming...");
Message msg = consumer.receive(1000);
@@ -130,15 +131,13 @@ public class MessageRequeueTest extends TestCase
int msgindex = msg.getIntProperty("index");
if (messageLog[msgindex] != 0)
{
- _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() +
- ") more than once.");
+ _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag()
+ + ") more than once.");
}
if (_logger.isInfoEnabled())
{
- _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " +
- "DT:" + dt +
- "IN:" + msgindex);
+ _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + "IN:" + msgindex);
}
if (dt == 0)
@@ -148,7 +147,7 @@ public class MessageRequeueTest extends TestCase
messageLog[msgindex] = dt;
- //get Next message
+ // get Next message
msg = consumer.receive(1000);
}
@@ -163,7 +162,7 @@ public class MessageRequeueTest extends TestCase
for (long b : messageLog)
{
- if (b == 0 && index != 0) //delivery tag of zero shouldn't exist
+ if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist
{
_logger.error("Index: " + index + " was not received.");
list.append(" ");
@@ -175,6 +174,7 @@ public class MessageRequeueTest extends TestCase
index++;
}
+
assertEquals(list.toString(), 0, failed);
_logger.info("consumed: " + messagesReceived);
conn.disconnect();
@@ -199,7 +199,7 @@ public class MessageRequeueTest extends TestCase
t1.start();
t2.start();
t3.start();
-// t4.start();
+ // t4.start();
try
{
@@ -228,7 +228,7 @@ public class MessageRequeueTest extends TestCase
for (long b : receieved)
{
- if (b == 0 && index != 0) //delivery tag of zero shouldn't exist (and we don't have msg 0)
+ if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist (and we don't have msg 0)
{
_logger.error("Index: " + index + " was not received.");
list.append(" ");
@@ -237,8 +237,10 @@ public class MessageRequeueTest extends TestCase
list.append(b);
failed++;
}
+
index++;
}
+
assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed);
passed = true;
@@ -278,15 +280,14 @@ public class MessageRequeueTest extends TestCase
int msgindex = result.getIntProperty("index");
if (receieved[msgindex] != 0)
{
- _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) result).getDeliveryTag() +
- ") more than once.");
+ _logger.error("Received Message(" + msgindex + ":"
+ + ((AbstractJMSMessage) result).getDeliveryTag() + ") more than once.");
}
if (_logger.isInfoEnabled())
{
- _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " +
- "DT:" + dt +
- "IN:" + msgindex);
+ _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt
+ + "IN:" + msgindex);
}
if (dt == 0)
@@ -297,9 +298,8 @@ public class MessageRequeueTest extends TestCase
receieved[msgindex] = dt;
}
-
count++;
- if (count % 100 == 0)
+ if ((count % 100) == 0)
{
_logger.info("consumer-" + id + ": got " + result + ", new count is " + count);
}
@@ -328,11 +328,10 @@ public class MessageRequeueTest extends TestCase
}
}
-
public void testRequeue() throws JMSException, AMQException, URLSyntaxException
{
int run = 0;
-// while (run < 10)
+ // while (run < 10)
{
run++;
@@ -359,7 +358,6 @@ public class MessageRequeueTest extends TestCase
assertNotNull("Message should not be null", msg);
-
// As we have not ack'd message will be requeued.
_logger.debug("Close Consumer");
consumer.close();
@@ -369,4 +367,4 @@ public class MessageRequeueTest extends TestCase
}
}
-} \ No newline at end of file
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index 0e718da19b..8d96977df2 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -21,18 +21,20 @@
package org.apache.qpid.test.unit.transacted;
import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.AMQException;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.log4j.Logger;
-import javax.jms.Session;
-import javax.jms.MessageProducer;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
import javax.jms.TextMessage;
/**
@@ -62,10 +64,10 @@ public class CommitRollbackTest extends TestCase
{
TransportConnection.createVMBroker(1);
}
+
testMethod++;
queue += testMethod;
-
newConnection();
}
@@ -106,7 +108,6 @@ public class CommitRollbackTest extends TestCase
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
-
_logger.info("sending test message");
String MESSAGE_TEXT = "testPutThenDisconnect";
_publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
@@ -119,7 +120,7 @@ public class CommitRollbackTest extends TestCase
_logger.info("receiving result");
Message result = _consumer.receive(1000);
- //commit to ensure message is removed from queue
+ // commit to ensure message is removed from queue
_session.commit();
assertNull("test message was put and disconnected before commit, but is still present", result);
@@ -135,7 +136,6 @@ public class CommitRollbackTest extends TestCase
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
-
_logger.info("sending test message");
String MESSAGE_TEXT = "testPutThenDisconnect";
_publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
@@ -151,7 +151,7 @@ public class CommitRollbackTest extends TestCase
_logger.info("receiving result");
Message result = _consumer.receive(1000);
- //commit to ensure message is removed from queue
+ // commit to ensure message is removed from queue
_session.commit();
assertNull("test message was put and disconnected before commit, but is still present", result);
@@ -168,7 +168,6 @@ public class CommitRollbackTest extends TestCase
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
-
_logger.info("sending test message");
String MESSAGE_TEXT = "testPutThenRollback";
_publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
@@ -335,13 +334,12 @@ public class CommitRollbackTest extends TestCase
assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
}
-
/**
* Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order
*
* @throws Exception On error
*/
- public void testSend2ThenRollback() throws Exception
+ /*public void testSend2ThenRollback() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
@@ -391,7 +389,7 @@ public class CommitRollbackTest extends TestCase
}
assertNull("test message should be null", result);
- }
+ }*/
public void testSend2ThenCloseAfter1andTryAgain() throws Exception
{
@@ -428,7 +426,7 @@ public class CommitRollbackTest extends TestCase
{
assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
}
- else // or it will be msg 2 arriving the first time due to latency.
+ else // or it will be msg 2 arriving the first time due to latency.
{
_logger.info("Message 2 wasn't prefetched so wasn't rejected");
assertEquals("2", ((TextMessage) result).getText());
@@ -445,7 +443,6 @@ public class CommitRollbackTest extends TestCase
}
-
public void testPutThenRollbackThenGet() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
index 195ed79dab..d52da06f76 100644
--- a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
+++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
@@ -1,25 +1,25 @@
package org.apache.qpid.testutil;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.log4j.Logger;
-import javax.jms.ExceptionListener;
-import javax.jms.Session;
import javax.jms.Connection;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.MessageProducer;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
import javax.jms.TextMessage;
public class QpidClientConnection implements ExceptionListener
{
-
private static final Logger _logger = Logger.getLogger(QpidClientConnection.class);
private boolean transacted = true;
@@ -40,17 +40,16 @@ public class QpidClientConnection implements ExceptionListener
setPrefetch(5000);
}
-
public void connect() throws JMSException
{
if (!connected)
{
/*
- * amqp://[user:pass@][clientid]/virtualhost?
- * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
- * [&failover='method[?option='value'[&option='value']]']
- * [&option='value']"
- */
+ * amqp://[user:pass@][clientid]/virtualhost?
+ * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+ * [&failover='method[?option='value'[&option='value']]']
+ * [&option='value']"
+ */
String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
try
{
@@ -63,7 +62,6 @@ public class QpidClientConnection implements ExceptionListener
session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
-
_logger.info("starting connection");
connection.start();
@@ -124,7 +122,6 @@ public class QpidClientConnection implements ExceptionListener
this.prefetch = prefetch;
}
-
/** override as necessary */
public void onException(JMSException exception)
{
@@ -266,4 +263,3 @@ public class QpidClientConnection implements ExceptionListener
_logger.info("consumed: " + messagesReceived);
}
}
-