summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES.md2
-rw-r--r--lib/go/thrift/common_test.go78
-rw-r--r--lib/go/thrift/example_middleware_test.go52
-rw-r--r--lib/go/thrift/middleware.go70
-rw-r--r--lib/go/thrift/middleware_test.go110
-rw-r--r--lib/go/thrift/multiplexed_protocol.go61
-rw-r--r--lib/go/thrift/multiplexed_protocol_test.go53
-rw-r--r--lib/go/thrift/processor_factory.go10
8 files changed, 435 insertions, 1 deletions
diff --git a/CHANGES.md b/CHANGES.md
index ca84cb7d2..b8bef21e2 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -10,6 +10,7 @@
- [THRIFT-5006](https://issues.apache.org/jira/browse/THRIFT-5006) - Implement DEFAULT_MAX_LENGTH at TFramedTransport
- [THRIFT-5069](https://issues.apache.org/jira/browse/THRIFT-5069) - In Go library TDeserializer.Transport is now typed \*TMemoryBuffer instead of TTransport
- [THRIFT-5072](https://issues.apache.org/jira/browse/THRIFT-5072) - Haskell generator fails to distinguish between multiple enum types with conflicting enum identifiers
+- [THRIFT-5164](https://issues.apache.org/jira/browse/THRIFT-5164) - In Go library TProcessor interface now includes ProcessorMap and AddToProcessorMap functions.
### Java
@@ -18,6 +19,7 @@
### Go
- [THRIFT-5069](https://issues.apache.org/jira/browse/THRIFT-5069) - Add TSerializerPool and TDeserializerPool, which are thread-safe versions of TSerializer and TDeserializer.
+- [THRIFT-5164](https://issues.apache.org/jira/browse/THRIFT-5164) - Add ProcessorMiddleware function type and WrapProcessor function to support wrapping a TProcessor with middleware functions.
## 0.13.0
diff --git a/lib/go/thrift/common_test.go b/lib/go/thrift/common_test.go
index 93597ff8a..95d4e2130 100644
--- a/lib/go/thrift/common_test.go
+++ b/lib/go/thrift/common_test.go
@@ -19,7 +19,10 @@
package thrift
-import "context"
+import (
+ "context"
+ "fmt"
+)
type mockProcessor struct {
ProcessFunc func(in, out TProtocol) (bool, TException)
@@ -28,3 +31,76 @@ type mockProcessor struct {
func (m *mockProcessor) Process(ctx context.Context, in, out TProtocol) (bool, TException) {
return m.ProcessFunc(in, out)
}
+
+func (m *mockProcessor) ProcessorMap() map[string]TProcessorFunction {
+ return map[string]TProcessorFunction{
+ "mock": WrappedTProcessorFunction{
+ Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
+ return m.ProcessFunc(in, out)
+ },
+ },
+ }
+}
+
+func (m *mockProcessor) AddToProcessorMap(name string, processorFunc TProcessorFunction) {}
+
+type mockWrappedProcessorContextKey int
+
+const (
+ processorName mockWrappedProcessorContextKey = iota
+)
+
+// setMockWrappableProcessorName sets the "name" of the TProcessorFunction to
+// call on a mockWrappableProcessor when calling Process.
+//
+// In a normal TProcessor, the request name is read from the request itself
+// which happens in TProcessor.Process, so it is not passed into the call to
+// Process itself, to get around this in testing, mockWrappableProcessor calls
+// getMockWrappableProcessorName to get the name to use from the context
+// object.
+func setMockWrappableProcessorName(ctx context.Context, name string) context.Context {
+ return context.WithValue(ctx, processorName, name)
+}
+
+// getMockWrappableProcessorName gets the "name" of the TProcessorFunction to
+// call on a mockWrappableProcessor when calling Process.
+func getMockWrappableProcessorName(ctx context.Context) (string, bool) {
+ val, ok := ctx.Value(processorName).(string)
+ return val, ok
+}
+
+// mockWrappableProcessor can be used to create a mock object that fufills the
+// TProcessor interface in testing.
+type mockWrappableProcessor struct {
+ ProcessorFuncs map[string]TProcessorFunction
+}
+
+// Process calls the TProcessorFunction assigned to the "name" set on the
+// context object by setMockWrappableProcessorName.
+//
+// If no name is set on the context or there is no TProcessorFunction mapped to
+// that name, the call will panic.
+func (p *mockWrappableProcessor) Process(ctx context.Context, in, out TProtocol) (bool, TException) {
+ name, ok := getMockWrappableProcessorName(ctx)
+ if !ok {
+ panic("MockWrappableProcessorName not set on context")
+ }
+ processor, ok := p.ProcessorMap()[name]
+ if !ok {
+ panic(fmt.Sprintf("No processor set for name %q", name))
+ }
+ return processor.Process(ctx, 0, in, out)
+}
+
+func (p *mockWrappableProcessor) ProcessorMap() map[string]TProcessorFunction {
+ return p.ProcessorFuncs
+}
+
+func (p *mockWrappableProcessor) AddToProcessorMap(name string, processorFunc TProcessorFunction) {
+ p.ProcessorFuncs[name] = processorFunc
+}
+
+var (
+ _ TProcessor = (*mockProcessor)(nil)
+ _ TProcessor = (*mockWrappableProcessor)(nil)
+)
diff --git a/lib/go/thrift/example_middleware_test.go b/lib/go/thrift/example_middleware_test.go
new file mode 100644
index 000000000..47061103d
--- /dev/null
+++ b/lib/go/thrift/example_middleware_test.go
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package thrift
+
+import (
+ "context"
+ "log"
+)
+
+func simpleLoggingMiddleware(name string, next TProcessorFunction) TProcessorFunction {
+ return WrappedTProcessorFunction{
+ Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
+ log.Printf("Before: %q", name)
+ success, err := next.Process(ctx, seqId, in, out)
+ log.Printf("After: %q", name)
+ log.Printf("Success: %v", success)
+ if err != nil {
+ log.Printf("Error: %v", err)
+ }
+ return success, err
+ },
+ }
+}
+
+func ExampleProcessorMiddleware() {
+ var (
+ processor TProcessor
+ trans TServerTransport
+ transFactory TTransportFactory
+ protoFactory TProtocolFactory
+ )
+ processor = WrapProcessor(processor, simpleLoggingMiddleware)
+ server := NewTSimpleServer4(processor, trans, transFactory, protoFactory)
+ log.Fatal(server.Serve())
+}
diff --git a/lib/go/thrift/middleware.go b/lib/go/thrift/middleware.go
new file mode 100644
index 000000000..18f2b99c1
--- /dev/null
+++ b/lib/go/thrift/middleware.go
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package thrift
+
+import "context"
+
+// ProcessorMiddleware is a function that can be passed to WrapProcessor to wrap the
+// TProcessorFunctions for that TProcessor.
+//
+// Middlewares are passed in the name of the function as set in the processor
+// map of the TProcessor.
+type ProcessorMiddleware func(name string, next TProcessorFunction) TProcessorFunction
+
+// WrapProcessor takes an existing TProcessor and wraps each of its inner
+// TProcessorFunctions with the middlewares passed in and returns it.
+//
+// Middlewares will be called in the order that they are defined:
+//
+// 1. Middlewares[0]
+// 2. Middlewares[1]
+// ...
+// N. Middlewares[n]
+func WrapProcessor(processor TProcessor, middlewares ...ProcessorMiddleware) TProcessor {
+ for name, processorFunc := range processor.ProcessorMap() {
+ wrapped := processorFunc
+ // Add middlewares in reverse so the first in the list is the outermost.
+ for i := len(middlewares) - 1; i >= 0; i-- {
+ wrapped = middlewares[i](name, wrapped)
+ }
+ processor.AddToProcessorMap(name, wrapped)
+ }
+ return processor
+}
+
+// WrappedTProcessorFunction is a convenience struct that implements the
+// TProcessorFunction interface that can be used when implementing custom
+// Middleware.
+type WrappedTProcessorFunction struct {
+ // Wrapped is called by WrappedTProcessorFunction.Process and should be a
+ // "wrapped" call to a base TProcessorFunc.Process call.
+ Wrapped func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException)
+}
+
+// Process implements the TProcessorFunction interface using p.Wrapped.
+func (p WrappedTProcessorFunction) Process(ctx context.Context, seqID int32, in, out TProtocol) (bool, TException) {
+ return p.Wrapped(ctx, seqID, in, out)
+}
+
+// verify that WrappedTProcessorFunction implements TProcessorFunction
+var (
+ _ TProcessorFunction = WrappedTProcessorFunction{}
+ _ TProcessorFunction = (*WrappedTProcessorFunction)(nil)
+)
diff --git a/lib/go/thrift/middleware_test.go b/lib/go/thrift/middleware_test.go
new file mode 100644
index 000000000..81cbc7bd1
--- /dev/null
+++ b/lib/go/thrift/middleware_test.go
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package thrift
+
+import (
+ "context"
+ "testing"
+)
+
+type counter struct {
+ count int
+}
+
+func (c *counter) incr() {
+ c.count++
+}
+
+func testMiddleware(c *counter) ProcessorMiddleware {
+ return func(name string, next TProcessorFunction) TProcessorFunction {
+ return WrappedTProcessorFunction{
+ Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
+ c.incr()
+ return next.Process(ctx, seqId, in, out)
+ },
+ }
+ }
+}
+
+func newCounter(t *testing.T) *counter {
+ c := counter{}
+ if c.count != 0 {
+ t.Fatal("Unexpected initial count.")
+ }
+ return &c
+}
+
+func TestWrapProcessor(t *testing.T) {
+ name := "test"
+ processor := &mockWrappableProcessor{
+ ProcessorFuncs: map[string]TProcessorFunction{
+ name: WrappedTProcessorFunction{
+ Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
+ return true, nil
+ },
+ },
+ },
+ }
+ c := newCounter(t)
+ ctx := setMockWrappableProcessorName(context.Background(), name)
+ wrapped := WrapProcessor(processor, testMiddleware(c))
+ wrapped.Process(ctx, nil, nil)
+ if c.count != 1 {
+ t.Fatalf("Unexpected count value %v", c.count)
+ }
+}
+
+func TestWrapTMultiplexedProcessor(t *testing.T) {
+ name := "test"
+ processorName := "foo"
+ c := newCounter(t)
+ processor := &TMultiplexedProcessor{}
+ processor.RegisterDefault(&mockWrappableProcessor{
+ ProcessorFuncs: map[string]TProcessorFunction{
+ name: WrappedTProcessorFunction{
+ Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
+ return true, nil
+ },
+ },
+ },
+ })
+ processor.RegisterProcessor(processorName, &mockWrappableProcessor{
+ ProcessorFuncs: map[string]TProcessorFunction{
+ name: WrappedTProcessorFunction{
+ Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
+ return true, nil
+ },
+ },
+ },
+ })
+ wrapped := WrapProcessor(processor, testMiddleware(c))
+ ctx := setMockWrappableProcessorName(context.Background(), name)
+ in := NewStoredMessageProtocol(nil, name, 1, 1)
+ wrapped.Process(ctx, in, nil)
+ if c.count != 1 {
+ t.Fatalf("Unexpected count value %v", c.count)
+ }
+
+ in = NewStoredMessageProtocol(nil, processorName+MULTIPLEXED_SEPARATOR+name, 1, 1)
+ wrapped.Process(ctx, in, nil)
+ if c.count != 2 {
+ t.Fatalf("Unexpected count value %v", c.count)
+ }
+}
diff --git a/lib/go/thrift/multiplexed_protocol.go b/lib/go/thrift/multiplexed_protocol.go
index d028a30b3..9db59c4c9 100644
--- a/lib/go/thrift/multiplexed_protocol.go
+++ b/lib/go/thrift/multiplexed_protocol.go
@@ -117,6 +117,67 @@ func NewTMultiplexedProcessor() *TMultiplexedProcessor {
}
}
+// ProcessorMap returns a mapping of "{ProcessorName}{MULTIPLEXED_SEPARATOR}{FunctionName}"
+// to TProcessorFunction for any registered processors. If there is also a
+// DefaultProcessor, the keys for the methods on that processor will simply be
+// "{FunctionName}". If the TMultiplexedProcessor has both a DefaultProcessor and
+// other registered processors, then the keys will be a mix of both formats.
+//
+// The implementation differs with other TProcessors in that the map returned is
+// a new map, while most TProcessors just return their internal mapping directly.
+// This means that edits to the map returned by this implementation of ProcessorMap
+// will not affect the underlying mapping within the TMultiplexedProcessor.
+func (t *TMultiplexedProcessor) ProcessorMap() map[string]TProcessorFunction {
+ processorFuncMap := make(map[string]TProcessorFunction)
+ for name, processor := range t.serviceProcessorMap {
+ for method, processorFunc := range processor.ProcessorMap() {
+ processorFuncName := name + MULTIPLEXED_SEPARATOR + method
+ processorFuncMap[processorFuncName] = processorFunc
+ }
+ }
+ if t.DefaultProcessor != nil {
+ for method, processorFunc := range t.DefaultProcessor.ProcessorMap() {
+ processorFuncMap[method] = processorFunc
+ }
+ }
+ return processorFuncMap
+}
+
+// AddToProcessorMap updates the underlying TProcessor ProccessorMaps depending on
+// the format of "name".
+//
+// If "name" is in the format "{ProcessorName}{MULTIPLEXED_SEPARATOR}{FunctionName}",
+// then it sets the given TProcessorFunction on the inner TProcessor with the
+// ProcessorName component using the FunctionName component.
+//
+// If "name" is just in the format "{FunctionName}", that is to say there is no
+// MULTIPLEXED_SEPARATOR, and the TMultiplexedProcessor has a DefaultProcessor
+// configured, then it will set the given TProcessorFunction on the DefaultProcessor
+// using the given name.
+//
+// If there is not a TProcessor available for the given name, then this function
+// does nothing. This can happen when there is no TProcessor registered for
+// the given ProcessorName or if all that is given is the FunctionName and there
+// is no DefaultProcessor set.
+func (t *TMultiplexedProcessor) AddToProcessorMap(name string, processorFunc TProcessorFunction) {
+ components := strings.SplitN(name, MULTIPLEXED_SEPARATOR, 2)
+ if len(components) != 2 {
+ if t.DefaultProcessor != nil && len(components) == 1 {
+ t.DefaultProcessor.AddToProcessorMap(components[0], processorFunc)
+ }
+ return
+ }
+ processorName := components[0]
+ funcName := components[1]
+ if processor, ok := t.serviceProcessorMap[processorName]; ok {
+ processor.AddToProcessorMap(funcName, processorFunc)
+ }
+
+}
+
+// verify that TMultiplexedProcessor implements TProcessor
+var _ TProcessor = (*TMultiplexedProcessor)(nil)
+
func (t *TMultiplexedProcessor) RegisterDefault(processor TProcessor) {
t.DefaultProcessor = processor
}
diff --git a/lib/go/thrift/multiplexed_protocol_test.go b/lib/go/thrift/multiplexed_protocol_test.go
new file mode 100644
index 000000000..8e70ac597
--- /dev/null
+++ b/lib/go/thrift/multiplexed_protocol_test.go
@@ -0,0 +1,53 @@
+package thrift
+
+import (
+ "context"
+ "strings"
+ "testing"
+)
+
+func TestMultiplexedProcessorMap(t *testing.T) {
+ name := "test"
+ processorName := "foo"
+ processor := &TMultiplexedProcessor{}
+ processor.RegisterDefault(&mockWrappableProcessor{
+ ProcessorFuncs: map[string]TProcessorFunction{
+ name: WrappedTProcessorFunction{
+ Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
+ return true, nil
+ },
+ },
+ },
+ })
+ processor.RegisterProcessor(processorName, &mockWrappableProcessor{
+ ProcessorFuncs: map[string]TProcessorFunction{
+ name: WrappedTProcessorFunction{
+ Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
+ return true, nil
+ },
+ },
+ },
+ })
+
+ processorMap := processor.ProcessorMap()
+ if len(processorMap) != 2 {
+ t.Fatalf("Wrong processor map size %#v", processorMap)
+ }
+ for k := range processorMap {
+ components := strings.SplitN(k, MULTIPLEXED_SEPARATOR, 2)
+ if len(components) == 1 {
+ if components[0] != name {
+ t.Fatalf("Wrong name for default processor func, expected %q, got %q", name, components[0])
+ }
+ } else if len(components) == 2 {
+ if components[0] != processorName {
+ t.Errorf("Wrong processor name, expected %q, got %q", processorName, components[0])
+ }
+ if components[1] != name {
+ t.Errorf("Wrong name for processor func, expected %q, got %q", name, components[1])
+ }
+ } else {
+ t.Fatalf("Wrong number of components %#v", components)
+ }
+ }
+}
diff --git a/lib/go/thrift/processor_factory.go b/lib/go/thrift/processor_factory.go
index e4b132b30..245a3ccfc 100644
--- a/lib/go/thrift/processor_factory.go
+++ b/lib/go/thrift/processor_factory.go
@@ -25,6 +25,16 @@ import "context"
// writes to some output stream.
type TProcessor interface {
Process(ctx context.Context, in, out TProtocol) (bool, TException)
+
+ // ProcessorMap returns a map of thrift method names to TProcessorFunctions.
+ ProcessorMap() map[string]TProcessorFunction
+
+ // AddToProcessorMap adds the given TProcessorFunction to the internal
+ // processor map at the given key.
+ //
+ // If one is already set at the given key, it will be replaced with the new
+ // TProcessorFunction.
+ AddToProcessorMap(string, TProcessorFunction)
}
type TProcessorFunction interface {