diff options
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp')
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp | 337 |
1 files changed, 337 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp new file mode 100644 index 0000000000..f2cb5740d3 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp @@ -0,0 +1,337 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/AMQFrame.h" + +#include "MessageBodyStream.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace System::Threading; +using namespace msclr; + +using namespace qpid::client; +using namespace qpid::framing; + +// Thefolowing def must match "Frames" private typedef. +// TODO: make "Frames" publicly visible. +typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames; + +using namespace std; + +static void ThrowIfBadArgs (array<unsigned char>^ buffer, int offset, int count) +{ + if (buffer == nullptr) + throw gcnew ArgumentNullException("buffer"); + + if (offset < 0) + throw gcnew ArgumentOutOfRangeException("offset"); + + if (count < 0) + throw gcnew ArgumentOutOfRangeException("count"); + + if ((offset + count) > buffer->Length) + throw gcnew ArgumentException("offset + count"); +} + + +// Input stream constructor + +MessageBodyStream::MessageBodyStream(FrameSet::shared_ptr *fspp) +{ + isInputStream = true; + frameSetpp = fspp; + fragmentCount = 0; + length = 0; + position = 0; + currentFramep = NULL; + + const std::string *datap; // pointer to the fragment's string variable that holds the content + + for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) { + if (i->getBody()->type() == CONTENT_BODY) { + fragmentCount++; + datap = &(i->castBody<AMQContentBody>()->getData()); + length += datap->size(); + } + } + + // fragmentCount can be zero for an empty message + + fragmentIndex = 0; + fragmentPosition = 0; + + if (fragmentCount == 0) { + currentFragment = NULL; + fragmentLength = 0; + } + else if (fragmentCount == 1) { + currentFragment = datap->data(); + fragmentLength = (int) length; + } + else { + fragments = gcnew array<IntPtr>(fragmentCount); + fragmentIndex = 0; + for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) { + if (i->getBody()->type() == CONTENT_BODY) { + datap = &(i->castBody<AMQContentBody>()->getData()); + fragments[fragmentIndex++] = (IntPtr) (void *) datap; + } + } + fragmentIndex = 0; + datap = (const std::string *) fragments[0].ToPointer(); + currentFragment = datap->data(); + fragmentLength = datap->size(); + } +} + + +int MessageBodyStream::Read(array<unsigned char>^ buffer, int offset, int count) +{ + if (!isInputStream) + throw gcnew NotSupportedException(); + if (disposed) + throw gcnew ObjectDisposedException("Stream"); + if (count == 0) + return 0; + ThrowIfBadArgs(buffer, offset, count); + + int nRead = 0; + int remaining = count; + + while (nRead < count) { + int fragAvail = fragmentLength - fragmentPosition; + int copyCount = min (fragAvail, remaining); + if (copyCount == 0) { + // no more to read + return nRead; + } + + // copy from native space + IntPtr nativep = (IntPtr) (void *) (currentFragment + fragmentPosition); + Marshal::Copy (nativep, buffer, offset, copyCount); + nRead += copyCount; + remaining -= copyCount; + fragmentPosition += copyCount; + offset += copyCount; + + // advance to next fragment? + if (fragmentPosition == fragmentLength) { + if (++fragmentIndex < fragmentCount) { + const std::string *datap = (const std::string *) fragments[fragmentIndex].ToPointer(); + currentFragment = datap->data(); + fragmentLength = datap->size(); + fragmentPosition = 0; + } + } + } + + return nRead; +} + + +void MessageBodyStream::pushCurrentFrame(bool lastFrame) +{ + // set flags as in SessionImpl::sendContent. + if (currentFramep->getBody()->type() == CONTENT_BODY) { + + if ((fragmentCount == 1) && lastFrame) { + // only one content frame + currentFramep->setFirstSegment(false); + } + else { + currentFramep->setFirstSegment(false); + currentFramep->setLastSegment(true); + if (fragmentCount != 1) { + currentFramep->setFirstFrame(false); + } + if (!lastFrame) { + currentFramep->setLastFrame(false); + } + } + } + else { + // the header frame + currentFramep->setFirstSegment(false); + if (!lastFrame) { + // there will be at least one content frame + currentFramep->setLastSegment(false); + } + } + + // add to frame set. This makes a copy and ref counts the body + (*frameSetpp)->append(*currentFramep); + + delete currentFramep; + + currentFramep = NULL; +} + + +IntPtr MessageBodyStream::GetFrameSet() +{ + if (currentFramep != NULL) { + // No more content. Tidy up the pending (possibly single header) frame. + pushCurrentFrame(true); + } + + if (frameSetpp == NULL) { + return (IntPtr) NULL; + } + + // shared_ptr.get() + return (IntPtr) (void *) (*frameSetpp).get(); +} + +IntPtr MessageBodyStream::GetHeader() +{ + return (IntPtr) headerBodyp; +} + + +// Ouput stream constructor + +MessageBodyStream::MessageBodyStream(int maxFrameSize) +{ + isInputStream = false; + + maxFrameContentSize = maxFrameSize - AMQFrame::frameOverhead(); + SequenceNumber unused; // only meaningful on incoming frames + frameSetpp = new FrameSet::shared_ptr(new FrameSet(unused)); + fragmentCount = 0; + length = 0; + position = 0; + + // header goes first in the outgoing frameset + + boost::intrusive_ptr<AMQBody> headerBody(new AMQHeaderBody); + currentFramep = new AMQFrame(headerBody); + headerBodyp = static_cast<AMQHeaderBody*>(headerBody.get()); + + // mark this header frame as "full" to force the first write to create a new content frame + fragmentPosition = maxFrameContentSize; +} + +void MessageBodyStream::Write(array<unsigned char>^ buffer, int offset, int count) +{ + if (isInputStream) + throw gcnew NotSupportedException(); + if (disposed) + throw gcnew ObjectDisposedException("Stream"); + if (count == 0) + return; + ThrowIfBadArgs(buffer, offset, count); + + if (currentFramep == NULL) { + // GetFrameSet() has been called and we no longer exclusively own the underlying frames. + throw gcnew InvalidOperationException ("Mesage Body output already completed"); + } + + if (count <= 0) + return; + + // keep GC memory movement at bay while copying to native space + pin_ptr<unsigned char> pinnedBuf = &buffer[0]; + + string *datap; + + int remaining = count; + while (remaining > 0) { + if (fragmentPosition == maxFrameContentSize) { + // move to a new frame, but not until ready to add new content. + // zero content is valid, or the final write may exactly fill to maxFrameContentSize + + pushCurrentFrame(false); + + currentFramep = new AMQFrame(AMQContentBody()); + fragmentPosition = 0; + fragmentCount++; + } + + int copyCount = min (remaining, (maxFrameContentSize - fragmentPosition)); + datap = &(currentFramep->castBody<AMQContentBody>()->getData()); + + char *outp = (char *) pinnedBuf + offset; + if (fragmentPosition == 0) { + datap->assign(outp, copyCount); + } + else { + datap->append(outp, copyCount); + } + + position += copyCount; + fragmentPosition += copyCount; + remaining -= copyCount; + offset += copyCount; + } +} + + +void MessageBodyStream::Cleanup() +{ + { + lock l(this); + if (disposed) + return; + + disposed = true; + } + + try {} + finally + { + if (frameSetpp != NULL) { + delete frameSetpp; + frameSetpp = NULL; + } + if (currentFramep != NULL) { + delete currentFramep; + currentFramep = NULL; + } + } +} + +MessageBodyStream::~MessageBodyStream() +{ + Cleanup(); +} + +MessageBodyStream::!MessageBodyStream() +{ + Cleanup(); +} + +void MessageBodyStream::Close() +{ + // Simulate Dispose()... + Cleanup(); + GC::SuppressFinalize(this); +} + + +}}} // namespace Apache::Qpid::Interop |