summaryrefslogtreecommitdiff
path: root/contrib
diff options
context:
space:
mode:
authorJens Geyer <jensg@apache.org>2014-03-07 19:41:48 +0100
committerJens Geyer <jensg@apache.org>2014-03-07 19:41:48 +0100
commit7bea35a4c2601b3227ba4062c401a4104e0fffb0 (patch)
treef98941c462723153ca1d347dd0dd8c7270cf94b5 /contrib
parentc975bbcc9c3c618a6ee8902ae47fed89a025b597 (diff)
downloadthrift-7bea35a4c2601b3227ba4062c401a4104e0fffb0.tar.gz
THRIFT-2382 contrib: sample for connecting Thrift with STOMP
Patch: Jens Geyer
Diffstat (limited to 'contrib')
-rw-r--r--contrib/Stomp/README18
-rw-r--r--contrib/Stomp/Thrift.Transport.STOMP.pas200
2 files changed, 218 insertions, 0 deletions
diff --git a/contrib/Stomp/README b/contrib/Stomp/README
new file mode 100644
index 000000000..2e5f21cbf
--- /dev/null
+++ b/contrib/Stomp/README
@@ -0,0 +1,18 @@
+Sample code for STOMP-based Thrift clients and/or servers.
+
+Although the sample Thrift STOMP Transport is written in
+Delphi/Pascal, it can easily serve as a starting point for
+similar implementations in other languages.
+
+STOMP is a protocol widely supported by many messaging systems,
+such as Apache ActiveMQ, RabbitMQ and many others. In particular,
+it can be used to communicate with Service-Bus products like Rebus
+or NServiceBus, when running against a STOMP-capable MQ system.
+
+A prerequisite for this sample is the Delphi STOMP Adapter written
+by Daniele Teti (http://www.danieleteti.it/stomp-client), currently
+hosted at Google Code (http://code.google.com/p/delphistompclient).
+
+At the time of writing, the STOMP adapter does not fully support
+binary data. Please check whether this has been fixed, otherwise
+you have to use the JSON protocol (or to fix it on your own).
diff --git a/contrib/Stomp/Thrift.Transport.STOMP.pas b/contrib/Stomp/Thrift.Transport.STOMP.pas
new file mode 100644
index 000000000..7dfb3763c
--- /dev/null
+++ b/contrib/Stomp/Thrift.Transport.STOMP.pas
@@ -0,0 +1,200 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+unit Thrift.Transport.STOMP;
+
+interface
+
+uses
+ Classes,Windows, SysUtils,
+ Thrift,
+ Thrift.Transport,
+ Thrift.Protocol,
+ Thrift.Stream,
+ StompClient,
+ StompTypes;
+
+type
+ TStompTransportImpl = class( TStreamTransportImpl)
+ strict private
+ FData : TStringStream;
+ FServer : string;
+ FOutQueue : string;
+ FStompCli : IStompClient;
+ protected
+ function GetIsOpen: Boolean; override;
+ function Peek: Boolean; override;
+ public
+ constructor Create( const aServerAndPort, aOutQueue : string);
+ destructor Destroy; override;
+
+ procedure Open(); override;
+ procedure Close(); override;
+ procedure Flush; override;
+ end;
+
+
+ TStompServerTransportImpl = class( TServerTransportImpl)
+ strict private
+ FServer : string;
+ FInQueue : string;
+ FClient : IStompClient;
+ protected
+ procedure Listen; override;
+ procedure Close; override;
+ function Accept( const fnAccepting: TProc): ITransport; override;
+ public
+ constructor Create( const aServerAndPort, aInQueue : string);
+ destructor Destroy; override;
+ end;
+
+
+const
+ QUEUE_PREFIX = '/queue/';
+ TOPIC_PREFIX = '/topic/';
+ EXCHANGE_PREFIX = '/exchange/';
+
+
+implementation
+
+
+
+constructor TStompTransportImpl.Create( const aServerAndPort, aOutQueue : string);
+var adapter : IThriftStream;
+begin
+ FData := TStringStream.Create;
+ FServer := aServerAndPort;
+ FOutQueue := aOutQueue;
+
+ adapter := TThriftStreamAdapterDelphi.Create( FData, FALSE);
+ inherited Create( nil, adapter); // output only
+end;
+
+
+destructor TStompTransportImpl.Destroy;
+begin
+ inherited Destroy;
+ FreeAndNil( FData);
+ FStompCli := nil;
+end;
+
+
+function TStompTransportImpl.GetIsOpen: Boolean;
+begin
+ result := (FStompCli <> nil);
+end;
+
+
+function TStompTransportImpl.Peek: Boolean;
+begin
+ result := FALSE; // output only
+end;
+
+
+procedure TStompTransportImpl.Open;
+begin
+ if FStompCli <> nil
+ then raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen, 'already open')
+ else FStompCli := StompUtils.NewStomp( FServer);
+end;
+
+
+procedure TStompTransportImpl.Close;
+begin
+ FStompCli := nil;
+ FData.Clear;
+end;
+
+
+procedure TStompTransportImpl.Flush;
+begin
+ if FStompCli = nil
+ then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'not open');
+
+ FStompCli.Send( FOutQueue, FData.DataString);
+ FData.Clear;
+end;
+
+
+//--- TStompServerTransportImpl --------------------------------------------
+
+
+constructor TStompServerTransportImpl.Create( const aServerAndPort, aInQueue : string);
+begin
+ inherited Create;
+ FServer := aServerAndPort;
+ FInQueue := aInQueue;
+end;
+
+
+destructor TStompServerTransportImpl.Destroy;
+begin
+ try
+ Close;
+ finally
+ inherited Destroy;
+ end;
+end;
+
+
+procedure TStompServerTransportImpl.Listen;
+begin
+ FClient := StompUtils.NewStomp(FServer);
+ FClient.Subscribe( FInQueue);
+end;
+
+
+procedure TStompServerTransportImpl.Close;
+begin
+ if FClient <> nil then begin
+ FClient.Unsubscribe( FInQueue);
+ FClient := nil;
+ end;
+end;
+
+
+function TStompServerTransportImpl.Accept( const fnAccepting: TProc): ITransport;
+var frame : IStompFrame;
+ adapter : IThriftStream;
+ stream : TStringStream;
+begin
+ if FClient = nil
+ then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+ 'Not connected.');
+
+ if Assigned(fnAccepting)
+ then fnAccepting();
+
+ try
+ frame := FClient.Receive(MAXINT);
+ if frame = nil then Exit(nil);
+
+ stream := TStringStream.Create( frame.GetBody);
+ adapter := TThriftStreamAdapterDelphi.Create( stream, TRUE);
+ result := TStreamTransportImpl.Create( adapter, nil);
+
+ except
+ on E: Exception
+ do raise TTransportException.Create( E.ToString );
+ end;
+end;
+
+
+end.
+