From 2ad6e9724e0e4e100425f068f6162ca7edfcd205 Mon Sep 17 00:00:00 2001 From: Sylvester Chin Date: Tue, 1 Aug 2023 16:31:30 +0800 Subject: [PATCH] Use context in keywatcher to prepare for go-redis upgrade --- workhorse/internal/builds/register.go | 9 +++++---- workhorse/internal/builds/register_test.go | 3 ++- workhorse/internal/redis/keywatcher.go | 3 ++- workhorse/internal/redis/keywatcher_test.go | 13 ++++++++----- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/workhorse/internal/builds/register.go b/workhorse/internal/builds/register.go index 0a2fe47ed7e348..f45d03ab8edb32 100644 --- a/workhorse/internal/builds/register.go +++ b/workhorse/internal/builds/register.go @@ -2,6 +2,7 @@ package builds import ( "bytes" + "context" "encoding/json" "errors" "io" @@ -55,7 +56,7 @@ var ( type largeBodyError struct{ error } -type WatchKeyHandler func(key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) +type WatchKeyHandler func(ctx context.Context, key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) type runnerRequest struct { Token string `json:"token,omitempty"` @@ -102,11 +103,11 @@ func proxyRegisterRequest(h http.Handler, w http.ResponseWriter, r *http.Request h.ServeHTTP(w, r) } -func watchForRunnerChange(watchHandler WatchKeyHandler, token, lastUpdate string, duration time.Duration) (redis.WatchKeyStatus, error) { +func watchForRunnerChange(ctx context.Context, watchHandler WatchKeyHandler, token, lastUpdate string, duration time.Duration) (redis.WatchKeyStatus, error) { registerHandlerOpenAtWatching.Inc() defer registerHandlerOpenAtWatching.Dec() - return watchHandler(runnerBuildQueue+token, lastUpdate, duration) + return watchHandler(ctx, runnerBuildQueue+token, lastUpdate, duration) } func RegisterHandler(h http.Handler, watchHandler WatchKeyHandler, pollingDuration time.Duration) http.Handler { @@ -140,7 +141,7 @@ func RegisterHandler(h http.Handler, watchHandler WatchKeyHandler, pollingDurati return } - result, err := watchForRunnerChange(watchHandler, runnerRequest.Token, + result, err := watchForRunnerChange(r.Context(), watchHandler, runnerRequest.Token, runnerRequest.LastUpdate, pollingDuration) if err != nil { registerHandlerWatchErrors.Inc() diff --git a/workhorse/internal/builds/register_test.go b/workhorse/internal/builds/register_test.go index d5cbebd500bf6b..97d66517ac95be 100644 --- a/workhorse/internal/builds/register_test.go +++ b/workhorse/internal/builds/register_test.go @@ -2,6 +2,7 @@ package builds import ( "bytes" + "context" "errors" "io" "net/http" @@ -71,7 +72,7 @@ func TestRegisterHandlerMissingData(t *testing.T) { func expectWatcherToBeExecuted(t *testing.T, watchKeyStatus redis.WatchKeyStatus, watchKeyError error, httpStatus int, msgAndArgs ...interface{}) { executed := false - watchKeyHandler := func(key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) { + watchKeyHandler := func(ctx context.Context, key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) { executed = true return watchKeyStatus, watchKeyError } diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go index cdf6ccd7e83d17..2fd0753c3c9521 100644 --- a/workhorse/internal/redis/keywatcher.go +++ b/workhorse/internal/redis/keywatcher.go @@ -1,6 +1,7 @@ package redis import ( + "context" "errors" "fmt" "strings" @@ -251,7 +252,7 @@ const ( WatchKeyStatusNoChange ) -func (kw *KeyWatcher) WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) { +func (kw *KeyWatcher) WatchKey(_ context.Context, key, value string, timeout time.Duration) (WatchKeyStatus, error) { notify := make(chan string, 1) if err := kw.addSubscription(key, notify); err != nil { return WatchKeyStatusNoChange, err diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go index bae49d81bb1893..3abc1bf1107ddd 100644 --- a/workhorse/internal/redis/keywatcher_test.go +++ b/workhorse/internal/redis/keywatcher_test.go @@ -1,6 +1,7 @@ package redis import ( + "context" "sync" "testing" "time" @@ -10,6 +11,8 @@ import ( "github.com/stretchr/testify/require" ) +var ctx = context.Background() + const ( runnerKey = "runner:build_queue:10" ) @@ -131,7 +134,7 @@ func TestKeyChangesInstantReturn(t *testing.T) { defer kw.Shutdown() kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()} - val, err := kw.WatchKey(runnerKey, tc.watchValue, tc.timeout) + val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, tc.timeout) require.NoError(t, err, "Expected no error") require.Equal(t, tc.expectedStatus, val, "Expected value") @@ -187,7 +190,7 @@ func TestKeyChangesWhenWatching(t *testing.T) { go func() { defer wg.Done() <-ready - val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second) + val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, tc.expectedStatus, val, "Expected value") @@ -245,7 +248,7 @@ func TestKeyChangesParallel(t *testing.T) { go func() { defer wg.Done() <-ready - val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second) + val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, tc.expectedStatus, val, "Expected value") @@ -273,7 +276,7 @@ func TestShutdown(t *testing.T) { go func() { defer wg.Done() - val, err := kw.WatchKey(runnerKey, "something", 10*time.Second) + val, err := kw.WatchKey(ctx, runnerKey, "something", 10*time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, WatchKeyStatusNoChange, val, "Expected value not to change") @@ -295,7 +298,7 @@ func TestShutdown(t *testing.T) { var err error done := make(chan struct{}) go func() { - val, err = kw.WatchKey(runnerKey, "something", 10*time.Second) + val, err = kw.WatchKey(ctx, runnerKey, "something", 10*time.Second) close(done) }() -- GitLab