diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Connection.h')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 276 |
1 files changed, 276 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h new file mode 100644 index 0000000000..a0da9efbb8 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -0,0 +1,276 @@ +#ifndef QPID_CLUSTER_CONNECTION_H +#define QPID_CLUSTER_CONNECTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "OutputInterceptor.h" +#include "McastFrameHandler.h" +#include "UpdateReceiver.h" + +#include "qpid/RefCounted.h" +#include "qpid/broker/Connection.h" +#include "qpid/broker/SecureConnection.h" +#include "qpid/broker/SemanticState.h" +#include "qpid/amqp_0_10/Connection.h" +#include "qpid/sys/AtomicValue.h" +#include "qpid/sys/ConnectionInputHandler.h" +#include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/sys/SecuritySettings.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/FrameDecoder.h" + +#include <iosfwd> + +namespace qpid { + +namespace framing { class AMQFrame; } + +namespace broker { +class SemanticState; +struct QueuedMessage; +class TxBuffer; +class TxAccept; +} + +namespace cluster { +class Cluster; +class Event; +struct EventFrame; + +/** Intercept broker::Connection calls for shadow and local cluster connections. */ +class Connection : + public RefCounted, + public sys::ConnectionInputHandler, + public framing::AMQP_AllOperations::ClusterConnectionHandler, + private broker::Connection::ErrorListener + +{ + public: + + /** Local connection. */ + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, MemberId, bool catchUp, bool isLink, + const qpid::sys::SecuritySettings& external); + /** Shadow connection. */ + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id, + const qpid::sys::SecuritySettings& external); + ~Connection(); + + ConnectionId getId() const { return self; } + broker::Connection* getBrokerConnection() { return connection.get(); } + const broker::Connection* getBrokerConnection() const { return connection.get(); } + + /** Local connections may be clients or catch-up connections */ + bool isLocal() const; + + bool isLocalClient() const { return isLocal() && !isCatchUp(); } + + /** True for connections that are shadowing remote broker connections */ + bool isShadow() const; + + /** True if the connection is in "catch-up" mode: building initial broker state. */ + bool isCatchUp() const { return catchUp; } + + /** True if the connection is a completed shared update connection */ + bool isUpdated() const; + + Cluster& getCluster() { return cluster; } + + // ConnectionInputHandler methods + void received(framing::AMQFrame&); + void closed(); + bool doOutput(); + void idleOut() { if (connection.get()) connection->idleOut(); } + void idleIn() { if (connection.get()) connection->idleIn(); } + + // ConnectionCodec methods - called by IO layer with a read buffer. + size_t decode(const char* buffer, size_t size); + + // Called for data delivered from the cluster. + void deliveredFrame(const EventFrame&); + + void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position); + + // ==== Used in catch-up mode to build initial state. + // + // State update methods. + void shadowPrepare(const std::string&); + + void shadowSetUser(const std::string&); + + void sessionState(const framing::SequenceNumber& replayStart, + const framing::SequenceNumber& sendCommandPoint, + const framing::SequenceSet& sentIncomplete, + const framing::SequenceNumber& expected, + const framing::SequenceNumber& received, + const framing::SequenceSet& unknownCompleted, + const SequenceSet& receivedIncomplete); + + void outputTask(uint16_t channel, const std::string& name); + + void shadowReady(uint64_t memberId, + uint64_t connectionId, + const std::string& managementId, + const std::string& username, + const std::string& fragment, + uint32_t sendMax); + + void membership(const framing::FieldTable&, const framing::FieldTable&, + const framing::SequenceNumber& frameSeq); + + void retractOffer(); + + void deliveryRecord(const std::string& queue, + const framing::SequenceNumber& position, + const std::string& tag, + const framing::SequenceNumber& id, + bool acquired, + bool accepted, + bool cancelled, + bool completed, + bool ended, + bool windowing, + bool enqueued, + uint32_t credit); + + void queuePosition(const std::string&, const framing::SequenceNumber&); + void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count); + void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&); + void expiryId(uint64_t); + + void txStart(); + void txAccept(const framing::SequenceSet&); + void txDequeue(const std::string&); + void txEnqueue(const std::string&); + void txPublish(const framing::Array&, bool); + void txEnd(); + void accumulatedAck(const framing::SequenceSet&); + + // Encoded exchange replication. + void exchange(const std::string& encoded); + + void giveReadCredit(int credit); + void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, + bool nodict, const std::string& username, + const std::string& initFrames); + void close(); + void abort(); + void deliverClose(); + + OutputInterceptor& getOutput() { return output; } + + void addQueueListener(const std::string& queue, uint32_t listener); + void managementSetupState(uint64_t objectNum, + uint16_t bootSequence, + const framing::Uuid&, + const std::string& vendor, + const std::string& product, + const std::string& instance); + + void config(const std::string& encoded); + + void setSecureConnection ( broker::SecureConnection * sc ); + + void doCatchupIoCallbacks(); + + private: + struct NullFrameHandler : public framing::FrameHandler { + void handle(framing::AMQFrame&) {} + }; + + // Arguments to construct a broker::Connection + struct ConnectionCtor { + sys::ConnectionOutputHandler* out; + broker::Broker& broker; + std::string mgmtId; + qpid::sys::SecuritySettings external; + bool isLink; + uint64_t objectId; + bool shadow; + bool delayManagement; + + ConnectionCtor( + sys::ConnectionOutputHandler* out_, + broker::Broker& broker_, + const std::string& mgmtId_, + const qpid::sys::SecuritySettings& external_, + bool isLink_=false, + uint64_t objectId_=0, + bool shadow_=false, + bool delayManagement_=false + ) : out(out_), broker(broker_), mgmtId(mgmtId_), external(external_), + isLink(isLink_), objectId(objectId_), shadow(shadow_), + delayManagement(delayManagement_) + {} + + std::auto_ptr<broker::Connection> construct() { + return std::auto_ptr<broker::Connection>( + new broker::Connection( + out, broker, mgmtId, external, isLink, objectId, + shadow, delayManagement) + ); + } + }; + + static NullFrameHandler nullFrameHandler; + + // Error listener functions + void connectionError(const std::string&); + void sessionError(uint16_t channel, const std::string&); + + void init(); + bool checkUnsupported(const framing::AMQBody& body); + void deliverDoOutput(uint32_t limit); + + bool checkProtocolHeader(const char*& data, size_t size); + void processInitialFrames(const char*& data, size_t size); + boost::shared_ptr<broker::Queue> findQueue(const std::string& qname); + broker::SessionState& sessionState(); + broker::SemanticState& semanticState(); + broker::QueuedMessage getUpdateMessage(); + void closeUpdated(); + + Cluster& cluster; + ConnectionId self; + bool catchUp; + bool announced; + OutputInterceptor output; + framing::FrameDecoder localDecoder; + ConnectionCtor connectionCtor; + std::auto_ptr<broker::Connection> connection; + framing::SequenceNumber deliverSeq; + framing::ChannelId currentChannel; + boost::shared_ptr<broker::TxBuffer> txBuffer; + bool expectProtocolHeader; + McastFrameHandler mcastFrameHandler; + UpdateReceiver& updateIn; + qpid::broker::SecureConnection* secureConnection; + std::string initialFrames; + + static qpid::sys::AtomicValue<uint64_t> catchUpId; + + friend std::ostream& operator<<(std::ostream&, const Connection&); +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CONNECTION_H*/ |