summaryrefslogtreecommitdiff
path: root/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp')
-rw-r--r--trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp337
1 files changed, 0 insertions, 337 deletions
diff --git a/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp b/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp
deleted file mode 100644
index f2cb5740d3..0000000000
--- a/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-#include <windows.h>
-#include <msclr\lock.h>
-
-#include "qpid/client/AsyncSession.h"
-#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/AMQFrame.h"
-
-#include "MessageBodyStream.h"
-
-namespace Apache {
-namespace Qpid {
-namespace Interop {
-
-using namespace System;
-using namespace System::Runtime::InteropServices;
-using namespace System::Threading;
-using namespace msclr;
-
-using namespace qpid::client;
-using namespace qpid::framing;
-
-// Thefolowing def must match "Frames" private typedef.
-// TODO: make "Frames" publicly visible.
-typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames;
-
-using namespace std;
-
-static void ThrowIfBadArgs (array<unsigned char>^ buffer, int offset, int count)
-{
- if (buffer == nullptr)
- throw gcnew ArgumentNullException("buffer");
-
- if (offset < 0)
- throw gcnew ArgumentOutOfRangeException("offset");
-
- if (count < 0)
- throw gcnew ArgumentOutOfRangeException("count");
-
- if ((offset + count) > buffer->Length)
- throw gcnew ArgumentException("offset + count");
-}
-
-
-// Input stream constructor
-
-MessageBodyStream::MessageBodyStream(FrameSet::shared_ptr *fspp)
-{
- isInputStream = true;
- frameSetpp = fspp;
- fragmentCount = 0;
- length = 0;
- position = 0;
- currentFramep = NULL;
-
- const std::string *datap; // pointer to the fragment's string variable that holds the content
-
- for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) {
- if (i->getBody()->type() == CONTENT_BODY) {
- fragmentCount++;
- datap = &(i->castBody<AMQContentBody>()->getData());
- length += datap->size();
- }
- }
-
- // fragmentCount can be zero for an empty message
-
- fragmentIndex = 0;
- fragmentPosition = 0;
-
- if (fragmentCount == 0) {
- currentFragment = NULL;
- fragmentLength = 0;
- }
- else if (fragmentCount == 1) {
- currentFragment = datap->data();
- fragmentLength = (int) length;
- }
- else {
- fragments = gcnew array<IntPtr>(fragmentCount);
- fragmentIndex = 0;
- for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) {
- if (i->getBody()->type() == CONTENT_BODY) {
- datap = &(i->castBody<AMQContentBody>()->getData());
- fragments[fragmentIndex++] = (IntPtr) (void *) datap;
- }
- }
- fragmentIndex = 0;
- datap = (const std::string *) fragments[0].ToPointer();
- currentFragment = datap->data();
- fragmentLength = datap->size();
- }
-}
-
-
-int MessageBodyStream::Read(array<unsigned char>^ buffer, int offset, int count)
-{
- if (!isInputStream)
- throw gcnew NotSupportedException();
- if (disposed)
- throw gcnew ObjectDisposedException("Stream");
- if (count == 0)
- return 0;
- ThrowIfBadArgs(buffer, offset, count);
-
- int nRead = 0;
- int remaining = count;
-
- while (nRead < count) {
- int fragAvail = fragmentLength - fragmentPosition;
- int copyCount = min (fragAvail, remaining);
- if (copyCount == 0) {
- // no more to read
- return nRead;
- }
-
- // copy from native space
- IntPtr nativep = (IntPtr) (void *) (currentFragment + fragmentPosition);
- Marshal::Copy (nativep, buffer, offset, copyCount);
- nRead += copyCount;
- remaining -= copyCount;
- fragmentPosition += copyCount;
- offset += copyCount;
-
- // advance to next fragment?
- if (fragmentPosition == fragmentLength) {
- if (++fragmentIndex < fragmentCount) {
- const std::string *datap = (const std::string *) fragments[fragmentIndex].ToPointer();
- currentFragment = datap->data();
- fragmentLength = datap->size();
- fragmentPosition = 0;
- }
- }
- }
-
- return nRead;
-}
-
-
-void MessageBodyStream::pushCurrentFrame(bool lastFrame)
-{
- // set flags as in SessionImpl::sendContent.
- if (currentFramep->getBody()->type() == CONTENT_BODY) {
-
- if ((fragmentCount == 1) && lastFrame) {
- // only one content frame
- currentFramep->setFirstSegment(false);
- }
- else {
- currentFramep->setFirstSegment(false);
- currentFramep->setLastSegment(true);
- if (fragmentCount != 1) {
- currentFramep->setFirstFrame(false);
- }
- if (!lastFrame) {
- currentFramep->setLastFrame(false);
- }
- }
- }
- else {
- // the header frame
- currentFramep->setFirstSegment(false);
- if (!lastFrame) {
- // there will be at least one content frame
- currentFramep->setLastSegment(false);
- }
- }
-
- // add to frame set. This makes a copy and ref counts the body
- (*frameSetpp)->append(*currentFramep);
-
- delete currentFramep;
-
- currentFramep = NULL;
-}
-
-
-IntPtr MessageBodyStream::GetFrameSet()
-{
- if (currentFramep != NULL) {
- // No more content. Tidy up the pending (possibly single header) frame.
- pushCurrentFrame(true);
- }
-
- if (frameSetpp == NULL) {
- return (IntPtr) NULL;
- }
-
- // shared_ptr.get()
- return (IntPtr) (void *) (*frameSetpp).get();
-}
-
-IntPtr MessageBodyStream::GetHeader()
-{
- return (IntPtr) headerBodyp;
-}
-
-
-// Ouput stream constructor
-
-MessageBodyStream::MessageBodyStream(int maxFrameSize)
-{
- isInputStream = false;
-
- maxFrameContentSize = maxFrameSize - AMQFrame::frameOverhead();
- SequenceNumber unused; // only meaningful on incoming frames
- frameSetpp = new FrameSet::shared_ptr(new FrameSet(unused));
- fragmentCount = 0;
- length = 0;
- position = 0;
-
- // header goes first in the outgoing frameset
-
- boost::intrusive_ptr<AMQBody> headerBody(new AMQHeaderBody);
- currentFramep = new AMQFrame(headerBody);
- headerBodyp = static_cast<AMQHeaderBody*>(headerBody.get());
-
- // mark this header frame as "full" to force the first write to create a new content frame
- fragmentPosition = maxFrameContentSize;
-}
-
-void MessageBodyStream::Write(array<unsigned char>^ buffer, int offset, int count)
-{
- if (isInputStream)
- throw gcnew NotSupportedException();
- if (disposed)
- throw gcnew ObjectDisposedException("Stream");
- if (count == 0)
- return;
- ThrowIfBadArgs(buffer, offset, count);
-
- if (currentFramep == NULL) {
- // GetFrameSet() has been called and we no longer exclusively own the underlying frames.
- throw gcnew InvalidOperationException ("Mesage Body output already completed");
- }
-
- if (count <= 0)
- return;
-
- // keep GC memory movement at bay while copying to native space
- pin_ptr<unsigned char> pinnedBuf = &buffer[0];
-
- string *datap;
-
- int remaining = count;
- while (remaining > 0) {
- if (fragmentPosition == maxFrameContentSize) {
- // move to a new frame, but not until ready to add new content.
- // zero content is valid, or the final write may exactly fill to maxFrameContentSize
-
- pushCurrentFrame(false);
-
- currentFramep = new AMQFrame(AMQContentBody());
- fragmentPosition = 0;
- fragmentCount++;
- }
-
- int copyCount = min (remaining, (maxFrameContentSize - fragmentPosition));
- datap = &(currentFramep->castBody<AMQContentBody>()->getData());
-
- char *outp = (char *) pinnedBuf + offset;
- if (fragmentPosition == 0) {
- datap->assign(outp, copyCount);
- }
- else {
- datap->append(outp, copyCount);
- }
-
- position += copyCount;
- fragmentPosition += copyCount;
- remaining -= copyCount;
- offset += copyCount;
- }
-}
-
-
-void MessageBodyStream::Cleanup()
-{
- {
- lock l(this);
- if (disposed)
- return;
-
- disposed = true;
- }
-
- try {}
- finally
- {
- if (frameSetpp != NULL) {
- delete frameSetpp;
- frameSetpp = NULL;
- }
- if (currentFramep != NULL) {
- delete currentFramep;
- currentFramep = NULL;
- }
- }
-}
-
-MessageBodyStream::~MessageBodyStream()
-{
- Cleanup();
-}
-
-MessageBodyStream::!MessageBodyStream()
-{
- Cleanup();
-}
-
-void MessageBodyStream::Close()
-{
- // Simulate Dispose()...
- Cleanup();
- GC::SuppressFinalize(this);
-}
-
-
-}}} // namespace Apache::Qpid::Interop