summaryrefslogtreecommitdiff
path: root/workhorse/internal/redis/keywatcher_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal/redis/keywatcher_test.go')
-rw-r--r--workhorse/internal/redis/keywatcher_test.go227
1 files changed, 160 insertions, 67 deletions
diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go
index 7ff5f8204c0..a2f2b73898f 100644
--- a/workhorse/internal/redis/keywatcher_test.go
+++ b/workhorse/internal/redis/keywatcher_test.go
@@ -1,10 +1,12 @@
package redis
import (
+ "errors"
"sync"
"testing"
"time"
+ "github.com/gomodule/redigo/redis"
"github.com/rafaeljusto/redigomock"
"github.com/stretchr/testify/require"
)
@@ -65,100 +67,191 @@ func processMessages(numWatchers int, value string) {
processInner(psc)
}
-func TestWatchKeySeenChange(t *testing.T) {
+type keyChangeTestCase struct {
+ desc string
+ returnValue string
+ isKeyMissing bool
+ watchValue string
+ processedValue string
+ expectedStatus WatchKeyStatus
+ timeout time.Duration
+}
+
+func TestKeyChangesBubblesUpError(t *testing.T) {
conn, td := setupMockPool()
defer td()
- conn.Command("GET", runnerKey).Expect("something")
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
+ conn.Command("GET", runnerKey).ExpectError(errors.New("test error"))
- go func() {
- val, err := WatchKey(runnerKey, "something", time.Second)
- require.NoError(t, err, "Expected no error")
- require.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change")
- wg.Done()
- }()
+ _, err := WatchKey(runnerKey, "something", time.Second)
+ require.Error(t, err, "Expected error")
- processMessages(1, "somethingelse")
- wg.Wait()
+ deleteWatchers(runnerKey)
}
-func TestWatchKeyNoChange(t *testing.T) {
- conn, td := setupMockPool()
- defer td()
+func TestKeyChangesInstantReturn(t *testing.T) {
+ testCases := []keyChangeTestCase{
+ // WatchKeyStatusAlreadyChanged
+ {
+ desc: "sees change with key existing and changed",
+ returnValue: "somethingelse",
+ watchValue: "something",
+ expectedStatus: WatchKeyStatusAlreadyChanged,
+ timeout: time.Second,
+ },
+ {
+ desc: "sees change with key non-existing",
+ isKeyMissing: true,
+ watchValue: "something",
+ processedValue: "somethingelse",
+ expectedStatus: WatchKeyStatusAlreadyChanged,
+ timeout: time.Second,
+ },
+ // WatchKeyStatusTimeout
+ {
+ desc: "sees timeout with key existing and unchanged",
+ returnValue: "something",
+ watchValue: "something",
+ expectedStatus: WatchKeyStatusTimeout,
+ timeout: time.Millisecond,
+ },
+ {
+ desc: "sees timeout with key non-existing and unchanged",
+ isKeyMissing: true,
+ watchValue: "",
+ expectedStatus: WatchKeyStatusTimeout,
+ timeout: time.Millisecond,
+ },
+ }
- conn.Command("GET", runnerKey).Expect("something")
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ conn, td := setupMockPool()
+ defer td()
- wg := &sync.WaitGroup{}
- wg.Add(1)
+ if tc.isKeyMissing {
+ conn.Command("GET", runnerKey).ExpectError(redis.ErrNil)
+ } else {
+ conn.Command("GET", runnerKey).Expect(tc.returnValue)
+ }
- go func() {
- val, err := WatchKey(runnerKey, "something", time.Second)
- require.NoError(t, err, "Expected no error")
- require.Equal(t, WatchKeyStatusNoChange, val, "Expected notification without change to value")
- wg.Done()
- }()
+ val, err := WatchKey(runnerKey, tc.watchValue, tc.timeout)
- processMessages(1, "something")
- wg.Wait()
-}
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, tc.expectedStatus, val, "Expected value")
-func TestWatchKeyTimeout(t *testing.T) {
- conn, td := setupMockPool()
- defer td()
+ deleteWatchers(runnerKey)
+ })
+ }
+}
- conn.Command("GET", runnerKey).Expect("something")
+func TestKeyChangesWhenWatching(t *testing.T) {
+ testCases := []keyChangeTestCase{
+ // WatchKeyStatusSeenChange
+ {
+ desc: "sees change with key existing",
+ returnValue: "something",
+ watchValue: "something",
+ processedValue: "somethingelse",
+ expectedStatus: WatchKeyStatusSeenChange,
+ },
+ {
+ desc: "sees change with key non-existing, when watching empty value",
+ isKeyMissing: true,
+ watchValue: "",
+ processedValue: "something",
+ expectedStatus: WatchKeyStatusSeenChange,
+ },
+ // WatchKeyStatusNoChange
+ {
+ desc: "sees no change with key existing",
+ returnValue: "something",
+ watchValue: "something",
+ processedValue: "something",
+ expectedStatus: WatchKeyStatusNoChange,
+ },
+ }
- val, err := WatchKey(runnerKey, "something", time.Millisecond)
- require.NoError(t, err, "Expected no error")
- require.Equal(t, WatchKeyStatusTimeout, val, "Expected value to not change")
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ conn, td := setupMockPool()
+ defer td()
- // Clean up watchers since Process isn't doing that for us (not running)
- deleteWatchers(runnerKey)
-}
+ if tc.isKeyMissing {
+ conn.Command("GET", runnerKey).ExpectError(redis.ErrNil)
+ } else {
+ conn.Command("GET", runnerKey).Expect(tc.returnValue)
+ }
-func TestWatchKeyAlreadyChanged(t *testing.T) {
- conn, td := setupMockPool()
- defer td()
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
- conn.Command("GET", runnerKey).Expect("somethingelse")
+ go func() {
+ defer wg.Done()
+ val, err := WatchKey(runnerKey, tc.watchValue, time.Second)
- val, err := WatchKey(runnerKey, "something", time.Second)
- require.NoError(t, err, "Expected no error")
- require.Equal(t, WatchKeyStatusAlreadyChanged, val, "Expected value to have already changed")
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, tc.expectedStatus, val, "Expected value")
+ }()
- // Clean up watchers since Process isn't doing that for us (not running)
- deleteWatchers(runnerKey)
+ processMessages(1, tc.processedValue)
+ wg.Wait()
+ })
+ }
}
-func TestWatchKeyMassivelyParallel(t *testing.T) {
- runTimes := 100 // 100 parallel watchers
+func TestKeyChangesParallel(t *testing.T) {
+ testCases := []keyChangeTestCase{
+ {
+ desc: "massively parallel, sees change with key existing",
+ returnValue: "something",
+ watchValue: "something",
+ processedValue: "somethingelse",
+ expectedStatus: WatchKeyStatusSeenChange,
+ },
+ {
+ desc: "massively parallel, sees change with key existing, watching missing keys",
+ isKeyMissing: true,
+ watchValue: "",
+ processedValue: "somethingelse",
+ expectedStatus: WatchKeyStatusSeenChange,
+ },
+ }
- conn, td := setupMockPool()
- defer td()
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ runTimes := 100
- wg := &sync.WaitGroup{}
- wg.Add(runTimes)
+ conn, td := setupMockPool()
+ defer td()
- getCmd := conn.Command("GET", runnerKey)
+ getCmd := conn.Command("GET", runnerKey)
- for i := 0; i < runTimes; i++ {
- getCmd = getCmd.Expect("something")
- }
+ for i := 0; i < runTimes; i++ {
+ if tc.isKeyMissing {
+ getCmd = getCmd.ExpectError(redis.ErrNil)
+ } else {
+ getCmd = getCmd.Expect(tc.returnValue)
+ }
+ }
- for i := 0; i < runTimes; i++ {
- go func() {
- val, err := WatchKey(runnerKey, "something", time.Second)
- require.NoError(t, err, "Expected no error")
- require.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change")
- wg.Done()
- }()
- }
+ wg := &sync.WaitGroup{}
+ wg.Add(runTimes)
- processMessages(runTimes, "somethingelse")
- wg.Wait()
+ for i := 0; i < runTimes; i++ {
+ go func() {
+ defer wg.Done()
+ val, err := WatchKey(runnerKey, tc.watchValue, time.Second)
+
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, tc.expectedStatus, val, "Expected value")
+ }()
+ }
+
+ processMessages(runTimes, tc.processedValue)
+ wg.Wait()
+ })
+ }
}
func TestShutdown(t *testing.T) {