diff options
author | Gordon Sim <gsim@apache.org> | 2008-02-01 18:27:23 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-02-01 18:27:23 +0000 |
commit | eea6348014bd53b882d1c6247580e6c6a07a1abd (patch) | |
tree | d33fed595f804c5f76d52f9c965d5c28000af37b /cpp | |
parent | 5fbca6a4f6e471596782d2fb82678740500b1aa0 (diff) | |
download | qpid-python-eea6348014bd53b882d1c6247580e6c6a07a1abd.tar.gz |
Missed a couple of new files in previous commit.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@617594 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 98 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Bridge.h | 64 |
2 files changed, 162 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp new file mode 100644 index 0000000000..6c1d8e7ca5 --- /dev/null +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Bridge.h" +#include "Connection.h" + +#include "qpid/management/ManagementAgent.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/Uuid.h" + +using qpid::framing::FieldTable; +using qpid::framing::Uuid; + +namespace qpid { +namespace broker { + +Bridge::Bridge(framing::ChannelId id, Connection& c, CancellationListener l, const management::ArgsLinkBridge& _args) : + args(_args), channel(id, &(c.getOutput())), peer(channel), + mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)), + connection(c), listener(l) +{ + management::ManagementAgent::getAgent()->addObject(mgmtObject); +} + +Bridge::~Bridge() +{ + mgmtObject->resourceDestroy(); +} + +void Bridge::create() +{ + framing::AMQP_ServerProxy::Session session(channel); + session.open(0); + + //peer.getSession().open(0); + + if (args.i_src_is_local) { + //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received() + } else { + if (args.i_src_is_queue) { + peer.getMessage().subscribe(0, args.i_src, args.i_dest, false, 0, 0, false, FieldTable()); + peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + } else { + string queue = "bridge_queue_"; + queue += Uuid(true).str(); + peer.getQueue().declare(0, queue, "", false, false, true, true, FieldTable()); + peer.getQueue().bind(0, queue, args.i_dest, args.i_key, FieldTable()); + peer.getMessage().subscribe(0, queue, args.i_dest, false, 0, 0, false, FieldTable()); + peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + } + } + +} + +void Bridge::cancel() +{ + peer.getMessage().cancel(args.i_dest); + peer.getSession().close(); +} + +management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const +{ + return dynamic_pointer_cast<management::ManagementObject>(mgmtObject); +} + +management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, management::Args& /*args*/) +{ + if (methodId == management::Bridge::METHOD_CLOSE) { + //notify that we are closed + listener(this); + //request time on the connections io thread + connection.getOutput().activateOutput(); + return management::Manageable::STATUS_OK; + } else { + return management::Manageable::STATUS_UNKNOWN_METHOD; + } +} + +}} diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h new file mode 100644 index 0000000000..8506325ddb --- /dev/null +++ b/cpp/src/qpid/broker/Bridge.h @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Bridge_ +#define _Bridge_ + +#include "qpid/framing/AMQP_ServerProxy.h" +#include "qpid/framing/ChannelHandler.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/ArgsLinkBridge.h" +#include "qpid/management/Bridge.h" + +#include <boost/function.hpp> + +namespace qpid { +namespace broker { + +class Connection; + +class Bridge : public management::Manageable +{ +public: + typedef boost::function<void(Bridge*)> CancellationListener; + + Bridge(framing::ChannelId id, Connection& c, CancellationListener l, + const management::ArgsLinkBridge& args); + ~Bridge(); + + void create(); + void cancel(); + + management::ManagementObject::shared_ptr GetManagementObject() const; + management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args); + +private: + management::ArgsLinkBridge args; + framing::ChannelHandler channel; + framing::AMQP_ServerProxy peer; + management::Bridge::shared_ptr mgmtObject; + Connection& connection; + CancellationListener listener; +}; + + +}} + +#endif |