diff options
Diffstat (limited to 'workhorse/internal/redis/keywatcher_test.go')
-rw-r--r-- | workhorse/internal/redis/keywatcher_test.go | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go new file mode 100644 index 00000000000..f1ee77e2194 --- /dev/null +++ b/workhorse/internal/redis/keywatcher_test.go @@ -0,0 +1,162 @@ +package redis + +import ( + "sync" + "testing" + "time" + + "github.com/rafaeljusto/redigomock" + "github.com/stretchr/testify/require" +) + +const ( + runnerKey = "runner:build_queue:10" +) + +func createSubscriptionMessage(key, data string) []interface{} { + return []interface{}{ + []byte("message"), + []byte(key), + []byte(data), + } +} + +func createSubscribeMessage(key string) []interface{} { + return []interface{}{ + []byte("subscribe"), + []byte(key), + []byte("1"), + } +} +func createUnsubscribeMessage(key string) []interface{} { + return []interface{}{ + []byte("unsubscribe"), + []byte(key), + []byte("1"), + } +} + +func countWatchers(key string) int { + keyWatcherMutex.Lock() + defer keyWatcherMutex.Unlock() + return len(keyWatcher[key]) +} + +func deleteWatchers(key string) { + keyWatcherMutex.Lock() + defer keyWatcherMutex.Unlock() + delete(keyWatcher, key) +} + +// Forces a run of the `Process` loop against a mock PubSubConn. +func processMessages(numWatchers int, value string) { + psc := redigomock.NewConn() + + // Setup the initial subscription message + psc.Command("SUBSCRIBE", keySubChannel).Expect(createSubscribeMessage(keySubChannel)) + psc.Command("UNSUBSCRIBE", keySubChannel).Expect(createUnsubscribeMessage(keySubChannel)) + psc.AddSubscriptionMessage(createSubscriptionMessage(keySubChannel, runnerKey+"="+value)) + + // Wait for all the `WatchKey` calls to be registered + for countWatchers(runnerKey) != numWatchers { + time.Sleep(time.Millisecond) + } + + processInner(psc) +} + +func TestWatchKeySeenChange(t *testing.T) { + conn, td := setupMockPool() + defer td() + + conn.Command("GET", runnerKey).Expect("something") + + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + val, err := WatchKey(runnerKey, "something", time.Second) + require.NoError(t, err, "Expected no error") + require.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change") + wg.Done() + }() + + processMessages(1, "somethingelse") + wg.Wait() +} + +func TestWatchKeyNoChange(t *testing.T) { + conn, td := setupMockPool() + defer td() + + conn.Command("GET", runnerKey).Expect("something") + + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + val, err := WatchKey(runnerKey, "something", time.Second) + require.NoError(t, err, "Expected no error") + require.Equal(t, WatchKeyStatusNoChange, val, "Expected notification without change to value") + wg.Done() + }() + + processMessages(1, "something") + wg.Wait() +} + +func TestWatchKeyTimeout(t *testing.T) { + conn, td := setupMockPool() + defer td() + + conn.Command("GET", runnerKey).Expect("something") + + val, err := WatchKey(runnerKey, "something", time.Millisecond) + require.NoError(t, err, "Expected no error") + require.Equal(t, WatchKeyStatusTimeout, val, "Expected value to not change") + + // Clean up watchers since Process isn't doing that for us (not running) + deleteWatchers(runnerKey) +} + +func TestWatchKeyAlreadyChanged(t *testing.T) { + conn, td := setupMockPool() + defer td() + + conn.Command("GET", runnerKey).Expect("somethingelse") + + val, err := WatchKey(runnerKey, "something", time.Second) + require.NoError(t, err, "Expected no error") + require.Equal(t, WatchKeyStatusAlreadyChanged, val, "Expected value to have already changed") + + // Clean up watchers since Process isn't doing that for us (not running) + deleteWatchers(runnerKey) +} + +func TestWatchKeyMassivelyParallel(t *testing.T) { + runTimes := 100 // 100 parallel watchers + + conn, td := setupMockPool() + defer td() + + wg := &sync.WaitGroup{} + wg.Add(runTimes) + + getCmd := conn.Command("GET", runnerKey) + + for i := 0; i < runTimes; i++ { + getCmd = getCmd.Expect("something") + } + + for i := 0; i < runTimes; i++ { + go func() { + val, err := WatchKey(runnerKey, "something", time.Second) + require.NoError(t, err, "Expected no error") + require.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change") + wg.Done() + }() + } + + processMessages(runTimes, "somethingelse") + wg.Wait() +} |