/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.qpid.amqp_1_0.jms.impl; import org.apache.qpid.amqp_1_0.client.AcknowledgeMode; import org.apache.qpid.amqp_1_0.client.Message; import org.apache.qpid.amqp_1_0.client.Receiver; import org.apache.qpid.amqp_1_0.client.Transaction; import org.apache.qpid.amqp_1_0.jms.MessageConsumer; import org.apache.qpid.amqp_1_0.jms.QueueReceiver; import org.apache.qpid.amqp_1_0.jms.Queue; import org.apache.qpid.amqp_1_0.jms.Session; import org.apache.qpid.amqp_1_0.jms.TemporaryDestination; import org.apache.qpid.amqp_1_0.jms.Topic; import org.apache.qpid.amqp_1_0.jms.TopicSubscriber; import org.apache.qpid.amqp_1_0.type.AmqpErrorException; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.Symbol; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; import org.apache.qpid.amqp_1_0.type.messaging.Filter; import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter; import org.apache.qpid.amqp_1_0.type.messaging.Modified; import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter; import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.Error; import javax.jms.Destination; import javax.jms.IllegalStateException; import javax.jms.InvalidDestinationException; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.MessageListener; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, TopicSubscriber { private static final Symbol NO_LOCAL_FILTER_NAME = Symbol.valueOf("no-local"); private static final Symbol JMS_SELECTOR_FILTER_NAME = Symbol.valueOf("jms-selector"); private String _selector; private boolean _noLocal; private DestinationImpl _destination; private SessionImpl _session; private Receiver _receiver; private Binary _lastUnackedMessage; MessageListener _messageListener; private boolean _isQueueConsumer; private boolean _isTopicSubscriber; private boolean _closed = false; private String _linkName; private boolean _durable; private Collection _txnMsgs = Collections.synchronizedCollection(new ArrayList()); private Binary _lastTxnUpdate; private final List _recoverReplayMessages = new ArrayList(); private final List _replaymessages = new ArrayList(); MessageConsumerImpl(final Destination destination, final SessionImpl session, final String selector, final boolean noLocal) throws JMSException { this(destination,session,selector,noLocal,null,false); } MessageConsumerImpl(final Destination destination, final SessionImpl session, final String selector, final boolean noLocal, final String linkName, final boolean durable) throws JMSException { _selector = selector; _noLocal = noLocal; _linkName = linkName; _durable = durable; if(destination instanceof DestinationImpl) { _destination = (DestinationImpl) destination; if(destination instanceof javax.jms.Queue) { _isQueueConsumer = true; } else if(destination instanceof javax.jms.Topic) { _isTopicSubscriber = true; } if(destination instanceof TemporaryDestination) { ((TemporaryDestination)destination).addConsumer(this); } } else { throw new InvalidDestinationException("Invalid destination class " + destination.getClass().getName()); } _session = session; _receiver = createClientReceiver(); } protected Receiver createClientReceiver() throws JMSException { try { return _session.getClientSession(). createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO, _linkName, _durable, getFilters(), null); } catch (AmqpErrorException e) { Error error = e.getError(); if(AmqpError.INVALID_FIELD.equals(error.getCondition())) { throw new InvalidSelectorException(e.getMessage()); } else { throw new JMSException(e.getMessage(), error.getCondition().getValue().toString()); } } } Map getFilters() { if(_selector == null || _selector.trim().equals("")) { if(_noLocal) { return Collections.singletonMap(NO_LOCAL_FILTER_NAME, (Filter) NoLocalFilter.INSTANCE); } else { return null; } } else if(_noLocal) { Map filters = new HashMap(); filters.put(NO_LOCAL_FILTER_NAME, NoLocalFilter.INSTANCE); filters.put(JMS_SELECTOR_FILTER_NAME, new JMSSelectorFilter(_selector)); return filters; } else { return Collections.singletonMap(JMS_SELECTOR_FILTER_NAME, (Filter)new JMSSelectorFilter(_selector)); } } public String getMessageSelector() throws JMSException { checkClosed(); return _selector; } public MessageListener getMessageListener() throws IllegalStateException { checkClosed(); return _messageListener; } public void setMessageListener(final MessageListener messageListener) throws JMSException { checkClosed(); _messageListener = messageListener; _session.messageListenerSet( this ); _receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener() { public void messageArrived(final Receiver receiver) { _session.messageArrived(MessageConsumerImpl.this); } }); } public MessageImpl receive() throws JMSException { checkClosed(); return receiveImpl(-1L); } public MessageImpl receive(final long timeout) throws JMSException { checkClosed(); // TODO - validate timeout > 0 return receiveImpl(timeout); } public MessageImpl receiveNoWait() throws JMSException { checkClosed(); return receiveImpl(0L); } private MessageImpl receiveImpl(long timeout) throws IllegalStateException { org.apache.qpid.amqp_1_0.client.Message msg; boolean redelivery; if(_replaymessages.isEmpty()) { msg = receive0(timeout); redelivery = false; } else { msg = _replaymessages.remove(0); redelivery = true; } if(msg != null) { preReceiveAction(msg); } return createJMSMessage(msg, redelivery); } Message receive0(final long timeout) { Message message = _receiver.receive(timeout); if(_session.getAckModeEnum() == Session.AcknowledgeMode.CLIENT_ACKNOWLEDGE) { _recoverReplayMessages.add(message); } return message; } void acknowledge(final org.apache.qpid.amqp_1_0.client.Message msg) { _receiver.acknowledge(msg.getDeliveryTag(), _session.getTxn()); } MessageImpl createJMSMessage(final Message msg, boolean redelivery) { if(msg != null) { MessageFactory factory = _session.getMessageFactory(); final MessageImpl message = factory.createMessage(_destination, msg); message.setFromQueue(_isQueueConsumer); message.setFromTopic(_isTopicSubscriber); if(redelivery) { if(!message.getJMSRedelivered()) { message.setJMSRedelivered(true); } } return message; } else { return null; } } public void close() throws JMSException { if(!_closed) { _closed = true; closeUnderlyingReceiver(_receiver); if(_destination instanceof TemporaryDestination) { ((TemporaryDestination)_destination).removeConsumer(this); } } } protected void closeUnderlyingReceiver(Receiver receiver) { receiver.close(); } private void checkClosed() throws IllegalStateException { if(_closed) { throw new javax.jms.IllegalStateException("Closed"); } } void setLastUnackedMessage(final Binary deliveryTag) { _lastUnackedMessage = deliveryTag; } void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg) { int acknowledgeMode = _session.getAckModeEnum().ordinal(); if(acknowledgeMode == Session.AUTO_ACKNOWLEDGE || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE || acknowledgeMode == Session.SESSION_TRANSACTED) { acknowledge(msg); if(acknowledgeMode == Session.SESSION_TRANSACTED) { _txnMsgs.add(msg.getDeliveryTag()); } } else if(acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) { setLastUnackedMessage(msg.getDeliveryTag()); } } void acknowledgeAll() { if(_lastUnackedMessage != null) { Transaction txn = _session.getTxn(); _receiver.acknowledgeAll(_lastUnackedMessage, txn, null); if(txn != null) { _lastTxnUpdate = _lastUnackedMessage; } _lastUnackedMessage = null; } _recoverReplayMessages.clear(); if(!_replaymessages.isEmpty()) { _recoverReplayMessages.addAll(_replaymessages); } } void postRollback() { if(_lastTxnUpdate != null) { final Modified outcome = new Modified(); outcome.setDeliveryFailed(true); _receiver.updateAll(outcome, _lastTxnUpdate); _lastTxnUpdate = null; } for(Binary tag : _txnMsgs) { _receiver.modified(tag); } _txnMsgs.clear(); } void postCommit() { _lastTxnUpdate = null; _txnMsgs.clear(); } public DestinationImpl getDestination() throws IllegalStateException { checkClosed(); return _destination; } public SessionImpl getSession() throws IllegalStateException { checkClosed(); return _session; } public boolean getNoLocal() throws IllegalStateException { checkClosed(); return _noLocal; } public void start() { _receiver.setCredit(UnsignedInteger.valueOf(100), true); } public Queue getQueue() throws JMSException { return (Queue) getDestination(); } public Topic getTopic() throws JMSException { return (Topic) getDestination(); } void setQueueConsumer(final boolean queueConsumer) { _isQueueConsumer = queueConsumer; } void setTopicSubscriber(final boolean topicSubscriber) { _isTopicSubscriber = topicSubscriber; } String getLinkName() { return _linkName; } boolean isDurable() { return _durable; } void doRecover() { _replaymessages.clear(); if(!_recoverReplayMessages.isEmpty()) { _replaymessages.addAll(_recoverReplayMessages); for(Message msg : _replaymessages) { _session.messageArrived(this); } } } }