diff options
author | Alan Conway <aconway@apache.org> | 2007-01-16 20:17:50 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-01-16 20:17:50 +0000 |
commit | bc84e62cc549ac2d751a45d61a867354c84c60d6 (patch) | |
tree | 160824086ea1edfd2d28f153626d378d69d0f516 /cpp/lib/broker/Connection.cpp | |
parent | 0df54842626c3cc065cad1a2595458f54253a178 (diff) | |
download | qpid-python-bc84e62cc549ac2d751a45d61a867354c84c60d6.tar.gz |
* Renamed Session* classes to Connection* to align with AMQP spec
- broker::SessionHandlerImpl -> broker::Connection
- broker::SessionHandlerImplFactory -> broker::ConnectionFactory
- sys::SessionHandler -> ConnectionInputHandler
- sys::SessionHandlerFactory -> ConnectionInputHandlerFactory
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496848 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/Connection.cpp')
-rw-r--r-- | cpp/lib/broker/Connection.cpp | 700 |
1 files changed, 700 insertions, 0 deletions
diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp new file mode 100644 index 0000000000..c391ff6db5 --- /dev/null +++ b/cpp/lib/broker/Connection.cpp @@ -0,0 +1,700 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <assert.h> + +#include "Connection.h" + +#include "FanOutExchange.h" +#include "HeadersExchange.h" + +#include "Requester.h" +#include "Responder.h" + +using namespace boost; +using namespace qpid::sys; +using namespace qpid::framing; +using namespace qpid::sys; + +namespace qpid { +namespace broker { + +Connection::Connection( + SessionContext* _context, Broker& broker) : + + context(_context), + client(0), + queues(broker.getQueues()), + exchanges(broker.getExchanges()), + cleaner(broker.getCleaner()), + settings(broker.getTimeout(), broker.getStagingThreshold()), + requester(broker.getRequester()), + responder(broker.getResponder()), + basicHandler(new BasicHandlerImpl(this)), + channelHandler(new ChannelHandlerImpl(this)), + connectionHandler(new ConnectionHandlerImpl(this)), + exchangeHandler(new ExchangeHandlerImpl(this)), + queueHandler(new QueueHandlerImpl(this)), + txHandler(new TxHandlerImpl(this)), + messageHandler(new MessageHandlerImpl(this)), + framemax(65536), + heartbeat(0) +{} + +Connection::~Connection(){ + + if (client != NULL) + delete client; + +} + +Channel* Connection::getChannel(u_int16_t channel){ + channel_iterator i = channels.find(channel); + if(i == channels.end()){ + throw ConnectionException(504, "Unknown channel: " + channel); + } + return i->second; +} + +Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){ + Queue::shared_ptr queue; + if (name.empty()) { + queue = getChannel(channel)->getDefaultQueue(); + if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); + } else { + queue = queues.find(name); + if (queue == 0) { + throw ChannelException( 404, "Queue not found: " + name); + } + } + return queue; +} + + +Exchange::shared_ptr Connection::findExchange(const string& name){ + return exchanges.get(name); +} + +void Connection::handleMethod( + u_int16_t channel, qpid::framing::AMQBody::shared_ptr body) +{ + AMQMethodBody::shared_ptr method = + shared_polymorphic_cast<AMQMethodBody, AMQBody>(body); + try{ + method->invoke(*this, channel); + }catch(ChannelException& e){ + channels[channel]->close(); + channels.erase(channel); + client->getChannel().close( + channel, e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + }catch(ConnectionException& e){ + client->getConnection().close( + 0, e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + }catch(std::exception& e){ + client->getConnection().close( + 0, 541/*internal error*/, e.what(), + method->amqpClassId(), method->amqpMethodId()); + } +} + +void Connection::received(qpid::framing::AMQFrame* frame){ + u_int16_t channel = frame->getChannel(); + AMQBody::shared_ptr body = frame->getBody(); + switch(body->type()) + { + case REQUEST_BODY: + responder.received(AMQRequestBody::getData(body)); + handleMethod(channel, body); + break; + case RESPONSE_BODY: + // Must process responses before marking them received. + handleMethod(channel, body); + requester.processed(AMQResponseBody::getData(body)); + break; + // TODO aconway 2007-01-15: Leftover from 0-8 support, remove. + case METHOD_BODY: + handleMethod(channel, body); + break; + case HEADER_BODY: + handleHeader( + channel, shared_polymorphic_cast<AMQHeaderBody>(body)); + break; + + case CONTENT_BODY: + handleContent( + channel, shared_polymorphic_cast<AMQContentBody>(body)); + break; + + case HEARTBEAT_BODY: + assert(channel == 0); + handleHeartbeat( + shared_polymorphic_cast<AMQHeartbeatBody>(body)); + break; + } +} + +/** + * An OutputHandler that does request/response procssing before + * delgating to another OutputHandler. + */ +Connection::Sender::Sender( + OutputHandler& oh, Requester& req, Responder& resp) + : out(oh), requester(req), responder(resp) +{} + +void Connection::Sender::send(AMQFrame* frame) { + AMQBody::shared_ptr body = frame->getBody(); + u_int16_t type = body->type(); + if (type == REQUEST_BODY) + requester.sending(AMQRequestBody::getData(body)); + else if (type == RESPONSE_BODY) + responder.sending(AMQResponseBody::getData(body)); + out.send(frame); +} + +void Connection::initiated(qpid::framing::ProtocolInitiation* header){ + + if (client == 0) + { + client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor()); + + + std::cout << "---------------" << this << std::endl; + + //send connection start + FieldTable properties; + string mechanisms("PLAIN"); + string locales("en_US"); // channel, majour, minor + client->getConnection().start(0, header->getMajor(), header->getMinor(), properties, mechanisms, locales); + } +} + + +void Connection::idleOut(){ + +} + +void Connection::idleIn(){ + +} + +void Connection::closed(){ + try { + for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){ + Channel* c = i->second; + channels.erase(i); + c->close(); + delete c; + } + for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){ + string name = (*i)->getName(); + queues.destroy(name); + exclusiveQueues.erase(i); + } + } catch(std::exception& e) { + std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl; + } +} + +void Connection::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ + getChannel(channel)->handleHeader(body); +} + +void Connection::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ + getChannel(channel)->handleContent(body); +} + +void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ + std::cout << "Connection::handleHeartbeat()" << std::endl; +} + +void Connection::ConnectionHandlerImpl::startOk( + u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, + const string& /*response*/, const string& /*locale*/){ + parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat); +} + +void Connection::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} + +void Connection::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ + parent->framemax = framemax; + parent->heartbeat = heartbeat; +} + +void Connection::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ + string knownhosts; + parent->client->getConnection().openOk(0, knownhosts); +} + +void Connection::ConnectionHandlerImpl::close( + u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, + u_int16_t /*classId*/, u_int16_t /*methodId*/) +{ + parent->client->getConnection().closeOk(0); + parent->context->close(); +} + +void Connection::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ + parent->context->close(); +} + + + +void Connection::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ + + + parent->channels[channel] = new Channel( + parent->client->getProtocolVersion() , parent->context, channel, + parent->framemax, parent->queues.getStore(), + parent->settings.stagingThreshold); + + // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9 + parent->client->getChannel().openOk(channel, std::string()/* ID */); +} + +void Connection::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} +void Connection::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} + +void Connection::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, + u_int16_t /*classId*/, u_int16_t /*methodId*/){ + Channel* c = parent->getChannel(channel); + if(c){ + parent->channels.erase(channel); + c->close(); + delete c; + parent->client->getChannel().closeOk(channel); + } +} + +void Connection::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} + + + +void Connection::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, + bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, + const FieldTable& /*arguments*/){ + + if(passive){ + if(!parent->exchanges.get(exchange)){ + throw ChannelException(404, "Exchange not found: " + exchange); + } + }else{ + try{ + std::pair<Exchange::shared_ptr, bool> response = parent->exchanges.declare(exchange, type); + if(!response.second && response.first->getType() != type){ + throw ConnectionException(507, "Exchange already declared to be of type " + + response.first->getType() + ", requested " + type); + } + }catch(UnknownExchangeTypeException& e){ + throw ConnectionException(503, "Exchange type not implemented: " + type); + } + } + if(!nowait){ + parent->client->getExchange().declareOk(channel); + } +} + + +void Connection::ExchangeHandlerImpl::unbind( + u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*exchange*/, + const string& /*routingKey*/, + const qpid::framing::FieldTable& /*arguments*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + + + +void Connection::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, + const string& exchange, bool /*ifUnused*/, bool nowait){ + + //TODO: implement unused + parent->exchanges.destroy(exchange); + if(!nowait) parent->client->getExchange().deleteOk(channel); +} + +void Connection::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ + Queue::shared_ptr queue; + if (passive && !name.empty()) { + queue = parent->getQueue(name, channel); + } else { + std::pair<Queue::shared_ptr, bool> queue_created = + parent->queues.declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0); + queue = queue_created.first; + assert(queue); + if (queue_created.second) { // This is a new queue + parent->getChannel(channel)->setDefaultQueue(queue); + + //apply settings & create persistent record if required + queue_created.first->create(arguments); + + //add default binding: + parent->exchanges.getDefault()->bind(queue, name, 0); + if (exclusive) { + parent->exclusiveQueues.push_back(queue); + } else if(autoDelete){ + parent->cleaner.add(queue); + } + } + } + if (exclusive && !queue->isExclusiveOwner(parent)) { + throw ChannelException(405, "Cannot grant exclusive access to queue"); + } + if (!nowait) { + string queueName = queue->getName(); + parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount()); + } +} + +void Connection::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, + const string& exchangeName, const string& routingKey, bool nowait, + const FieldTable& arguments){ + + Queue::shared_ptr queue = parent->getQueue(queueName, channel); + Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName); + if(exchange){ + // kpvdr - cannot use this any longer as routingKey is now const + // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); + // exchange->bind(queue, routingKey, &arguments); + string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; + exchange->bind(queue, exchangeRoutingKey, &arguments); + if(!nowait) parent->client->getQueue().bindOk(channel); + }else{ + throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); + } +} + +void Connection::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ + + Queue::shared_ptr queue = parent->getQueue(queueName, channel); + int count = queue->purge(); + if(!nowait) parent->client->getQueue().purgeOk(channel, count); +} + +void Connection::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, + bool ifUnused, bool ifEmpty, bool nowait){ + ChannelException error(0, ""); + int count(0); + Queue::shared_ptr q = parent->getQueue(queue, channel); + if(ifEmpty && q->getMessageCount() > 0){ + throw ChannelException(406, "Queue not empty."); + }else if(ifUnused && q->getConsumerCount() > 0){ + throw ChannelException(406, "Queue in use."); + }else{ + //remove the queue from the list of exclusive queues if necessary + if(q->isExclusiveOwner(parent)){ + queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q); + if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i); + } + count = q->getMessageCount(); + q->destroy(); + parent->queues.destroy(queue); + } + + if(!nowait) parent->client->getQueue().deleteOk(channel, count); +} + + + + +void Connection::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ + //TODO: handle global + parent->getChannel(channel)->setPrefetchSize(prefetchSize); + parent->getChannel(channel)->setPrefetchCount(prefetchCount); + parent->client->getBasic().qosOk(channel); +} + +void Connection::BasicHandlerImpl::consume( + u_int16_t channelId, u_int16_t /*ticket*/, + const string& queueName, const string& consumerTag, + bool noLocal, bool noAck, bool exclusive, + bool nowait, const FieldTable& fields) +{ + + Queue::shared_ptr queue = parent->getQueue(queueName, channelId); + Channel* channel = parent->channels[channelId]; + if(!consumerTag.empty() && channel->exists(consumerTag)){ + throw ConnectionException(530, "Consumer tags must be unique"); + } + + try{ + string newTag = consumerTag; + channel->consume( + newTag, queue, !noAck, exclusive, noLocal ? parent : 0, &fields); + + if(!nowait) parent->client->getBasic().consumeOk(channelId, newTag); + + //allow messages to be dispatched if required as there is now a consumer: + queue->dispatch(); + }catch(ExclusiveAccessException& e){ + if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); + else throw ChannelException(403, "Access would violate previously granted exclusivity"); + } + +} + +void Connection::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ + parent->getChannel(channel)->cancel(consumerTag); + + if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag); +} + +void Connection::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, + const string& exchangeName, const string& routingKey, + bool mandatory, bool immediate){ + + Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges.getDefault() : parent->exchanges.get(exchangeName); + if(exchange){ + Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate); + parent->getChannel(channel)->handlePublish(msg, exchange); + }else{ + throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + } +} + +void Connection::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ + Queue::shared_ptr queue = parent->getQueue(queueName, channelId); + if(!parent->getChannel(channelId)->get(queue, !noAck)){ + string clusterId;//not used, part of an imatix hack + + parent->client->getBasic().getEmpty(channelId, clusterId); + } +} + +void Connection::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ + try{ + parent->getChannel(channel)->ack(deliveryTag, multiple); + }catch(InvalidAckException& e){ + throw ConnectionException(530, "Received ack for unrecognised delivery tag"); + } +} + +void Connection::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} + +void Connection::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ + parent->getChannel(channel)->recover(requeue); +} + +void Connection::TxHandlerImpl::select(u_int16_t channel){ + parent->getChannel(channel)->begin(); + parent->client->getTx().selectOk(channel); +} + +void Connection::TxHandlerImpl::commit(u_int16_t channel){ + parent->getChannel(channel)->commit(); + parent->client->getTx().commitOk(channel); +} + +void Connection::TxHandlerImpl::rollback(u_int16_t channel){ + + parent->getChannel(channel)->rollback(); + parent->client->getTx().rollbackOk(channel); + parent->getChannel(channel)->recover(false); +} + +void +Connection::QueueHandlerImpl::unbind( + u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*exchange*/, + const string& /*routingKey*/, + const qpid::framing::FieldTable& /*arguments*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + +void +Connection::ChannelHandlerImpl::ok( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + +void +Connection::ChannelHandlerImpl::ping( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + +void +Connection::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + +void +Connection::ChannelHandlerImpl::resume( + u_int16_t /*channel*/, + const string& /*channelId*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + +// Message class method handlers +void +Connection::MessageHandlerImpl::append( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*bytes*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + + +void +Connection::MessageHandlerImpl::cancel( u_int16_t /*channel*/, + const string& /*destination*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*identifier*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::close( u_int16_t /*channel*/, + const string& /*reference*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::consume( u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*destination*/, + bool /*noLocal*/, + bool /*noAck*/, + bool /*exclusive*/, + const qpid::framing::FieldTable& /*filter*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::get( u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*destination*/, + bool /*noAck*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::offset( u_int16_t /*channel*/, + u_int64_t /*value*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::open( u_int16_t /*channel*/, + const string& /*reference*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::qos( u_int16_t /*channel*/, + u_int32_t /*prefetchSize*/, + u_int16_t /*prefetchCount*/, + bool /*global*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::recover( u_int16_t /*channel*/, + bool /*requeue*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::reject( u_int16_t /*channel*/, + u_int16_t /*code*/, + const string& /*text*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::resume( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*identifier*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::transfer( u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*destination*/, + bool /*redelivered*/, + bool /*immediate*/, + u_int64_t /*ttl*/, + u_int8_t /*priority*/, + u_int64_t /*timestamp*/, + u_int8_t /*deliveryMode*/, + u_int64_t /*expiration*/, + const string& /*exchange*/, + const string& /*routingKey*/, + const string& /*messageId*/, + const string& /*correlationId*/, + const string& /*replyTo*/, + const string& /*contentType*/, + const string& /*contentEncoding*/, + const string& /*userId*/, + const string& /*appId*/, + const string& /*transactionId*/, + const string& /*securityToken*/, + const qpid::framing::FieldTable& /*applicationHeaders*/, + qpid::framing::Content /*body*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +}} + |