/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.protocol.v1_0; import java.text.MessageFormat; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SessionEventListener; import org.apache.qpid.amqp_1_0.type.*; import org.apache.qpid.amqp_1_0.type.messaging.*; import org.apache.qpid.amqp_1_0.type.messaging.Source; import org.apache.qpid.amqp_1_0.type.messaging.Target; import org.apache.qpid.amqp_1_0.type.transaction.Coordinator; import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.transport.ServerConnection; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.*; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject { private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); private IApplicationRegistry _appRegistry; private VirtualHost _vhost; private AutoCommitTransaction _transaction; private final LinkedHashMap _openTransactions = new LinkedHashMap(); private final Connection_1_0 _connection; private UUID _id = UUID.randomUUID(); public Session_1_0(VirtualHost vhost, IApplicationRegistry appRegistry, final Connection_1_0 connection) { _appRegistry = appRegistry; _vhost = vhost; _transaction = new AutoCommitTransaction(vhost.getMessageStore()); _connection = connection; } public void remoteLinkCreation(final LinkEndpoint endpoint) { Destination destination; Link_1_0 link = null; Error error = null; final LinkRegistry linkRegistry = _vhost.getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId()); if(endpoint.getRole() == Role.SENDER) { SendingLink_1_0 previousLink = linkRegistry.getDurableSendingLink(endpoint.getName()); if(previousLink == null) { Target target = (Target) endpoint.getTarget(); Source source = (Source) endpoint.getSource(); if(source != null) { if(Boolean.TRUE.equals(source.getDynamic())) { AMQQueue tempQueue = createTemporaryQueue(source.getDynamicNodeProperties()); source.setAddress(tempQueue.getName()); } String addr = source.getAddress(); AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr); if(queue != null) { destination = new QueueDestination(queue); } else { Exchange exchg = _vhost.getExchangeRegistry().getExchange(addr); if(exchg != null) { destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy()); } else { endpoint.setSource(null); destination = null; } } } else { destination = null; } if(destination != null) { final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint; try { final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint), _vhost, (SendingDestination) destination ); sendingLinkEndpoint.setLinkEventListener(sendingLink); link = sendingLink; if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())) { linkRegistry.registerSendingLink(endpoint.getName(), sendingLink); } } catch(AmqpErrorException e) { e.printStackTrace(); destination = null; sendingLinkEndpoint.setSource(null); error = e.getError(); } } } else { Source newSource = (Source) endpoint.getSource(); Source oldSource = (Source) previousLink.getEndpoint().getSource(); final TerminusDurability newSourceDurable = newSource == null ? null : newSource.getDurable(); if(newSourceDurable != null) { oldSource.setDurable(newSourceDurable); if(newSourceDurable.equals(TerminusDurability.NONE)) { linkRegistry.unregisterSendingLink(endpoint.getName()); } } endpoint.setSource(oldSource); SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint; previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint)); sendingLinkEndpoint.setLinkEventListener(previousLink); link = previousLink; endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap()); } } else { if(endpoint.getTarget() instanceof Coordinator) { Coordinator coordinator = (Coordinator) endpoint.getTarget(); TxnCapability[] capabilities = coordinator.getCapabilities(); boolean localTxn = false; boolean multiplePerSession = false; if(capabilities != null) { for(TxnCapability capability : capabilities) { if(capability.equals(TxnCapability.LOCAL_TXN)) { localTxn = true; } else if(capability.equals(TxnCapability.MULTI_TXNS_PER_SSN)) { multiplePerSession = true; } else { error = new Error(); error.setCondition(AmqpError.NOT_IMPLEMENTED); error.setDescription("Unsupported capability: " + capability); break; } } } /* if(!localTxn) { capabilities.add(TxnCapabilities.LOCAL_TXN); }*/ final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; final TxnCoordinatorLink_1_0 coordinatorLink = new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions); receivingLinkEndpoint.setLinkEventListener(coordinatorLink); link = coordinatorLink; } else { ReceivingLink_1_0 previousLink = linkRegistry.getDurableReceivingLink(endpoint.getName()); if(previousLink == null) { Target target = (Target) endpoint.getTarget(); if(target != null) { if(Boolean.TRUE.equals(target.getDynamic())) { AMQQueue tempQueue = createTemporaryQueue(target.getDynamicNodeProperties()); target.setAddress(tempQueue.getName()); } String addr = target.getAddress(); Exchange exchg = _vhost.getExchangeRegistry().getExchange(addr); if(exchg != null) { destination = new ExchangeDestination(exchg, target.getDurable(), target.getExpiryPolicy()); } else { AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr); if(queue != null) { destination = new QueueDestination(queue); } else { endpoint.setTarget(null); destination = null; } } } else { destination = null; } if(destination != null) { final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost, (ReceivingDestination) destination); receivingLinkEndpoint.setLinkEventListener(receivingLink); link = receivingLink; if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())) { linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink); } } } else { ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint)); receivingLinkEndpoint.setLinkEventListener(previousLink); link = previousLink; endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap()); } } } endpoint.attach(); if(link == null) { if(error == null) { error = new Error(); error.setCondition(AmqpError.NOT_FOUND); } endpoint.detach(error); } else { link.start(); } } private AMQQueue createTemporaryQueue(Map properties) { final String queueName = UUID.randomUUID().toString(); AMQQueue queue = null; try { LifetimePolicy lifetimePolicy = properties == null ? null : (LifetimePolicy) properties.get(LIFETIME_POLICY); final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()), queueName, false, // durable null, // owner false, // autodelete false, // exclusive _vhost, properties); if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose) { final Connection_1_0.Task deleteQueueTask = new Connection_1_0.Task() { public void doTask(Connection_1_0 session) { if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue) { try { tempQueue.delete(); } catch (AMQException e) { e.printStackTrace(); //TODO. } } } }; _connection.addConnectionCloseTask(deleteQueueTask); queue.addQueueDeleteTask(new AMQQueue.Task() { public void doTask(AMQQueue queue) { _connection.removeConnectionCloseTask(deleteQueueTask); } }); } else if(lifetimePolicy instanceof DeleteOnNoLinks) { } else if(lifetimePolicy instanceof DeleteOnNoMessages) { } else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages) { } } catch (AMQSecurityException e) { e.printStackTrace(); //TODO. } catch (AMQException e) { e.printStackTrace(); //TODO } return queue; } public ServerTransaction getTransaction(Binary transactionId) { // TODO should treat invalid id differently to null ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId)); return transaction == null ? _transaction : transaction; } public void remoteEnd(End end) { Iterator> iter = _openTransactions.entrySet().iterator(); while(iter.hasNext()) { Map.Entry entry = iter.next(); entry.getValue().rollback(); iter.remove(); } _connection.sessionEnded(this); } Integer binaryToInteger(final Binary txnId) { if(txnId == null) { return null; } if(txnId.getLength() > 4) throw new IllegalArgumentException(); int id = 0; byte[] data = txnId.getArray(); for(int i = 0; i < txnId.getLength(); i++) { id <<= 8; id += data[i+txnId.getArrayOffset()]; } return id; } Binary integerToBinary(final int txnId) { byte[] data = new byte[4]; data[3] = (byte) (txnId & 0xff); data[2] = (byte) ((txnId & 0xff00) >> 8); data[1] = (byte) ((txnId & 0xff0000) >> 16); data[0] = (byte) ((txnId & 0xff000000) >> 24); return new Binary(data); } public void forceEnd() { } @Override public UUID getQMFId() { return _id; } @Override public AMQConnectionModel getConnectionModel() { return _connection.getModel(); } @Override public String getClientID() { // TODO return ""; } @Override public void close() throws AMQException { // TODO - required for AMQSessionModel / management initiated closing } @Override public LogSubject getLogSubject() { return this; } @Override public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException { // TODO - required for AMQSessionModel / long running transaction detection } @Override public void block(AMQQueue queue) { // TODO - required for AMQSessionModel / producer side flow control } @Override public void unblock(AMQQueue queue) { // TODO - required for AMQSessionModel / producer side flow control } @Override public void block() { // TODO - required for AMQSessionModel / producer side flow control } @Override public void unblock() { // TODO - required for AMQSessionModel / producer side flow control } @Override public boolean getBlocking() { // TODO return false; } @Override public boolean onSameConnection(InboundMessage inbound) { // TODO return false; } @Override public int getUnacknowledgedMessageCount() { // TODO return 0; } @Override public Long getTxnCount() { // TODO return 0l; } @Override public Long getTxnStart() { // TODO return 0l; } @Override public Long getTxnCommits() { // TODO return 0l; } @Override public Long getTxnRejects() { // TODO return 0l; } @Override public int getChannelId() { // TODO return 0; } @Override public int getConsumerCount() { // TODO return 0; } @Override public int compareTo(AMQSessionModel o) { return getQMFId().compareTo(o.getQMFId()); } public String toLogString() { long connectionId = getConnectionModel().getConnectionId(); String remoteAddress = getConnectionModel().getRemoteAddressString(); return "[" + MessageFormat.format(CHANNEL_FORMAT, connectionId, getClientID(), remoteAddress, _vhost.getName(), // TODO - virtual host 0) // TODO - channel) + "] "; } }