/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.protocol.v1_0; import java.text.MessageFormat; import org.apache.log4j.Logger; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; import org.apache.qpid.amqp_1_0.transport.SessionEventListener; import org.apache.qpid.amqp_1_0.type.*; import org.apache.qpid.amqp_1_0.type.LifetimePolicy; import org.apache.qpid.amqp_1_0.type.messaging.*; import org.apache.qpid.amqp_1_0.type.messaging.Source; import org.apache.qpid.amqp_1_0.type.messaging.Target; import org.apache.qpid.amqp_1_0.type.transaction.Coordinator; import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.server.model.*; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.QueueExistsException; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject { private static final Logger _logger = Logger.getLogger(Session_1_0.class); private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); private final SessionEndpoint _endpoint; private VirtualHost _vhost; private AutoCommitTransaction _transaction; private final LinkedHashMap _openTransactions = new LinkedHashMap(); private final CopyOnWriteArrayList> _taskList = new CopyOnWriteArrayList>(); private final Connection_1_0 _connection; private UUID _id = UUID.randomUUID(); private AtomicBoolean _closed = new AtomicBoolean(); public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint) { _vhost = vhost; _endpoint = endpoint; _transaction = new AutoCommitTransaction(vhost.getMessageStore()); _connection = connection; } public void remoteLinkCreation(final LinkEndpoint endpoint) { Destination destination; Link_1_0 link = null; Error error = null; final LinkRegistry linkRegistry = _vhost.getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId()); if(endpoint.getRole() == Role.SENDER) { SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName()); if(previousLink == null) { Target target = (Target) endpoint.getTarget(); Source source = (Source) endpoint.getSource(); if(source != null) { if(Boolean.TRUE.equals(source.getDynamic())) { AMQQueue tempQueue = createTemporaryQueue(source.getDynamicNodeProperties()); source.setAddress(tempQueue.getName()); } String addr = source.getAddress(); MessageSource queue = _vhost.getMessageSource(addr); if(queue != null) { destination = new MessageSourceDestination(queue); } else { Exchange exchg = _vhost.getExchange(addr); if(exchg != null) { destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy()); } else { endpoint.setSource(null); destination = null; } } } else { destination = null; } if(destination != null) { final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint; try { final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint), _vhost, (SendingDestination) destination ); sendingLinkEndpoint.setLinkEventListener(sendingLink); link = sendingLink; if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())) { linkRegistry.registerSendingLink(endpoint.getName(), sendingLink); } } catch(AmqpErrorException e) { _logger.error("Error creating sending link", e); destination = null; sendingLinkEndpoint.setSource(null); error = e.getError(); } } } else { Source newSource = (Source) endpoint.getSource(); Source oldSource = (Source) previousLink.getEndpoint().getSource(); final TerminusDurability newSourceDurable = newSource == null ? null : newSource.getDurable(); if(newSourceDurable != null) { oldSource.setDurable(newSourceDurable); if(newSourceDurable.equals(TerminusDurability.NONE)) { linkRegistry.unregisterSendingLink(endpoint.getName()); } } endpoint.setSource(oldSource); SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint; previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint)); sendingLinkEndpoint.setLinkEventListener(previousLink); link = previousLink; endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap()); } } else { if(endpoint.getTarget() instanceof Coordinator) { Coordinator coordinator = (Coordinator) endpoint.getTarget(); TxnCapability[] capabilities = coordinator.getCapabilities(); boolean localTxn = false; boolean multiplePerSession = false; if(capabilities != null) { for(TxnCapability capability : capabilities) { if(capability.equals(TxnCapability.LOCAL_TXN)) { localTxn = true; } else if(capability.equals(TxnCapability.MULTI_TXNS_PER_SSN)) { multiplePerSession = true; } else { error = new Error(); error.setCondition(AmqpError.NOT_IMPLEMENTED); error.setDescription("Unsupported capability: " + capability); break; } } } /* if(!localTxn) { capabilities.add(TxnCapabilities.LOCAL_TXN); }*/ final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; final TxnCoordinatorLink_1_0 coordinatorLink = new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions); receivingLinkEndpoint.setLinkEventListener(coordinatorLink); link = coordinatorLink; } else { ReceivingLink_1_0 previousLink = (ReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName()); if(previousLink == null) { Target target = (Target) endpoint.getTarget(); if(target != null) { if(Boolean.TRUE.equals(target.getDynamic())) { AMQQueue tempQueue = createTemporaryQueue(target.getDynamicNodeProperties()); target.setAddress(tempQueue.getName()); } String addr = target.getAddress(); MessageDestination messageDestination = _vhost.getMessageDestination(addr); if(messageDestination != null) { destination = new NodeReceivingDestination(messageDestination, target.getDurable(), target.getExpiryPolicy()); } else { AMQQueue queue = _vhost.getQueue(addr); if(queue != null) { destination = new QueueDestination(queue); } else { endpoint.setTarget(null); destination = null; } } } else { destination = null; } if(destination != null) { final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost, (ReceivingDestination) destination); receivingLinkEndpoint.setLinkEventListener(receivingLink); link = receivingLink; if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())) { linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink); } } } else { ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint)); receivingLinkEndpoint.setLinkEventListener(previousLink); link = previousLink; endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap()); } } } endpoint.attach(); if(link == null) { if(error == null) { error = new Error(); error.setCondition(AmqpError.NOT_FOUND); } endpoint.detach(error); } else { link.start(); } } private AMQQueue createTemporaryQueue(Map properties) { final String queueName = UUID.randomUUID().toString(); AMQQueue queue = null; try { LifetimePolicy lifetimePolicy = properties == null ? null : (LifetimePolicy) properties.get(LIFETIME_POLICY); Map attributes = new HashMap(); attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, _vhost.getName())); attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName); attributes.put(org.apache.qpid.server.model.Queue.DURABLE, false); if(lifetimePolicy instanceof DeleteOnNoLinks) { attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_LINKS); } else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages) { attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, org.apache.qpid.server.model.LifetimePolicy.IN_USE); } else if(lifetimePolicy instanceof DeleteOnClose) { attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE); } else if(lifetimePolicy instanceof DeleteOnNoMessages) { attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, org.apache.qpid.server.model.LifetimePolicy.IN_USE); } else { attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE); } // TODO convert AMQP 1-0 node properties to queue attributes final AMQQueue tempQueue = queue = _vhost.createQueue(this, attributes ); } catch (QpidSecurityException e) { //TODO _logger.info("Security error", e); throw new ConnectionScopedRuntimeException(e); } catch (QueueExistsException e) { _logger.error("A temporary queue was created with a name which collided with an existing queue name"); throw new ConnectionScopedRuntimeException(e); } return queue; } public ServerTransaction getTransaction(Binary transactionId) { // TODO should treat invalid id differently to null ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId)); return transaction == null ? _transaction : transaction; } public void remoteEnd(End end) { Iterator> iter = _openTransactions.entrySet().iterator(); while(iter.hasNext()) { Map.Entry entry = iter.next(); entry.getValue().rollback(); iter.remove(); } _connection.sessionEnded(this); } Integer binaryToInteger(final Binary txnId) { if(txnId == null) { return null; } if(txnId.getLength() > 4) throw new IllegalArgumentException(); int id = 0; byte[] data = txnId.getArray(); for(int i = 0; i < txnId.getLength(); i++) { id <<= 8; id += data[i+txnId.getArrayOffset()]; } return id; } Binary integerToBinary(final int txnId) { byte[] data = new byte[4]; data[3] = (byte) (txnId & 0xff); data[2] = (byte) ((txnId & 0xff00) >> 8); data[1] = (byte) ((txnId & 0xff0000) >> 16); data[0] = (byte) ((txnId & 0xff000000) >> 24); return new Binary(data); } @Override public UUID getId() { return _id; } @Override public Connection_1_0 getConnectionModel() { return _connection; } @Override public String getClientID() { // TODO return ""; } @Override public void close() { performCloseTasks(); _endpoint.end(); } protected void performCloseTasks() { if(_closed.compareAndSet(false, true)) { List> taskList = new ArrayList>(_taskList); _taskList.clear(); for(Action task : taskList) { task.performAction(this); } } } @Override public void close(AMQConstant cause, String message) { performCloseTasks(); final End end = new End(); final Error theError = new Error(); theError.setDescription(message); theError.setCondition(ConnectionError.CONNECTION_FORCED); end.setError(theError); _endpoint.end(end); } @Override public LogSubject getLogSubject() { return this; } @Override public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) { // TODO - required for AMQSessionModel / long running transaction detection } @Override public void block(AMQQueue queue) { // TODO - required for AMQSessionModel / producer side flow control } @Override public void unblock(AMQQueue queue) { // TODO - required for AMQSessionModel / producer side flow control } @Override public void block() { // TODO - required for AMQSessionModel / producer side flow control } @Override public void unblock() { // TODO - required for AMQSessionModel / producer side flow control } @Override public boolean getBlocking() { // TODO return false; } @Override public Object getConnectionReference() { return getConnection().getReference(); } @Override public int getUnacknowledgedMessageCount() { // TODO return 0; } @Override public Long getTxnCount() { // TODO return 0l; } @Override public Long getTxnStart() { // TODO return 0l; } @Override public Long getTxnCommits() { // TODO return 0l; } @Override public Long getTxnRejects() { // TODO return 0l; } @Override public int getChannelId() { return _endpoint.getSendingChannel(); } @Override public int getConsumerCount() { // TODO return 0; } public String toLogString() { long connectionId = getConnectionModel().getConnectionId(); String remoteAddress = getConnectionModel().getRemoteAddressString(); return "[" + MessageFormat.format(CHANNEL_FORMAT, connectionId, getClientID(), remoteAddress, _vhost.getName(), _endpoint.getSendingChannel()) + "] "; } @Override public int compareTo(Session_1_0 o) { return getId().compareTo(o.getId()); } public Connection_1_0 getConnection() { return _connection; } @Override public void addDeleteTask(final Action task) { if(!_closed.get()) { _taskList.add(task); } } @Override public void removeDeleteTask(final Action task) { _taskList.remove(task); } }