diff options
-rw-r--r-- | CHANGES.md | 2 | ||||
-rw-r--r-- | lib/go/thrift/common_test.go | 78 | ||||
-rw-r--r-- | lib/go/thrift/example_middleware_test.go | 52 | ||||
-rw-r--r-- | lib/go/thrift/middleware.go | 70 | ||||
-rw-r--r-- | lib/go/thrift/middleware_test.go | 110 | ||||
-rw-r--r-- | lib/go/thrift/multiplexed_protocol.go | 61 | ||||
-rw-r--r-- | lib/go/thrift/multiplexed_protocol_test.go | 53 | ||||
-rw-r--r-- | lib/go/thrift/processor_factory.go | 10 |
8 files changed, 435 insertions, 1 deletions
diff --git a/CHANGES.md b/CHANGES.md index ca84cb7d2..b8bef21e2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,6 +10,7 @@ - [THRIFT-5006](https://issues.apache.org/jira/browse/THRIFT-5006) - Implement DEFAULT_MAX_LENGTH at TFramedTransport - [THRIFT-5069](https://issues.apache.org/jira/browse/THRIFT-5069) - In Go library TDeserializer.Transport is now typed \*TMemoryBuffer instead of TTransport - [THRIFT-5072](https://issues.apache.org/jira/browse/THRIFT-5072) - Haskell generator fails to distinguish between multiple enum types with conflicting enum identifiers +- [THRIFT-5164](https://issues.apache.org/jira/browse/THRIFT-5164) - In Go library TProcessor interface now includes ProcessorMap and AddToProcessorMap functions. ### Java @@ -18,6 +19,7 @@ ### Go - [THRIFT-5069](https://issues.apache.org/jira/browse/THRIFT-5069) - Add TSerializerPool and TDeserializerPool, which are thread-safe versions of TSerializer and TDeserializer. +- [THRIFT-5164](https://issues.apache.org/jira/browse/THRIFT-5164) - Add ProcessorMiddleware function type and WrapProcessor function to support wrapping a TProcessor with middleware functions. ## 0.13.0 diff --git a/lib/go/thrift/common_test.go b/lib/go/thrift/common_test.go index 93597ff8a..95d4e2130 100644 --- a/lib/go/thrift/common_test.go +++ b/lib/go/thrift/common_test.go @@ -19,7 +19,10 @@ package thrift -import "context" +import ( + "context" + "fmt" +) type mockProcessor struct { ProcessFunc func(in, out TProtocol) (bool, TException) @@ -28,3 +31,76 @@ type mockProcessor struct { func (m *mockProcessor) Process(ctx context.Context, in, out TProtocol) (bool, TException) { return m.ProcessFunc(in, out) } + +func (m *mockProcessor) ProcessorMap() map[string]TProcessorFunction { + return map[string]TProcessorFunction{ + "mock": WrappedTProcessorFunction{ + Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) { + return m.ProcessFunc(in, out) + }, + }, + } +} + +func (m *mockProcessor) AddToProcessorMap(name string, processorFunc TProcessorFunction) {} + +type mockWrappedProcessorContextKey int + +const ( + processorName mockWrappedProcessorContextKey = iota +) + +// setMockWrappableProcessorName sets the "name" of the TProcessorFunction to +// call on a mockWrappableProcessor when calling Process. +// +// In a normal TProcessor, the request name is read from the request itself +// which happens in TProcessor.Process, so it is not passed into the call to +// Process itself, to get around this in testing, mockWrappableProcessor calls +// getMockWrappableProcessorName to get the name to use from the context +// object. +func setMockWrappableProcessorName(ctx context.Context, name string) context.Context { + return context.WithValue(ctx, processorName, name) +} + +// getMockWrappableProcessorName gets the "name" of the TProcessorFunction to +// call on a mockWrappableProcessor when calling Process. +func getMockWrappableProcessorName(ctx context.Context) (string, bool) { + val, ok := ctx.Value(processorName).(string) + return val, ok +} + +// mockWrappableProcessor can be used to create a mock object that fufills the +// TProcessor interface in testing. +type mockWrappableProcessor struct { + ProcessorFuncs map[string]TProcessorFunction +} + +// Process calls the TProcessorFunction assigned to the "name" set on the +// context object by setMockWrappableProcessorName. +// +// If no name is set on the context or there is no TProcessorFunction mapped to +// that name, the call will panic. +func (p *mockWrappableProcessor) Process(ctx context.Context, in, out TProtocol) (bool, TException) { + name, ok := getMockWrappableProcessorName(ctx) + if !ok { + panic("MockWrappableProcessorName not set on context") + } + processor, ok := p.ProcessorMap()[name] + if !ok { + panic(fmt.Sprintf("No processor set for name %q", name)) + } + return processor.Process(ctx, 0, in, out) +} + +func (p *mockWrappableProcessor) ProcessorMap() map[string]TProcessorFunction { + return p.ProcessorFuncs +} + +func (p *mockWrappableProcessor) AddToProcessorMap(name string, processorFunc TProcessorFunction) { + p.ProcessorFuncs[name] = processorFunc +} + +var ( + _ TProcessor = (*mockProcessor)(nil) + _ TProcessor = (*mockWrappableProcessor)(nil) +) diff --git a/lib/go/thrift/example_middleware_test.go b/lib/go/thrift/example_middleware_test.go new file mode 100644 index 000000000..47061103d --- /dev/null +++ b/lib/go/thrift/example_middleware_test.go @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package thrift + +import ( + "context" + "log" +) + +func simpleLoggingMiddleware(name string, next TProcessorFunction) TProcessorFunction { + return WrappedTProcessorFunction{ + Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) { + log.Printf("Before: %q", name) + success, err := next.Process(ctx, seqId, in, out) + log.Printf("After: %q", name) + log.Printf("Success: %v", success) + if err != nil { + log.Printf("Error: %v", err) + } + return success, err + }, + } +} + +func ExampleProcessorMiddleware() { + var ( + processor TProcessor + trans TServerTransport + transFactory TTransportFactory + protoFactory TProtocolFactory + ) + processor = WrapProcessor(processor, simpleLoggingMiddleware) + server := NewTSimpleServer4(processor, trans, transFactory, protoFactory) + log.Fatal(server.Serve()) +} diff --git a/lib/go/thrift/middleware.go b/lib/go/thrift/middleware.go new file mode 100644 index 000000000..18f2b99c1 --- /dev/null +++ b/lib/go/thrift/middleware.go @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package thrift + +import "context" + +// ProcessorMiddleware is a function that can be passed to WrapProcessor to wrap the +// TProcessorFunctions for that TProcessor. +// +// Middlewares are passed in the name of the function as set in the processor +// map of the TProcessor. +type ProcessorMiddleware func(name string, next TProcessorFunction) TProcessorFunction + +// WrapProcessor takes an existing TProcessor and wraps each of its inner +// TProcessorFunctions with the middlewares passed in and returns it. +// +// Middlewares will be called in the order that they are defined: +// +// 1. Middlewares[0] +// 2. Middlewares[1] +// ... +// N. Middlewares[n] +func WrapProcessor(processor TProcessor, middlewares ...ProcessorMiddleware) TProcessor { + for name, processorFunc := range processor.ProcessorMap() { + wrapped := processorFunc + // Add middlewares in reverse so the first in the list is the outermost. + for i := len(middlewares) - 1; i >= 0; i-- { + wrapped = middlewares[i](name, wrapped) + } + processor.AddToProcessorMap(name, wrapped) + } + return processor +} + +// WrappedTProcessorFunction is a convenience struct that implements the +// TProcessorFunction interface that can be used when implementing custom +// Middleware. +type WrappedTProcessorFunction struct { + // Wrapped is called by WrappedTProcessorFunction.Process and should be a + // "wrapped" call to a base TProcessorFunc.Process call. + Wrapped func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) +} + +// Process implements the TProcessorFunction interface using p.Wrapped. +func (p WrappedTProcessorFunction) Process(ctx context.Context, seqID int32, in, out TProtocol) (bool, TException) { + return p.Wrapped(ctx, seqID, in, out) +} + +// verify that WrappedTProcessorFunction implements TProcessorFunction +var ( + _ TProcessorFunction = WrappedTProcessorFunction{} + _ TProcessorFunction = (*WrappedTProcessorFunction)(nil) +) diff --git a/lib/go/thrift/middleware_test.go b/lib/go/thrift/middleware_test.go new file mode 100644 index 000000000..81cbc7bd1 --- /dev/null +++ b/lib/go/thrift/middleware_test.go @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package thrift + +import ( + "context" + "testing" +) + +type counter struct { + count int +} + +func (c *counter) incr() { + c.count++ +} + +func testMiddleware(c *counter) ProcessorMiddleware { + return func(name string, next TProcessorFunction) TProcessorFunction { + return WrappedTProcessorFunction{ + Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) { + c.incr() + return next.Process(ctx, seqId, in, out) + }, + } + } +} + +func newCounter(t *testing.T) *counter { + c := counter{} + if c.count != 0 { + t.Fatal("Unexpected initial count.") + } + return &c +} + +func TestWrapProcessor(t *testing.T) { + name := "test" + processor := &mockWrappableProcessor{ + ProcessorFuncs: map[string]TProcessorFunction{ + name: WrappedTProcessorFunction{ + Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) { + return true, nil + }, + }, + }, + } + c := newCounter(t) + ctx := setMockWrappableProcessorName(context.Background(), name) + wrapped := WrapProcessor(processor, testMiddleware(c)) + wrapped.Process(ctx, nil, nil) + if c.count != 1 { + t.Fatalf("Unexpected count value %v", c.count) + } +} + +func TestWrapTMultiplexedProcessor(t *testing.T) { + name := "test" + processorName := "foo" + c := newCounter(t) + processor := &TMultiplexedProcessor{} + processor.RegisterDefault(&mockWrappableProcessor{ + ProcessorFuncs: map[string]TProcessorFunction{ + name: WrappedTProcessorFunction{ + Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) { + return true, nil + }, + }, + }, + }) + processor.RegisterProcessor(processorName, &mockWrappableProcessor{ + ProcessorFuncs: map[string]TProcessorFunction{ + name: WrappedTProcessorFunction{ + Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) { + return true, nil + }, + }, + }, + }) + wrapped := WrapProcessor(processor, testMiddleware(c)) + ctx := setMockWrappableProcessorName(context.Background(), name) + in := NewStoredMessageProtocol(nil, name, 1, 1) + wrapped.Process(ctx, in, nil) + if c.count != 1 { + t.Fatalf("Unexpected count value %v", c.count) + } + + in = NewStoredMessageProtocol(nil, processorName+MULTIPLEXED_SEPARATOR+name, 1, 1) + wrapped.Process(ctx, in, nil) + if c.count != 2 { + t.Fatalf("Unexpected count value %v", c.count) + } +} diff --git a/lib/go/thrift/multiplexed_protocol.go b/lib/go/thrift/multiplexed_protocol.go index d028a30b3..9db59c4c9 100644 --- a/lib/go/thrift/multiplexed_protocol.go +++ b/lib/go/thrift/multiplexed_protocol.go @@ -117,6 +117,67 @@ func NewTMultiplexedProcessor() *TMultiplexedProcessor { } } +// ProcessorMap returns a mapping of "{ProcessorName}{MULTIPLEXED_SEPARATOR}{FunctionName}" +// to TProcessorFunction for any registered processors. If there is also a +// DefaultProcessor, the keys for the methods on that processor will simply be +// "{FunctionName}". If the TMultiplexedProcessor has both a DefaultProcessor and +// other registered processors, then the keys will be a mix of both formats. +// +// The implementation differs with other TProcessors in that the map returned is +// a new map, while most TProcessors just return their internal mapping directly. +// This means that edits to the map returned by this implementation of ProcessorMap +// will not affect the underlying mapping within the TMultiplexedProcessor. +func (t *TMultiplexedProcessor) ProcessorMap() map[string]TProcessorFunction { + processorFuncMap := make(map[string]TProcessorFunction) + for name, processor := range t.serviceProcessorMap { + for method, processorFunc := range processor.ProcessorMap() { + processorFuncName := name + MULTIPLEXED_SEPARATOR + method + processorFuncMap[processorFuncName] = processorFunc + } + } + if t.DefaultProcessor != nil { + for method, processorFunc := range t.DefaultProcessor.ProcessorMap() { + processorFuncMap[method] = processorFunc + } + } + return processorFuncMap +} + +// AddToProcessorMap updates the underlying TProcessor ProccessorMaps depending on +// the format of "name". +// +// If "name" is in the format "{ProcessorName}{MULTIPLEXED_SEPARATOR}{FunctionName}", +// then it sets the given TProcessorFunction on the inner TProcessor with the +// ProcessorName component using the FunctionName component. +// +// If "name" is just in the format "{FunctionName}", that is to say there is no +// MULTIPLEXED_SEPARATOR, and the TMultiplexedProcessor has a DefaultProcessor +// configured, then it will set the given TProcessorFunction on the DefaultProcessor +// using the given name. +// +// If there is not a TProcessor available for the given name, then this function +// does nothing. This can happen when there is no TProcessor registered for +// the given ProcessorName or if all that is given is the FunctionName and there +// is no DefaultProcessor set. +func (t *TMultiplexedProcessor) AddToProcessorMap(name string, processorFunc TProcessorFunction) { + components := strings.SplitN(name, MULTIPLEXED_SEPARATOR, 2) + if len(components) != 2 { + if t.DefaultProcessor != nil && len(components) == 1 { + t.DefaultProcessor.AddToProcessorMap(components[0], processorFunc) + } + return + } + processorName := components[0] + funcName := components[1] + if processor, ok := t.serviceProcessorMap[processorName]; ok { + processor.AddToProcessorMap(funcName, processorFunc) + } + +} + +// verify that TMultiplexedProcessor implements TProcessor +var _ TProcessor = (*TMultiplexedProcessor)(nil) + func (t *TMultiplexedProcessor) RegisterDefault(processor TProcessor) { t.DefaultProcessor = processor } diff --git a/lib/go/thrift/multiplexed_protocol_test.go b/lib/go/thrift/multiplexed_protocol_test.go new file mode 100644 index 000000000..8e70ac597 --- /dev/null +++ b/lib/go/thrift/multiplexed_protocol_test.go @@ -0,0 +1,53 @@ +package thrift + +import ( + "context" + "strings" + "testing" +) + +func TestMultiplexedProcessorMap(t *testing.T) { + name := "test" + processorName := "foo" + processor := &TMultiplexedProcessor{} + processor.RegisterDefault(&mockWrappableProcessor{ + ProcessorFuncs: map[string]TProcessorFunction{ + name: WrappedTProcessorFunction{ + Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) { + return true, nil + }, + }, + }, + }) + processor.RegisterProcessor(processorName, &mockWrappableProcessor{ + ProcessorFuncs: map[string]TProcessorFunction{ + name: WrappedTProcessorFunction{ + Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) { + return true, nil + }, + }, + }, + }) + + processorMap := processor.ProcessorMap() + if len(processorMap) != 2 { + t.Fatalf("Wrong processor map size %#v", processorMap) + } + for k := range processorMap { + components := strings.SplitN(k, MULTIPLEXED_SEPARATOR, 2) + if len(components) == 1 { + if components[0] != name { + t.Fatalf("Wrong name for default processor func, expected %q, got %q", name, components[0]) + } + } else if len(components) == 2 { + if components[0] != processorName { + t.Errorf("Wrong processor name, expected %q, got %q", processorName, components[0]) + } + if components[1] != name { + t.Errorf("Wrong name for processor func, expected %q, got %q", name, components[1]) + } + } else { + t.Fatalf("Wrong number of components %#v", components) + } + } +} diff --git a/lib/go/thrift/processor_factory.go b/lib/go/thrift/processor_factory.go index e4b132b30..245a3ccfc 100644 --- a/lib/go/thrift/processor_factory.go +++ b/lib/go/thrift/processor_factory.go @@ -25,6 +25,16 @@ import "context" // writes to some output stream. type TProcessor interface { Process(ctx context.Context, in, out TProtocol) (bool, TException) + + // ProcessorMap returns a map of thrift method names to TProcessorFunctions. + ProcessorMap() map[string]TProcessorFunction + + // AddToProcessorMap adds the given TProcessorFunction to the internal + // processor map at the given key. + // + // If one is already set at the given key, it will be replaced with the new + // TProcessorFunction. + AddToProcessorMap(string, TProcessorFunction) } type TProcessorFunction interface { |