diff options
Diffstat (limited to 'cpp/common/framing/src')
-rw-r--r-- | cpp/common/framing/src/AMQContentBody.cpp | 35 | ||||
-rw-r--r-- | cpp/common/framing/src/AMQFrame.cpp | 147 | ||||
-rw-r--r-- | cpp/common/framing/src/AMQHeaderBody.cpp | 60 | ||||
-rw-r--r-- | cpp/common/framing/src/AMQMethodBody.cpp | 43 | ||||
-rw-r--r-- | cpp/common/framing/src/BasicHeaderProperties.cpp | 102 | ||||
-rw-r--r-- | cpp/common/framing/src/BodyHandler.cpp | 49 | ||||
-rw-r--r-- | cpp/common/framing/src/Buffer.cpp | 167 | ||||
-rw-r--r-- | cpp/common/framing/src/FieldTable.cpp | 127 | ||||
-rw-r--r-- | cpp/common/framing/src/NamedValue.cpp | 67 | ||||
-rw-r--r-- | cpp/common/framing/src/ProtocolInitiation.cpp | 53 | ||||
-rw-r--r-- | cpp/common/framing/src/Value.cpp | 57 |
11 files changed, 907 insertions, 0 deletions
diff --git a/cpp/common/framing/src/AMQContentBody.cpp b/cpp/common/framing/src/AMQContentBody.cpp new file mode 100644 index 0000000000..c8aadc8108 --- /dev/null +++ b/cpp/common/framing/src/AMQContentBody.cpp @@ -0,0 +1,35 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "AMQContentBody.h" + +qpid::framing::AMQContentBody::AMQContentBody(){ +} + +qpid::framing::AMQContentBody::AMQContentBody(string& _data) : data(_data){ +} + +u_int32_t qpid::framing::AMQContentBody::size() const{ + return data.size(); +} +void qpid::framing::AMQContentBody::encode(Buffer& buffer) const{ + buffer.putRawData(data); +} +void qpid::framing::AMQContentBody::decode(Buffer& buffer, u_int32_t size){ + buffer.getRawData(data, size); +} + diff --git a/cpp/common/framing/src/AMQFrame.cpp b/cpp/common/framing/src/AMQFrame.cpp new file mode 100644 index 0000000000..70f71010ff --- /dev/null +++ b/cpp/common/framing/src/AMQFrame.cpp @@ -0,0 +1,147 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "AMQFrame.h" +#include "QpidError.h" + +using namespace qpid::framing; + +AMQFrame::AMQFrame(){} + +AMQFrame::AMQFrame(u_int16_t _channel, AMQBody* _body) : channel(_channel), body(_body){} + +AMQFrame::AMQFrame(u_int16_t _channel, AMQBody::shared_ptr& _body) : channel(_channel), body(_body){} + +AMQFrame::~AMQFrame(){ +} + +u_int16_t AMQFrame::getChannel(){ + return channel; +} + +AMQBody::shared_ptr& AMQFrame::getBody(){ + return body; +} + +void AMQFrame::encode(Buffer& buffer) +{ + buffer.putOctet(body->type()); + buffer.putShort(channel); + buffer.putLong(body->size()); + body->encode(buffer); + buffer.putOctet(0xCE); +} + +AMQBody::shared_ptr createMethodBody(Buffer& buffer){ + u_int16_t classId = buffer.getShort(); + u_int16_t methodId = buffer.getShort(); + AMQBody::shared_ptr body(createAMQMethodBody(classId, methodId)); + return body; +} + +u_int32_t AMQFrame::size() const{ + if(!body.get()) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to get size of frame with no body set!"); + return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + body->size() + 1/*0xCE*/; +} + +bool AMQFrame::decode(Buffer& buffer) +{ + if(buffer.available() < 7) return false; + buffer.record(); + u_int8_t type = buffer.getOctet(); + channel = buffer.getShort(); + u_int32_t size = buffer.getLong(); + if(buffer.available() < size + 1){ + buffer.restore(); + return false; + } + switch(type) + { + case METHOD_BODY: + body = createMethodBody(buffer); + break; + case HEADER_BODY: + body = AMQBody::shared_ptr(new AMQHeaderBody()); + break; + case CONTENT_BODY: + body = AMQBody::shared_ptr(new AMQContentBody()); + break; + case HEARTBEAT_BODY: + body = AMQBody::shared_ptr(new AMQHeartbeatBody()); + break; + default: + string msg("Unknown body type: "); + msg += type; + THROW_QPID_ERROR(FRAMING_ERROR, msg); + } + body->decode(buffer, size); + u_int8_t end = buffer.getOctet(); + if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); + return true; +} + +u_int32_t AMQFrame::decodeHead(Buffer& buffer){ + type = buffer.getOctet(); + channel = buffer.getShort(); + return buffer.getLong(); +} + +void AMQFrame::decodeBody(Buffer& buffer, uint32_t size) +{ + switch(type) + { + case METHOD_BODY: + body = createMethodBody(buffer); + break; + case HEADER_BODY: + body = AMQBody::shared_ptr(new AMQHeaderBody()); + break; + case CONTENT_BODY: + body = AMQBody::shared_ptr(new AMQContentBody()); + break; + case HEARTBEAT_BODY: + body = AMQBody::shared_ptr(new AMQHeartbeatBody()); + break; + default: + string msg("Unknown body type: "); + msg += type; + THROW_QPID_ERROR(FRAMING_ERROR, msg); + } + body->decode(buffer, size); +} + +std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t){ + out << "Frame[channel=" << t.channel << "; "; + if(t.body.get() == 0){ + out << "empty"; + }else if(t.body->type() == METHOD_BODY){ + (dynamic_cast<AMQMethodBody*>(t.body.get()))->print(out); + }else if(t.body->type() == HEADER_BODY){ + out << "header, content_size=" << + (dynamic_cast<AMQHeaderBody*>(t.body.get()))->getContentSize() + << " (" << t.body->size() << " bytes)"; + }else if(t.body->type() == CONTENT_BODY){ + out << "content (" << t.body->size() << " bytes)"; + }else if(t.body->type() == HEARTBEAT_BODY){ + out << "heartbeat"; + }else{ + out << "unknown type, " << t.body->type(); + } + out << "]"; + return out; +} + diff --git a/cpp/common/framing/src/AMQHeaderBody.cpp b/cpp/common/framing/src/AMQHeaderBody.cpp new file mode 100644 index 0000000000..4bf1626a8a --- /dev/null +++ b/cpp/common/framing/src/AMQHeaderBody.cpp @@ -0,0 +1,60 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "AMQHeaderBody.h" +#include "QpidError.h" +#include "BasicHeaderProperties.h" + +qpid::framing::AMQHeaderBody::AMQHeaderBody(int classId) : weight(0), contentSize(0){ + createProperties(classId); +} + +qpid::framing::AMQHeaderBody::AMQHeaderBody() : properties(0), weight(0), contentSize(0){ +} + +qpid::framing::AMQHeaderBody::~AMQHeaderBody(){ + delete properties; +} + +u_int32_t qpid::framing::AMQHeaderBody::size() const{ + return 12 + properties->size(); +} + +void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const { + buffer.putShort(properties->classId()); + buffer.putShort(weight); + buffer.putLongLong(contentSize); + properties->encode(buffer); +} + +void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, u_int32_t size){ + u_int16_t classId = buffer.getShort(); + weight = buffer.getShort(); + contentSize = buffer.getLongLong(); + createProperties(classId); + properties->decode(buffer, size - 12); +} + +void qpid::framing::AMQHeaderBody::createProperties(int classId){ + switch(classId){ + case BASIC: + properties = new qpid::framing::BasicHeaderProperties(); + break; + default: + THROW_QPID_ERROR(FRAMING_ERROR, "Unknown header class"); + } +} diff --git a/cpp/common/framing/src/AMQMethodBody.cpp b/cpp/common/framing/src/AMQMethodBody.cpp new file mode 100644 index 0000000000..73862bb2bf --- /dev/null +++ b/cpp/common/framing/src/AMQMethodBody.cpp @@ -0,0 +1,43 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "AMQMethodBody.h" +#include "QpidError.h" + +void qpid::framing::AMQMethodBody::encode(Buffer& buffer) const{ + buffer.putShort(amqpClassId()); + buffer.putShort(amqpMethodId()); + encodeContent(buffer); +} + +void qpid::framing::AMQMethodBody::decode(Buffer& buffer, u_int32_t size){ + decodeContent(buffer); +} + +bool qpid::framing::AMQMethodBody::match(AMQMethodBody* other) const{ + return other != 0 && other->amqpClassId() == amqpClassId() && other->amqpMethodId() == amqpMethodId(); +} + +void qpid::framing::AMQMethodBody::invoke(AMQP_ServerOperations& target, u_int16_t channel){ + THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server."); +} + + +std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQMethodBody& m){ + m.print(out); + return out; +} diff --git a/cpp/common/framing/src/BasicHeaderProperties.cpp b/cpp/common/framing/src/BasicHeaderProperties.cpp new file mode 100644 index 0000000000..4219d33a8f --- /dev/null +++ b/cpp/common/framing/src/BasicHeaderProperties.cpp @@ -0,0 +1,102 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "BasicHeaderProperties.h" + +//TODO: This could be easily generated from the spec + +qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(0), priority(0), timestamp(0){} +qpid::framing::BasicHeaderProperties::~BasicHeaderProperties(){} + +u_int32_t qpid::framing::BasicHeaderProperties::size() const{ + u_int32_t size = 2;//flags + if(contentType.length() > 0) size += contentType.length() + 1; + if(contentEncoding.length() > 0) size += contentEncoding.length() + 1; + if(headers.count() > 0) size += headers.size(); + if(deliveryMode != 0) size += 1; + if(priority != 0) size += 1; + if(correlationId.length() > 0) size += correlationId.length() + 1; + if(replyTo.length() > 0) size += replyTo.length() + 1; + if(expiration.length() > 0) size += expiration.length() + 1; + if(messageId.length() > 0) size += messageId.length() + 1; + if(timestamp != 0) size += 8; + if(type.length() > 0) size += type.length() + 1; + if(userId.length() > 0) size += userId.length() + 1; + if(appId.length() > 0) size += appId.length() + 1; + if(clusterId.length() > 0) size += clusterId.length() + 1; + + return size; +} + +void qpid::framing::BasicHeaderProperties::encode(qpid::framing::Buffer& buffer) const{ + u_int16_t flags = getFlags(); + buffer.putShort(flags); + + if(contentType.length() > 0) buffer.putShortString(contentType); + if(contentEncoding.length() > 0) buffer.putShortString(contentEncoding); + if(headers.count() > 0) buffer.putFieldTable(headers); + if(deliveryMode != 0) buffer.putOctet(deliveryMode); + if(priority != 0) buffer.putOctet(priority); + if(correlationId.length() > 0) buffer.putShortString(correlationId); + if(replyTo.length() > 0) buffer.putShortString(replyTo); + if(expiration.length() > 0) buffer.putShortString(expiration); + if(messageId.length() > 0) buffer.putShortString(messageId); + if(timestamp != 0) buffer.putLongLong(timestamp);; + if(type.length() > 0) buffer.putShortString(type); + if(userId.length() > 0) buffer.putShortString(userId); + if(appId.length() > 0) buffer.putShortString(appId); + if(clusterId.length() > 0) buffer.putShortString(clusterId); +} + +void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer, u_int32_t size){ + u_int16_t flags = buffer.getShort(); + int shift = 15; + if(flags & (1 << 15)) buffer.getShortString(contentType); + if(flags & (1 << 14)) buffer.getShortString(contentEncoding); + if(flags & (1 << 13)) buffer.getFieldTable(headers); + if(flags & (1 << 12)) deliveryMode = buffer.getOctet(); + if(flags & (1 << 11)) priority = buffer.getOctet(); + if(flags & (1 << 10)) buffer.getShortString(correlationId); + if(flags & (1 << 9)) buffer.getShortString(replyTo); + if(flags & (1 << 8)) buffer.getShortString(expiration); + if(flags & (1 << 7)) buffer.getShortString(messageId); + if(flags & (1 << 6)) timestamp = buffer.getLongLong(); + if(flags & (1 << 5)) buffer.getShortString(type); + if(flags & (1 << 4)) buffer.getShortString(userId); + if(flags & (1 << 3)) buffer.getShortString(appId); + if(flags & (1 << 2)) buffer.getShortString(clusterId); +} + +u_int16_t qpid::framing::BasicHeaderProperties::getFlags() const{ + u_int16_t flags(0); + int shift = 15; + if(contentType.length() > 0) flags |= (1 << 15); + if(contentEncoding.length() > 0) flags |= (1 << 14); + if(headers.count() > 0) flags |= (1 << 13); + if(deliveryMode != 0) flags |= (1 << 12); + if(priority != 0) flags |= (1 << 11); + if(correlationId.length() > 0) flags |= (1 << 10); + if(replyTo.length() > 0) flags |= (1 << 9); + if(expiration.length() > 0) flags |= (1 << 8); + if(messageId.length() > 0) flags |= (1 << 7); + if(timestamp != 0) flags |= (1 << 6); + if(type.length() > 0) flags |= (1 << 5); + if(userId.length() > 0) flags |= (1 << 4); + if(appId.length() > 0) flags |= (1 << 3); + if(clusterId.length() > 0) flags |= (1 << 2); + return flags; +} diff --git a/cpp/common/framing/src/BodyHandler.cpp b/cpp/common/framing/src/BodyHandler.cpp new file mode 100644 index 0000000000..4e4e2e02f7 --- /dev/null +++ b/cpp/common/framing/src/BodyHandler.cpp @@ -0,0 +1,49 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "memory.h" +#include "BodyHandler.h" + +using namespace qpid::framing; +using namespace std::tr1; + +void BodyHandler::handleBody(AMQBody::shared_ptr& body){ + + switch(body->type()) + { + + case METHOD_BODY: + handleMethod(dynamic_pointer_cast<AMQMethodBody, AMQBody>(body)); + break; + + case HEADER_BODY: + handleHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body)); + break; + + case CONTENT_BODY: + handleContent(dynamic_pointer_cast<AMQContentBody, AMQBody>(body)); + break; + + case HEARTBEAT_BODY: + handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body)); + break; + + default: + throw UnknownBodyType(body->type()); + } + +} diff --git a/cpp/common/framing/src/Buffer.cpp b/cpp/common/framing/src/Buffer.cpp new file mode 100644 index 0000000000..5264491980 --- /dev/null +++ b/cpp/common/framing/src/Buffer.cpp @@ -0,0 +1,167 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Buffer.h" + +qpid::framing::Buffer::Buffer(int _size) : size(_size), position(0), limit(_size){ + data = new char[size]; +} + +qpid::framing::Buffer::~Buffer(){ + delete[] data; +} + +void qpid::framing::Buffer::flip(){ + limit = position; + position = 0; +} + +void qpid::framing::Buffer::clear(){ + limit = size; + position = 0; +} + +void qpid::framing::Buffer::compact(){ + int p = limit - position; + //copy p chars from position to 0 + memmove(data, data + position, p); + limit = size; + position = p; +} + +void qpid::framing::Buffer::record(){ + r_position = position; + r_limit = limit; +} + +void qpid::framing::Buffer::restore(){ + position = r_position; + limit = r_limit; +} + +int qpid::framing::Buffer::available(){ + return limit - position; +} + +char* qpid::framing::Buffer::start(){ + return data + position; +} + +void qpid::framing::Buffer::move(int bytes){ + position += bytes; +} + +void qpid::framing::Buffer::putOctet(u_int8_t i){ + data[position++] = i; +} + +void qpid::framing::Buffer::putShort(u_int16_t i){ + u_int16_t b = i; + data[position++] = (u_int8_t) (0xFF & (b >> 8)); + data[position++] = (u_int8_t) (0xFF & b); +} + +void qpid::framing::Buffer::putLong(u_int32_t i){ + u_int32_t b = i; + data[position++] = (u_int8_t) (0xFF & (b >> 24)); + data[position++] = (u_int8_t) (0xFF & (b >> 16)); + data[position++] = (u_int8_t) (0xFF & (b >> 8)); + data[position++] = (u_int8_t) (0xFF & b); +} + +void qpid::framing::Buffer::putLongLong(u_int64_t i){ + u_int32_t hi = i >> 32; + u_int32_t lo = i; + putLong(hi); + putLong(lo); +} + +u_int8_t qpid::framing::Buffer::getOctet(){ + return (u_int8_t) data[position++]; +} + +u_int16_t qpid::framing::Buffer::getShort(){ + u_int16_t hi = (unsigned char) data[position++]; + hi = hi << 8; + hi |= (unsigned char) data[position++]; + return hi; +} + +u_int32_t qpid::framing::Buffer::getLong(){ + u_int32_t a = (unsigned char) data[position++]; + u_int32_t b = (unsigned char) data[position++]; + u_int32_t c = (unsigned char) data[position++]; + u_int32_t d = (unsigned char) data[position++]; + a = a << 24; + a |= b << 16; + a |= c << 8; + a |= d; + return a; +} + +u_int64_t qpid::framing::Buffer::getLongLong(){ + u_int64_t hi = getLong(); + u_int64_t lo = getLong(); + hi = hi << 32; + return hi | lo; +} + + +void qpid::framing::Buffer::putShortString(const string& s){ + u_int8_t size = s.length(); + putOctet(size); + s.copy(data + position, size); + position += size; +} + +void qpid::framing::Buffer::putLongString(const string& s){ + u_int32_t size = s.length(); + putLong(size); + s.copy(data + position, size); + position += size; +} + +void qpid::framing::Buffer::getShortString(string& s){ + u_int8_t size = getOctet(); + s.assign(data + position, size); + position += size; +} + +void qpid::framing::Buffer::getLongString(string& s){ + u_int32_t size = getLong(); + s.assign(data + position, size); + position += size; +} + +void qpid::framing::Buffer::putFieldTable(const FieldTable& t){ + t.encode(*this); +} + +void qpid::framing::Buffer::getFieldTable(FieldTable& t){ + t.decode(*this); +} + +void qpid::framing::Buffer::putRawData(const string& s){ + u_int32_t size = s.length(); + s.copy(data + position, size); + position += size; +} + +void qpid::framing::Buffer::getRawData(string& s, u_int32_t size){ + s.assign(data + position, size); + position += size; +} diff --git a/cpp/common/framing/src/FieldTable.cpp b/cpp/common/framing/src/FieldTable.cpp new file mode 100644 index 0000000000..048cefa83c --- /dev/null +++ b/cpp/common/framing/src/FieldTable.cpp @@ -0,0 +1,127 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "FieldTable.h" +#include "NamedValue.h" +#include "QpidError.h" +#include "Buffer.h" +#include "Value.h" + +qpid::framing::FieldTable::~FieldTable(){ + int count(values.size()); + for(int i = 0; i < count; i++){ + delete values[i]; + } +} + +u_int32_t qpid::framing::FieldTable::size() const { + u_int32_t size(4); + int count(values.size()); + for(int i = 0; i < count; i++){ + size += values[i]->size(); + } + return size; +} + +int qpid::framing::FieldTable::count() const { + return values.size(); +} + +std::ostream& qpid::framing::operator<<(std::ostream& out, const FieldTable& t){ + out << "field_table{}"; + return out; +} + +void qpid::framing::FieldTable::setString(const std::string& name, const std::string& value){ + setValue(name, new StringValue(value)); +} + +void qpid::framing::FieldTable::setInt(const std::string& name, int value){ + setValue(name, new IntegerValue(value)); +} + +void qpid::framing::FieldTable::setTimestamp(const std::string& name, u_int64_t value){ + setValue(name, new TimeValue(value)); +} + +void qpid::framing::FieldTable::setTable(const std::string& name, const FieldTable& value){ + setValue(name, new FieldTableValue(value)); +} + +std::string qpid::framing::FieldTable::getString(const std::string& name){ + StringValue* val = dynamic_cast<StringValue*>(getValue(name)); + return (val == 0 ? "" : val->getValue()); +} + +int qpid::framing::FieldTable::getInt(const std::string& name){ + IntegerValue* val = dynamic_cast<IntegerValue*>(getValue(name)); + return (val == 0 ? 0 : val->getValue()); +} + +u_int64_t qpid::framing::FieldTable::getTimestamp(const std::string& name){ + TimeValue* val = dynamic_cast<TimeValue*>(getValue(name)); + return (val == 0 ? 0 : val->getValue()); +} + +void qpid::framing::FieldTable::getTable(const std::string& name, FieldTable& value){ + FieldTableValue* val = dynamic_cast<FieldTableValue*>(getValue(name)); + if(val != 0) value = val->getValue(); +} + +qpid::framing::NamedValue* qpid::framing::FieldTable::find(const std::string& name) const{ + int count(values.size()); + for(int i = 0; i < count; i++){ + if(values[i]->getName() == name) return values[i]; + } + return 0; +} + +qpid::framing::Value* qpid::framing::FieldTable::getValue(const std::string& name) const{ + NamedValue* val = find(name); + return val == 0 ? 0 : val->getValue(); +} + +void qpid::framing::FieldTable::setValue(const std::string& name, Value* value){ + NamedValue* val = find(name); + if(val == 0){ + val = new NamedValue(name, value); + values.push_back(val); + }else{ + Value* old = val->getValue(); + if(old != 0) delete old; + val->setValue(value); + } +} + +void qpid::framing::FieldTable::encode(Buffer& buffer) const{ + buffer.putLong(size() - 4); + int count(values.size()); + for(int i = 0; i < count; i++){ + values[i]->encode(buffer); + } +} + +void qpid::framing::FieldTable::decode(Buffer& buffer){ + u_int32_t size = buffer.getLong(); + int leftover = buffer.available() - size; + + while(buffer.available() > leftover){ + NamedValue* value = new NamedValue(); + value->decode(buffer); + values.push_back(value); + } +} diff --git a/cpp/common/framing/src/NamedValue.cpp b/cpp/common/framing/src/NamedValue.cpp new file mode 100644 index 0000000000..e80aea433c --- /dev/null +++ b/cpp/common/framing/src/NamedValue.cpp @@ -0,0 +1,67 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "NamedValue.h" +#include "QpidError.h" +#include "Buffer.h" +#include "FieldTable.h" + +qpid::framing::NamedValue::NamedValue() : value(0){} + +qpid::framing::NamedValue::NamedValue(const string& n, Value* v) : name(n), value(v){} + +qpid::framing::NamedValue::~NamedValue(){ + if(value != 0){ + delete value; + } +} + +u_int32_t qpid::framing::NamedValue::size() const{ + return value ? 1/*size of name*/ + name.length() + 1/*type char*/ + value->size() : 0; +} + +void qpid::framing::NamedValue::encode(Buffer& buffer){ + buffer.putShortString(name); + u_int8_t type = value->getType(); + buffer.putOctet(type); + value->encode(buffer); +} + +void qpid::framing::NamedValue::decode(Buffer& buffer){ + buffer.getShortString(name); + u_int8_t type = buffer.getOctet(); + switch(type){ + case 'S': + value = new StringValue(); + break; + case 'I': + value = new IntegerValue(); + break; + case 'D': + value = new DecimalValue(); + break; + case 'T': + value = new TimeValue(); + break; + case 'F': + value = new FieldTableValue(); + break; + default: + THROW_QPID_ERROR(FRAMING_ERROR, "Unknown field table value type"); + } + value->decode(buffer); +} diff --git a/cpp/common/framing/src/ProtocolInitiation.cpp b/cpp/common/framing/src/ProtocolInitiation.cpp new file mode 100644 index 0000000000..6806d73b55 --- /dev/null +++ b/cpp/common/framing/src/ProtocolInitiation.cpp @@ -0,0 +1,53 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "ProtocolInitiation.h" + +qpid::framing::ProtocolInitiation::ProtocolInitiation(){} + +qpid::framing::ProtocolInitiation::ProtocolInitiation(u_int8_t _major, u_int8_t _minor) : pmajor(_major), pminor(_minor){} + +qpid::framing::ProtocolInitiation::~ProtocolInitiation(){} + +void qpid::framing::ProtocolInitiation::encode(Buffer& buffer){ + buffer.putOctet('A'); + buffer.putOctet('M'); + buffer.putOctet('Q'); + buffer.putOctet('P'); + buffer.putOctet(1);//class + buffer.putOctet(1);//instance + buffer.putOctet(pmajor); + buffer.putOctet(pminor); +} + +bool qpid::framing::ProtocolInitiation::decode(Buffer& buffer){ + if(buffer.available() >= 8){ + buffer.getOctet();//A + buffer.getOctet();//M + buffer.getOctet();//Q + buffer.getOctet();//P + buffer.getOctet();//class + buffer.getOctet();//instance + pmajor = buffer.getOctet(); + pminor = buffer.getOctet(); + return true; + }else{ + return false; + } +} + +//TODO: this should prbably be generated from the spec at some point to keep the version numbers up to date diff --git a/cpp/common/framing/src/Value.cpp b/cpp/common/framing/src/Value.cpp new file mode 100644 index 0000000000..240b086696 --- /dev/null +++ b/cpp/common/framing/src/Value.cpp @@ -0,0 +1,57 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Value.h" +#include "Buffer.h" +#include "FieldTable.h" + +void qpid::framing::StringValue::encode(Buffer& buffer){ + buffer.putLongString(value); +} +void qpid::framing::StringValue::decode(Buffer& buffer){ + buffer.getLongString(value); +} + +void qpid::framing::IntegerValue::encode(Buffer& buffer){ + buffer.putLong((u_int32_t) value); +} +void qpid::framing::IntegerValue::decode(Buffer& buffer){ + value = buffer.getLong(); +} + +void qpid::framing::TimeValue::encode(Buffer& buffer){ + buffer.putLongLong(value); +} +void qpid::framing::TimeValue::decode(Buffer& buffer){ + value = buffer.getLongLong(); +} + +void qpid::framing::DecimalValue::encode(Buffer& buffer){ + buffer.putOctet(decimals); + buffer.putLong(value); +} +void qpid::framing::DecimalValue::decode(Buffer& buffer){ + decimals = buffer.getOctet(); + value = buffer.getLong(); +} + +void qpid::framing::FieldTableValue::encode(Buffer& buffer){ + buffer.putFieldTable(value); +} +void qpid::framing::FieldTableValue::decode(Buffer& buffer){ + buffer.getFieldTable(value); +} |