diff --git a/workhorse/internal/builds/register.go b/workhorse/internal/builds/register.go index 0a2fe47ed7e3489cc7a069a3af3958abf5615446..f45d03ab8edb3282ba35d58269fe402e0247089b 100644 --- a/workhorse/internal/builds/register.go +++ b/workhorse/internal/builds/register.go @@ -2,6 +2,7 @@ package builds import ( "bytes" + "context" "encoding/json" "errors" "io" @@ -55,7 +56,7 @@ var ( type largeBodyError struct{ error } -type WatchKeyHandler func(key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) +type WatchKeyHandler func(ctx context.Context, key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) type runnerRequest struct { Token string `json:"token,omitempty"` @@ -102,11 +103,11 @@ func proxyRegisterRequest(h http.Handler, w http.ResponseWriter, r *http.Request h.ServeHTTP(w, r) } -func watchForRunnerChange(watchHandler WatchKeyHandler, token, lastUpdate string, duration time.Duration) (redis.WatchKeyStatus, error) { +func watchForRunnerChange(ctx context.Context, watchHandler WatchKeyHandler, token, lastUpdate string, duration time.Duration) (redis.WatchKeyStatus, error) { registerHandlerOpenAtWatching.Inc() defer registerHandlerOpenAtWatching.Dec() - return watchHandler(runnerBuildQueue+token, lastUpdate, duration) + return watchHandler(ctx, runnerBuildQueue+token, lastUpdate, duration) } func RegisterHandler(h http.Handler, watchHandler WatchKeyHandler, pollingDuration time.Duration) http.Handler { @@ -140,7 +141,7 @@ func RegisterHandler(h http.Handler, watchHandler WatchKeyHandler, pollingDurati return } - result, err := watchForRunnerChange(watchHandler, runnerRequest.Token, + result, err := watchForRunnerChange(r.Context(), watchHandler, runnerRequest.Token, runnerRequest.LastUpdate, pollingDuration) if err != nil { registerHandlerWatchErrors.Inc() diff --git a/workhorse/internal/builds/register_test.go b/workhorse/internal/builds/register_test.go index d5cbebd500bf6bcb8d8a3ab2c0c50fd537028e96..97d66517ac95bec39aecf7de20b5c19b098cbddb 100644 --- a/workhorse/internal/builds/register_test.go +++ b/workhorse/internal/builds/register_test.go @@ -2,6 +2,7 @@ package builds import ( "bytes" + "context" "errors" "io" "net/http" @@ -71,7 +72,7 @@ func TestRegisterHandlerMissingData(t *testing.T) { func expectWatcherToBeExecuted(t *testing.T, watchKeyStatus redis.WatchKeyStatus, watchKeyError error, httpStatus int, msgAndArgs ...interface{}) { executed := false - watchKeyHandler := func(key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) { + watchKeyHandler := func(ctx context.Context, key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) { executed = true return watchKeyStatus, watchKeyError } diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go index cdf6ccd7e83d1741afc818830279c463a6b9c94f..2fd0753c3c95217693f3719c15dd8d22a09f49c2 100644 --- a/workhorse/internal/redis/keywatcher.go +++ b/workhorse/internal/redis/keywatcher.go @@ -1,6 +1,7 @@ package redis import ( + "context" "errors" "fmt" "strings" @@ -251,7 +252,7 @@ const ( WatchKeyStatusNoChange ) -func (kw *KeyWatcher) WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) { +func (kw *KeyWatcher) WatchKey(_ context.Context, key, value string, timeout time.Duration) (WatchKeyStatus, error) { notify := make(chan string, 1) if err := kw.addSubscription(key, notify); err != nil { return WatchKeyStatusNoChange, err diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go index bae49d81bb1893c67774a5e30829fec1168d0998..3abc1bf1107dddd9501ebeee80b2b9f3c978cdc2 100644 --- a/workhorse/internal/redis/keywatcher_test.go +++ b/workhorse/internal/redis/keywatcher_test.go @@ -1,6 +1,7 @@ package redis import ( + "context" "sync" "testing" "time" @@ -10,6 +11,8 @@ import ( "github.com/stretchr/testify/require" ) +var ctx = context.Background() + const ( runnerKey = "runner:build_queue:10" ) @@ -131,7 +134,7 @@ func TestKeyChangesInstantReturn(t *testing.T) { defer kw.Shutdown() kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()} - val, err := kw.WatchKey(runnerKey, tc.watchValue, tc.timeout) + val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, tc.timeout) require.NoError(t, err, "Expected no error") require.Equal(t, tc.expectedStatus, val, "Expected value") @@ -187,7 +190,7 @@ func TestKeyChangesWhenWatching(t *testing.T) { go func() { defer wg.Done() <-ready - val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second) + val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, tc.expectedStatus, val, "Expected value") @@ -245,7 +248,7 @@ func TestKeyChangesParallel(t *testing.T) { go func() { defer wg.Done() <-ready - val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second) + val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, tc.expectedStatus, val, "Expected value") @@ -273,7 +276,7 @@ func TestShutdown(t *testing.T) { go func() { defer wg.Done() - val, err := kw.WatchKey(runnerKey, "something", 10*time.Second) + val, err := kw.WatchKey(ctx, runnerKey, "something", 10*time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, WatchKeyStatusNoChange, val, "Expected value not to change") @@ -295,7 +298,7 @@ func TestShutdown(t *testing.T) { var err error done := make(chan struct{}) go func() { - val, err = kw.WatchKey(runnerKey, "something", 10*time.Second) + val, err = kw.WatchKey(ctx, runnerKey, "something", 10*time.Second) close(done) }()