diff options
Diffstat (limited to 'workhorse/internal/redis/keywatcher_test.go')
-rw-r--r-- | workhorse/internal/redis/keywatcher_test.go | 227 |
1 files changed, 160 insertions, 67 deletions
diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go index 7ff5f8204c0..a2f2b73898f 100644 --- a/workhorse/internal/redis/keywatcher_test.go +++ b/workhorse/internal/redis/keywatcher_test.go @@ -1,10 +1,12 @@ package redis import ( + "errors" "sync" "testing" "time" + "github.com/gomodule/redigo/redis" "github.com/rafaeljusto/redigomock" "github.com/stretchr/testify/require" ) @@ -65,100 +67,191 @@ func processMessages(numWatchers int, value string) { processInner(psc) } -func TestWatchKeySeenChange(t *testing.T) { +type keyChangeTestCase struct { + desc string + returnValue string + isKeyMissing bool + watchValue string + processedValue string + expectedStatus WatchKeyStatus + timeout time.Duration +} + +func TestKeyChangesBubblesUpError(t *testing.T) { conn, td := setupMockPool() defer td() - conn.Command("GET", runnerKey).Expect("something") - - wg := &sync.WaitGroup{} - wg.Add(1) + conn.Command("GET", runnerKey).ExpectError(errors.New("test error")) - go func() { - val, err := WatchKey(runnerKey, "something", time.Second) - require.NoError(t, err, "Expected no error") - require.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change") - wg.Done() - }() + _, err := WatchKey(runnerKey, "something", time.Second) + require.Error(t, err, "Expected error") - processMessages(1, "somethingelse") - wg.Wait() + deleteWatchers(runnerKey) } -func TestWatchKeyNoChange(t *testing.T) { - conn, td := setupMockPool() - defer td() +func TestKeyChangesInstantReturn(t *testing.T) { + testCases := []keyChangeTestCase{ + // WatchKeyStatusAlreadyChanged + { + desc: "sees change with key existing and changed", + returnValue: "somethingelse", + watchValue: "something", + expectedStatus: WatchKeyStatusAlreadyChanged, + timeout: time.Second, + }, + { + desc: "sees change with key non-existing", + isKeyMissing: true, + watchValue: "something", + processedValue: "somethingelse", + expectedStatus: WatchKeyStatusAlreadyChanged, + timeout: time.Second, + }, + // WatchKeyStatusTimeout + { + desc: "sees timeout with key existing and unchanged", + returnValue: "something", + watchValue: "something", + expectedStatus: WatchKeyStatusTimeout, + timeout: time.Millisecond, + }, + { + desc: "sees timeout with key non-existing and unchanged", + isKeyMissing: true, + watchValue: "", + expectedStatus: WatchKeyStatusTimeout, + timeout: time.Millisecond, + }, + } - conn.Command("GET", runnerKey).Expect("something") + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + conn, td := setupMockPool() + defer td() - wg := &sync.WaitGroup{} - wg.Add(1) + if tc.isKeyMissing { + conn.Command("GET", runnerKey).ExpectError(redis.ErrNil) + } else { + conn.Command("GET", runnerKey).Expect(tc.returnValue) + } - go func() { - val, err := WatchKey(runnerKey, "something", time.Second) - require.NoError(t, err, "Expected no error") - require.Equal(t, WatchKeyStatusNoChange, val, "Expected notification without change to value") - wg.Done() - }() + val, err := WatchKey(runnerKey, tc.watchValue, tc.timeout) - processMessages(1, "something") - wg.Wait() -} + require.NoError(t, err, "Expected no error") + require.Equal(t, tc.expectedStatus, val, "Expected value") -func TestWatchKeyTimeout(t *testing.T) { - conn, td := setupMockPool() - defer td() + deleteWatchers(runnerKey) + }) + } +} - conn.Command("GET", runnerKey).Expect("something") +func TestKeyChangesWhenWatching(t *testing.T) { + testCases := []keyChangeTestCase{ + // WatchKeyStatusSeenChange + { + desc: "sees change with key existing", + returnValue: "something", + watchValue: "something", + processedValue: "somethingelse", + expectedStatus: WatchKeyStatusSeenChange, + }, + { + desc: "sees change with key non-existing, when watching empty value", + isKeyMissing: true, + watchValue: "", + processedValue: "something", + expectedStatus: WatchKeyStatusSeenChange, + }, + // WatchKeyStatusNoChange + { + desc: "sees no change with key existing", + returnValue: "something", + watchValue: "something", + processedValue: "something", + expectedStatus: WatchKeyStatusNoChange, + }, + } - val, err := WatchKey(runnerKey, "something", time.Millisecond) - require.NoError(t, err, "Expected no error") - require.Equal(t, WatchKeyStatusTimeout, val, "Expected value to not change") + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + conn, td := setupMockPool() + defer td() - // Clean up watchers since Process isn't doing that for us (not running) - deleteWatchers(runnerKey) -} + if tc.isKeyMissing { + conn.Command("GET", runnerKey).ExpectError(redis.ErrNil) + } else { + conn.Command("GET", runnerKey).Expect(tc.returnValue) + } -func TestWatchKeyAlreadyChanged(t *testing.T) { - conn, td := setupMockPool() - defer td() + wg := &sync.WaitGroup{} + wg.Add(1) - conn.Command("GET", runnerKey).Expect("somethingelse") + go func() { + defer wg.Done() + val, err := WatchKey(runnerKey, tc.watchValue, time.Second) - val, err := WatchKey(runnerKey, "something", time.Second) - require.NoError(t, err, "Expected no error") - require.Equal(t, WatchKeyStatusAlreadyChanged, val, "Expected value to have already changed") + require.NoError(t, err, "Expected no error") + require.Equal(t, tc.expectedStatus, val, "Expected value") + }() - // Clean up watchers since Process isn't doing that for us (not running) - deleteWatchers(runnerKey) + processMessages(1, tc.processedValue) + wg.Wait() + }) + } } -func TestWatchKeyMassivelyParallel(t *testing.T) { - runTimes := 100 // 100 parallel watchers +func TestKeyChangesParallel(t *testing.T) { + testCases := []keyChangeTestCase{ + { + desc: "massively parallel, sees change with key existing", + returnValue: "something", + watchValue: "something", + processedValue: "somethingelse", + expectedStatus: WatchKeyStatusSeenChange, + }, + { + desc: "massively parallel, sees change with key existing, watching missing keys", + isKeyMissing: true, + watchValue: "", + processedValue: "somethingelse", + expectedStatus: WatchKeyStatusSeenChange, + }, + } - conn, td := setupMockPool() - defer td() + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + runTimes := 100 - wg := &sync.WaitGroup{} - wg.Add(runTimes) + conn, td := setupMockPool() + defer td() - getCmd := conn.Command("GET", runnerKey) + getCmd := conn.Command("GET", runnerKey) - for i := 0; i < runTimes; i++ { - getCmd = getCmd.Expect("something") - } + for i := 0; i < runTimes; i++ { + if tc.isKeyMissing { + getCmd = getCmd.ExpectError(redis.ErrNil) + } else { + getCmd = getCmd.Expect(tc.returnValue) + } + } - for i := 0; i < runTimes; i++ { - go func() { - val, err := WatchKey(runnerKey, "something", time.Second) - require.NoError(t, err, "Expected no error") - require.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change") - wg.Done() - }() - } + wg := &sync.WaitGroup{} + wg.Add(runTimes) - processMessages(runTimes, "somethingelse") - wg.Wait() + for i := 0; i < runTimes; i++ { + go func() { + defer wg.Done() + val, err := WatchKey(runnerKey, tc.watchValue, time.Second) + + require.NoError(t, err, "Expected no error") + require.Equal(t, tc.expectedStatus, val, "Expected value") + }() + } + + processMessages(runTimes, tc.processedValue) + wg.Wait() + }) + } } func TestShutdown(t *testing.T) { |