/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.qpid.client; import java.util.ArrayList; import java.util.List; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; import junit.framework.TestCase; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.Connection.SessionFactory; import org.apache.qpid.transport.Connection.State; import org.apache.qpid.transport.ExchangeBound; import org.apache.qpid.transport.ExchangeBoundResult; import org.apache.qpid.transport.ExchangeDeclare; import org.apache.qpid.transport.ExchangeDelete; import org.apache.qpid.transport.ExchangeQuery; import org.apache.qpid.transport.ExchangeQueryResult; import org.apache.qpid.transport.ExecutionErrorCode; import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.ExecutionResult; import org.apache.qpid.transport.ExecutionSync; import org.apache.qpid.transport.Future; import org.apache.qpid.transport.MessageCancel; import org.apache.qpid.transport.MessageFlow; import 
org.apache.qpid.transport.MessageRelease; import org.apache.qpid.transport.MessageSubscribe; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.Option; import org.apache.qpid.transport.ProtocolEvent; import org.apache.qpid.transport.QueueDelete; import org.apache.qpid.transport.QueueQuery; import org.apache.qpid.transport.QueueQueryResult; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionAttach; import org.apache.qpid.transport.SessionDelegate; import org.apache.qpid.transport.SessionDetach; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.SessionRequestTimeout; import org.apache.qpid.transport.TxCommit; import org.apache.qpid.transport.TxRollback; import org.apache.qpid.transport.TxSelect; /** * Tests AMQSession_0_10 methods. *
* The main purpose of the tests in this test suite is to check that
* {@link SessionException} is not thrown from methods of
* {@link AMQSession_0_10}.
*/
public class AMQSession_0_10Test extends TestCase
{
/**
 * Checks that {@code commit()} on a session obtained from
 * {@code createThrowingExceptionAMQSession_0_10()} fails with a
 * {@link JMSException} whose error code is "541".
 */
public void testExceptionOnCommit()
{
    final AMQSession_0_10 failingSession = createThrowingExceptionAMQSession_0_10();
    try
    {
        failingSession.commit();
        fail("JMSException should be thrown");
    }
    catch (Exception thrown)
    {
        // Any non-JMS exception is turned into a test failure by the first assertion.
        assertTrue("JMSException is expected", thrown instanceof JMSException);
        final JMSException jmsException = (JMSException) thrown;
        assertEquals("541 error code is expected", "541", jmsException.getErrorCode());
    }
}
/**
 * Checks that {@code createMessageProducer(...)} on a session obtained from
 * {@code createThrowingExceptionAMQSession_0_10()} fails with a
 * {@link JMSException} whose error code is "541".
 */
public void testExceptionOnCreateMessageProducer()
{
    final AMQSession_0_10 failingSession = createThrowingExceptionAMQSession_0_10();
    try
    {
        final AMQAnyDestination target = createDestination();
        failingSession.createMessageProducer(target, true, true, true, 1L);
        fail("JMSException should be thrown");
    }
    catch (Exception thrown)
    {
        // The actual exception is appended to the message to ease diagnosis.
        assertTrue("JMSException is expected but got:" + thrown, thrown instanceof JMSException);
        final JMSException jmsException = (JMSException) thrown;
        assertEquals("541 error code is expected", "541", jmsException.getErrorCode());
    }
}
/**
 * Checks that {@code rollback()} on a session obtained from
 * {@code createThrowingExceptionAMQSession_0_10()} fails with a
 * {@link JMSException}. The error code is not inspected by this test.
 */
public void testExceptionOnRollback()
{
    final AMQSession_0_10 failingSession = createThrowingExceptionAMQSession_0_10();
    try
    {
        failingSession.rollback();
        fail("JMSException should be thrown");
    }
    catch (Exception thrown)
    {
        final boolean isJmsException = thrown instanceof JMSException;
        assertTrue("JMSException is expected", isJmsException);
    }
}
/**
 * Checks that {@code recover()} on an AUTO_ACKNOWLEDGE session obtained from
 * {@code createThrowingExceptionAMQSession_0_10(int)} fails with a
 * {@link JMSException}. The error code is not inspected by this test.
 */
public void testExceptionOnRecover()
{
    final int ackMode = javax.jms.Session.AUTO_ACKNOWLEDGE;
    final AMQSession_0_10 failingSession = createThrowingExceptionAMQSession_0_10(ackMode);
    try
    {
        failingSession.recover();
        fail("JMSException should be thrown");
    }
    catch (Exception thrown)
    {
        final boolean isJmsException = thrown instanceof JMSException;
        assertTrue("JMSException is expected", isJmsException);
    }
}
/**
 * Checks that {@code createBrowser(...)} on a session obtained from
 * {@code createThrowingExceptionAMQSession_0_10()} fails with a
 * {@link JMSException} whose error code is "541".
 */
public void testExceptionOnCreateBrowser()
{
    final AMQSession_0_10 failingSession = createThrowingExceptionAMQSession_0_10();
    final AMQQueue queue = createQueue();
    try
    {
        failingSession.createBrowser(queue);
        fail("JMSException should be thrown");
    }
    catch (Exception thrown)
    {
        // Any non-JMS exception is turned into a test failure by the first assertion.
        assertTrue("JMSException is expected", thrown instanceof JMSException);
        final JMSException jmsException = (JMSException) thrown;
        assertEquals("541 error code is expected", "541", jmsException.getErrorCode());
    }
}
/**
 * Checks that {@code createConsumer(...)} on a session obtained from
 * {@code createThrowingExceptionAMQSession_0_10()} fails with a
 * {@link JMSException} whose error code is "541".
 */
public void testExceptionOnCreateConsumer()
{
    final AMQSession_0_10 failingSession = createThrowingExceptionAMQSession_0_10();
    final AMQAnyDestination source = createDestination();
    try
    {
        failingSession.createConsumer(source);
        fail("JMSException should be thrown");
    }
    catch (Exception thrown)
    {
        // Any non-JMS exception is turned into a test failure by the first assertion.
        assertTrue("JMSException is expected", thrown instanceof JMSException);
        final JMSException jmsException = (JMSException) thrown;
        assertEquals("541 error code is expected", "541", jmsException.getErrorCode());
    }
}
/**
 * Checks that {@code createSubscriber(...)} on a session obtained from
 * {@code createThrowingExceptionAMQSession_0_10()} fails with a
 * {@link JMSException} whose error code is "541".
 */
public void testExceptionOnCreateSubscriber()
{
    final AMQSession_0_10 failingSession = createThrowingExceptionAMQSession_0_10();
    final AMQAnyDestination topic = createDestination();
    try
    {
        failingSession.createSubscriber(topic);
        fail("JMSException should be thrown");
    }
    catch (Exception thrown)
    {
        // Any non-JMS exception is turned into a test failure by the first assertion.
        assertTrue("JMSException is expected", thrown instanceof JMSException);
        final JMSException jmsException = (JMSException) thrown;
        assertEquals("541 error code is expected", "541", jmsException.getErrorCode());
    }
}
/**
 * Checks that {@code unsubscribe("whatever")} on a session obtained from
 * {@code createThrowingExceptionAMQSession_0_10()} fails with a
 * {@link JMSException} whose error code is "541".
 */
public void testExceptionOnUnsubscribe()
{
    AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10();
    try
    {
        session.unsubscribe("whatever");
        // Message spelling corrected so it matches the other exception tests in this class.
        fail("JMSException should be thrown");
    }
    catch (Exception e)
    {
        // Any non-JMS exception is turned into a test failure by the first assertion.
        assertTrue("JMSException is expected", e instanceof JMSException);
        assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode());
    }
}
/**
 * Checks that {@code commit()} on a session obtained from
 * {@code createAMQSession_0_10()} raises no exception and that
 * {@code findSentProtocolEventOfClass} then returns a {@link TxCommit} event.
 */
public void testCommit()
{
    AMQSession_0_10 session = createAMQSession_0_10();
    try
    {
        session.commit();
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Unexpected exception is caught: " + e);
    }
    ProtocolEvent event = findSentProtocolEventOfClass(session, TxCommit.class, false);
    assertNotNull("TxCommit was not sent", event);
}
/**
 * Checks that {@code rollback()} on a session obtained from
 * {@code createAMQSession_0_10()} raises no exception and that
 * {@code findSentProtocolEventOfClass} then returns a {@link TxRollback} event.
 */
public void testRollback()
{
    AMQSession_0_10 session = createAMQSession_0_10();
    try
    {
        session.rollback();
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Unexpected exception is caught: " + e);
    }
    ProtocolEvent event = findSentProtocolEventOfClass(session, TxRollback.class, false);
    assertNotNull("TxRollback was not sent", event);
}
/**
 * Checks that {@code recover()} on an AUTO_ACKNOWLEDGE session obtained from
 * {@code createAMQSession_0_10(int)} raises no exception and that
 * {@code findSentProtocolEventOfClass} then returns a {@link MessageRelease} event.
 */
public void testRecover()
{
    AMQSession_0_10 session = createAMQSession_0_10(javax.jms.Session.AUTO_ACKNOWLEDGE);
    try
    {
        session.recover();
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Unexpected exception is caught: " + e);
    }
    ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false);
    assertNotNull("MessageRelease was not sent", event);
}
/**
 * Checks that {@code createProducer(...)} for the queue from {@code createQueue()}
 * raises no exception and that {@code findSentProtocolEventOfClass} then returns
 * an {@link ExchangeDeclare} event.
 */
public void testCreateProducer()
{
    AMQSession_0_10 session = createAMQSession_0_10();
    try
    {
        session.createProducer(createQueue());
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Unexpected exception is caught: " + e);
    }
    ProtocolEvent event = findSentProtocolEventOfClass(session, ExchangeDeclare.class, false);
    assertNotNull("ExchangeDeclare was not sent", event);
}
/**
 * Checks that {@code createConsumer(...)} for the queue from {@code createQueue()}
 * raises no exception and that {@code findSentProtocolEventOfClass} then returns
 * a {@link MessageSubscribe} event.
 */
public void testCreateConsumer()
{
    AMQSession_0_10 session = createAMQSession_0_10();
    try
    {
        session.createConsumer(createQueue());
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Unexpected exception is caught: " + e);
    }
    ProtocolEvent event = findSentProtocolEventOfClass(session, MessageSubscribe.class, false);
    assertNotNull("MessageSubscribe was not sent", event);
}
/**
 * Checks that {@code sync()} on a session obtained from
 * {@code createAMQSession_0_10()} raises no exception and that
 * {@code findSentProtocolEventOfClass} then returns an {@link ExecutionSync} event.
 */
public void testSync()
{
    AMQSession_0_10 session = createAMQSession_0_10();
    try
    {
        session.sync();
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Unexpected exception is caught: " + e);
    }
    ProtocolEvent event = findSentProtocolEventOfClass(session, ExecutionSync.class, false);
    assertNotNull("ExecutionSync was not sent", event);
}
/**
 * Checks that after {@code rejectMessage(1, true)} the helper
 * {@code findSentProtocolEventOfClass} returns a {@link MessageRelease} event
 * for the session.
 */
public void testRejectMessage()
{
    final AMQSession_0_10 amqSession = createAMQSession_0_10();
    amqSession.rejectMessage(1L, true);

    final ProtocolEvent releaseEvent =
            findSentProtocolEventOfClass(amqSession, MessageRelease.class, false);
    assertNotNull("MessageRelease event was not sent", releaseEvent);
}
/**
 * Checks that {@code releaseForRollback()} on a session obtained from
 * {@code createAMQSession_0_10()} raises no exception and that
 * {@code findSentProtocolEventOfClass} then returns a {@link MessageRelease} event.
 */
public void testReleaseForRollback()
{
    AMQSession_0_10 session = createAMQSession_0_10();
    try
    {
        session.releaseForRollback();
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Unexpected exception is caught: " + e);
    }
    ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false);
    assertNotNull("MessageRelease event was not sent", event);
}
/**
 * Checks that {@code sendQueueDelete(...)} for the queue name "test" raises no
 * exception, that {@code findSentProtocolEventOfClass} then returns a
 * {@link QueueDelete} event, and that the event names the queue "test".
 */
public void testSendQueueDelete()
{
    AMQSession_0_10 session = createAMQSession_0_10();
    try
    {
        session.sendQueueDelete(new AMQShortString("test"));
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Unexpected exception is caught: " + e);
    }
    ProtocolEvent event = findSentProtocolEventOfClass(session, QueueDelete.class, false);
    assertNotNull("QueueDelete event was not sent", event);
    // Local renamed from "exchangeDelete": the event is a QueueDelete, not an ExchangeDelete.
    QueueDelete queueDelete = (QueueDelete) event;
    assertEquals("test", queueDelete.getQueue());
}
/**
 * Checks that creating a message consumer and passing it to
 * {@code sendConsume(...)} raises no exception and that
 * {@code findSentProtocolEventOfClass} then returns a {@link MessageSubscribe} event.
 */
public void testSendConsume()
{
    AMQSession_0_10 session = createAMQSession_0_10();
    try
    {
        BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
                null, new FieldTable(), false, true);
        session.sendConsume(consumer, new AMQShortString("test"), null, true, null, 1);
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Unexpected exception is caught: " + e);
    }
    ProtocolEvent event = findSentProtocolEventOfClass(session, MessageSubscribe.class, false);
    assertNotNull("MessageSubscribe event was not sent", event);
}
/**
 * Checks that {@code createMessageProducer(...)} for the destination from
 * {@code createDestination()} raises no exception and that
 * {@code findSentProtocolEventOfClass} then returns an {@link ExchangeDeclare} event.
 */
public void testCreateMessageProducer()
{
    AMQSession_0_10 session = createAMQSession_0_10();
    try
    {
        session.createMessageProducer(createDestination(), true, true, true, 1L);
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Unexpected exception is caught: " + e);
    }
    ProtocolEvent event = findSentProtocolEventOfClass(session, ExchangeDeclare.class, false);
    assertNotNull("ExchangeDeclare event was not sent", event);
}
/**
 * Checks that {@code sendExchangeDelete("test", true)} raises no exception, that
 * {@code findSentProtocolEventOfClass} then returns an {@link ExchangeDelete}
 * event, and that the event names the exchange "test".
 */
public void testSendExchangeDelete()
{
    AMQSession_0_10 session = createAMQSession_0_10();
    try
    {
        session.sendExchangeDelete("test", true);
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Unexpected exception is caught: " + e);
    }
    ProtocolEvent event = findSentProtocolEventOfClass(session, ExchangeDelete.class, false);
    assertNotNull("ExchangeDelete event was not sent", event);
    ExchangeDelete exchangeDelete = (ExchangeDelete) event;
    assertEquals("test", exchangeDelete.getExchange());
}
/**
 * Checks that, on a session obtained from
 * {@code createThrowingExceptionAMQSession_0_10()}, the sequence of creating a
 * message consumer, starting the session and calling {@code receive(1)} fails
 * with a {@link JMSException} whose error code is "541".
 */
public void testExceptionOnMessageConsumerReceive()
{
    final AMQSession_0_10 failingSession = createThrowingExceptionAMQSession_0_10();
    try
    {
        final AMQAnyDestination source = createDestination();
        final FieldTable arguments = new FieldTable();
        final BasicMessageConsumer_0_10 messageConsumer =
                failingSession.createMessageConsumer(source, 1, 1, true, false, null, arguments, false, true);
        failingSession.start();
        messageConsumer.receive(1);
        fail("JMSException should be thrown");
    }
    catch (Exception thrown)
    {
        // Any non-JMS exception is turned into a test failure by the first assertion.
        assertTrue("JMSException is expected", thrown instanceof JMSException);
        final JMSException jmsException = (JMSException) thrown;
        assertEquals("541 error code is expected", "541", jmsException.getErrorCode());
    }
}
/**
 * Checks that creating a message consumer, starting the session and calling
 * {@code receive(1)} raises no exception and that
 * {@code findSentProtocolEventOfClass} then returns a {@link MessageFlow} event.
 */
public void testMessageConsumerReceive()
{
    AMQSession_0_10 session = createAMQSession_0_10();
    try
    {
        BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
                null, new FieldTable(), false, true);
        session.start();
        consumer.receive(1);
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Unexpected exception is caught: " + e);
    }
    ProtocolEvent event = findSentProtocolEventOfClass(session, MessageFlow.class, false);
    assertNotNull("MessageFlow event was not sent", event);
}
/**
 * Checks that, on a session obtained from
 * {@code createThrowingExceptionAMQSession_0_10()}, the sequence of creating a
 * message consumer, starting the session and calling {@code receiveNoWait()}
 * fails with a {@link JMSException} whose error code is "541".
 */
public void testExceptionOnMessageConsumerReceiveNoWait()
{
    final AMQSession_0_10 failingSession = createThrowingExceptionAMQSession_0_10();
    try
    {
        final AMQAnyDestination source = createDestination();
        final FieldTable arguments = new FieldTable();
        final BasicMessageConsumer_0_10 messageConsumer =
                failingSession.createMessageConsumer(source, 1, 1, true, false, null, arguments, false, true);
        failingSession.start();
        messageConsumer.receiveNoWait();
        fail("JMSException should be thrown");
    }
    catch (Exception thrown)
    {
        // Any non-JMS exception is turned into a test failure by the first assertion.
        assertTrue("JMSException is expected", thrown instanceof JMSException);
        final JMSException jmsException = (JMSException) thrown;
        assertEquals("541 error code is expected", "541", jmsException.getErrorCode());
    }
}
/**
 * Checks that, on a session obtained from
 * {@code createThrowingExceptionAMQSession_0_10()}, creating a message consumer
 * and installing a {@code MockMessageListener} on it fails with a
 * {@link JMSException} whose error code is "541".
 */
public void testExceptionOnMessageConsumerSetMessageListener()
{
    final AMQSession_0_10 failingSession = createThrowingExceptionAMQSession_0_10();
    try
    {
        final AMQAnyDestination source = createDestination();
        final FieldTable arguments = new FieldTable();
        final BasicMessageConsumer_0_10 messageConsumer =
                failingSession.createMessageConsumer(source, 1, 1, true, false, null, arguments, false, true);
        messageConsumer.setMessageListener(new MockMessageListener());
        fail("JMSException should be thrown");
    }
    catch (Exception thrown)
    {
        // Any non-JMS exception is turned into a test failure by the first assertion.
        assertTrue("JMSException is expected", thrown instanceof JMSException);
        final JMSException jmsException = (JMSException) thrown;
        assertEquals("541 error code is expected", "541", jmsException.getErrorCode());
    }
}
/**
 * Checks that creating a message consumer and installing a
 * {@code MockMessageListener} on it raises no exception and that
 * {@code findSentProtocolEventOfClass} then returns a {@link MessageFlow} event.
 */
public void testMessageConsumerSetMessageListener()
{
    AMQSession_0_10 session = createAMQSession_0_10();
    try
    {
        BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
                null, new FieldTable(), false, true);
        consumer.setMessageListener(new MockMessageListener());
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Unexpected exception is caught: " + e);
    }
    ProtocolEvent event = findSentProtocolEventOfClass(session, MessageFlow.class, false);
    assertNotNull("MessageFlow event was not sent", event);
}
/**
 * Checks that creating a message consumer and closing it raises no exception
 * and that {@code findSentProtocolEventOfClass} then returns a
 * {@link MessageCancel} event.
 */
public void testMessageConsumerClose()
{
    AMQSession_0_10 session = createAMQSession_0_10();
    try
    {
        BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
                null, new FieldTable(), false, true);
        consumer.close();
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Unexpected exception is caught: " + e);
    }
    ProtocolEvent event = findSentProtocolEventOfClass(session, MessageCancel.class, false);
    assertNotNull("MessageCancel event was not sent", event);
}
/**
 * Checks that, on a session obtained from
 * {@code createThrowingExceptionAMQSession_0_10()}, creating a message consumer
 * and closing it fails with a {@link JMSException} whose error code is "541".
 */
public void testExceptionOnMessageConsumerClose()
{
    final AMQSession_0_10 failingSession = createThrowingExceptionAMQSession_0_10();
    try
    {
        final AMQAnyDestination source = createDestination();
        final FieldTable arguments = new FieldTable();
        final BasicMessageConsumer_0_10 messageConsumer =
                failingSession.createMessageConsumer(source, 1, 1, true, false, null, arguments, false, true);
        messageConsumer.close();
        fail("JMSException should be thrown");
    }
    catch (Exception thrown)
    {
        // Any non-JMS exception is turned into a test failure by the first assertion.
        assertTrue("JMSException is expected", thrown instanceof JMSException);
        final JMSException jmsException = (JMSException) thrown;
        assertEquals("541 error code is expected", "541", jmsException.getErrorCode());
    }
}
/**
 * Checks that creating a producer, sending a text message through it and
 * committing the session raises no exception, and that
 * {@code findSentProtocolEventOfClass} then returns both a
 * {@link MessageTransfer} and an {@link ExchangeDeclare} event.
 */
public void testMessageProducerSend()
{
    AMQSession_0_10 session = createAMQSession_0_10();
    try
    {
        MessageProducer producer = session.createProducer(createQueue());
        producer.send(session.createTextMessage("Test"));
        session.commit();
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Unexpected exception is caught: " + e);
    }
    ProtocolEvent event = findSentProtocolEventOfClass(session, MessageTransfer.class, false);
    assertNotNull("MessageTransfer event was not sent", event);
    event = findSentProtocolEventOfClass(session, ExchangeDeclare.class, false);
    assertNotNull("ExchangeDeclare event was not sent", event);
}
/**
 * Builds the {@link AMQAnyDestination} used by the tests from fixed literal
 * arguments ("amq.direct", "direct", "test", ...).
 * <p>
 * If construction throws, the current test is failed via {@code fail(...)}.
 *
 * @return the newly created destination
 */
private AMQAnyDestination createDestination()
{
    AMQAnyDestination destination = null;
    try
    {
        destination = new AMQAnyDestination(new AMQShortString("amq.direct"), new AMQShortString("direct"),
                new AMQShortString("test"), false, true, new AMQShortString("test"), true, null);
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Failed to create destination: " + e);
    }
    return destination;
}
/**
 * Builds the {@link AMQQueue} used by the tests from the fixed literal
 * arguments "amq.direct", "test" and "test".
 * <p>
 * If construction throws, the current test is failed via {@code fail(...)}.
 *
 * @return the newly created queue
 */
private AMQQueue createQueue()
{
    AMQQueue destination = null;
    try
    {
        destination = new AMQQueue(new AMQShortString("amq.direct"), new AMQShortString("test"),
                new AMQShortString("test"));
    }
    catch (Exception e)
    {
        // Report the exception itself: getMessage() alone may be null and hides the exception type.
        fail("Failed to create destination: " + e);
    }
    return destination;
}
/**
 * Creates the session used by the exception tests, with the
 * {@code SESSION_TRANSACTED} acknowledge mode.
 *
 * @return the result of {@code createAMQSession_0_10(true, SESSION_TRANSACTED)}
 */
private AMQSession_0_10 createThrowingExceptionAMQSession_0_10()
{
    final int transacted = javax.jms.Session.SESSION_TRANSACTED;
    final AMQSession_0_10 throwingSession = createAMQSession_0_10(true, transacted);
    return throwingSession;
}
/**
 * Creates the session used by the exception tests with the given
 * acknowledge mode.
 *
 * @param akcnowledgeMode acknowledge mode forwarded to {@code createAMQSession_0_10}
 * @return the result of {@code createAMQSession_0_10(true, akcnowledgeMode)}
 */
private AMQSession_0_10 createThrowingExceptionAMQSession_0_10(int akcnowledgeMode)
{
    final AMQSession_0_10 throwingSession = createAMQSession_0_10(true, akcnowledgeMode);
    return throwingSession;
}
private ProtocolEvent findSentProtocolEventOfClass(AMQSession_0_10 session, Class extends ProtocolEvent> class1,
boolean isLast)
{
ProtocolEvent found = null;
List