From 3e27a7f25f68e99e127506137f36b02a587f34c4 Mon Sep 17 00:00:00 2001 From: Shinya Maeda Date: Tue, 2 Sep 2025 11:54:38 +0900 Subject: [PATCH] DWS workhorse proxy graceful termination --- .../duo_agentic_chat_graceful_stop.yml | 12 + ee/lib/api/ai/duo_workflows/workflows.rb | 9 +- workhorse/go.mod | 2 +- workhorse/go.sum | 4 + .../internal/ai_assist/duoworkflow/handler.go | 4 +- .../internal/ai_assist/duoworkflow/runner.go | 152 +++++++++-- .../ai_assist/duoworkflow/runner_test.go | 242 ++++++++++++------ workhorse/internal/api/api.go | 7 +- 8 files changed, 327 insertions(+), 105 deletions(-) create mode 100644 ee/config/feature_flags/gitlab_com_derisk/duo_agentic_chat_graceful_stop.yml diff --git a/ee/config/feature_flags/gitlab_com_derisk/duo_agentic_chat_graceful_stop.yml b/ee/config/feature_flags/gitlab_com_derisk/duo_agentic_chat_graceful_stop.yml new file mode 100644 index 00000000000000..a8c491227e9d2a --- /dev/null +++ b/ee/config/feature_flags/gitlab_com_derisk/duo_agentic_chat_graceful_stop.yml @@ -0,0 +1,12 @@ +--- +name: duo_agentic_chat_graceful_stop +description: | + Gracefully stop an agentic chat session if it encounters a connection error between the client and Workhorse. + This feature flag only works for connections via workhorse proxy. Direct gRPC connection is out of scope. 
+feature_issue_url: https://gitlab.com/gitlab-org/modelops/applied-ml/code-suggestions/ai-assist/-/issues/1302 +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/203460 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/568837 +milestone: '18.4' +group: group::duo chat +type: gitlab_com_derisk +default_enabled: false diff --git a/ee/lib/api/ai/duo_workflows/workflows.rb b/ee/lib/api/ai/duo_workflows/workflows.rb index 179fad4d0d1d14..d7e33281c222c1 100644 --- a/ee/lib/api/ai/duo_workflows/workflows.rb +++ b/ee/lib/api/ai/duo_workflows/workflows.rb @@ -280,11 +280,18 @@ def create_workflow_params 'x-gitlab-unidirectional-streaming' => 'enabled' ).merge(model_metadata_headers) + if_websocket_disconnected = if ::Feature.enabled?(:duo_agentic_chat_graceful_stop, current_user) + 'stop' + else + '' + end + { DuoWorkflow: { Headers: headers, ServiceURI: Gitlab::DuoWorkflow::Client.url(user: current_user), - Secure: Gitlab::DuoWorkflow::Client.secure? + Secure: Gitlab::DuoWorkflow::Client.secure?, + IfWebsocketDisconnected: if_websocket_disconnected } } end diff --git a/workhorse/go.mod b/workhorse/go.mod index 02f0e48686bdb0..732ac12abf2335 100644 --- a/workhorse/go.mod +++ b/workhorse/go.mod @@ -32,7 +32,7 @@ require ( github.com/stretchr/testify v1.11.1 gitlab.com/gitlab-org/gitaly/v16 v16.11.0-rc1.0.20250408053233-c6d43513e93c gitlab.com/gitlab-org/labkit v1.29.0 - gitlab.com/gitlab-org/modelops/applied-ml/code-suggestions/ai-assist/clients/gopb v0.0.0-20250910175730-e1ecde3df22c + gitlab.com/gitlab-org/modelops/applied-ml/code-suggestions/ai-assist/clients/gopb v0.0.0-20251002064626-8fee05c6ae6d go.uber.org/goleak v1.3.0 gocloud.dev v0.40.1-0.20241107185025-56954848c3aa golang.org/x/image v0.28.0 diff --git a/workhorse/go.sum b/workhorse/go.sum index ccbd11c356a55d..7647e0e94b8f48 100644 --- a/workhorse/go.sum +++ b/workhorse/go.sum @@ -638,6 +638,10 @@ gitlab.com/gitlab-org/labkit v1.29.0 
h1:+o4pkw1jQqnQjEDXwLlDlhUpkGBV3DnOuCjnee1I gitlab.com/gitlab-org/labkit v1.29.0/go.mod h1:ZHOQIOVQKeOEKvQ/GhGBjUNbV3zWsx8nty6D/SRCyd4= gitlab.com/gitlab-org/modelops/applied-ml/code-suggestions/ai-assist/clients/gopb v0.0.0-20250910175730-e1ecde3df22c h1:LyOZ/4WgbksOXWcCY3MJnQOQTe8vCfSrC/gYRT2cna4= gitlab.com/gitlab-org/modelops/applied-ml/code-suggestions/ai-assist/clients/gopb v0.0.0-20250910175730-e1ecde3df22c/go.mod h1:G/uyhLyG53gWlNSoTuWirrqS42PKcobyiQZvEAaJ9o8= +gitlab.com/gitlab-org/modelops/applied-ml/code-suggestions/ai-assist/clients/gopb v0.0.0-20250917052816-8873c29ad8f9 h1:MznEmww/m9m4u8K5h5dbU/NPpA14f/WaqSCDvhff82E= +gitlab.com/gitlab-org/modelops/applied-ml/code-suggestions/ai-assist/clients/gopb v0.0.0-20250917052816-8873c29ad8f9/go.mod h1:G/uyhLyG53gWlNSoTuWirrqS42PKcobyiQZvEAaJ9o8= +gitlab.com/gitlab-org/modelops/applied-ml/code-suggestions/ai-assist/clients/gopb v0.0.0-20251002064626-8fee05c6ae6d h1:H291G42KNo63LnkJ/BRuiJ4cz9aXjJFvXwrT/lyJTEk= +gitlab.com/gitlab-org/modelops/applied-ml/code-suggestions/ai-assist/clients/gopb v0.0.0-20251002064626-8fee05c6ae6d/go.mod h1:G/uyhLyG53gWlNSoTuWirrqS42PKcobyiQZvEAaJ9o8= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.etcd.io/raft/v3 v3.6.0 h1:5NtvbDVYpnfZWcIHgGRk9DyzkBIXOi8j+DDp1IcnUWQ= go.etcd.io/raft/v3 v3.6.0/go.mod h1:nLvLevg6+xrVtHUmVaTcTz603gQPHfh7kUAwV6YpfGo= diff --git a/workhorse/internal/ai_assist/duoworkflow/handler.go b/workhorse/internal/ai_assist/duoworkflow/handler.go index 7ac0415651de8f..cd9ea045d7f179 100644 --- a/workhorse/internal/ai_assist/duoworkflow/handler.go +++ b/workhorse/internal/ai_assist/duoworkflow/handler.go @@ -32,8 +32,8 @@ func Handler(rails *api.API) http.Handler { } defer func() { _ = runner.Close() }() - if err := runner.Execute(r.Context()); err != nil { - log.WithRequest(r).WithError(err).Error() + if result := runner.Execute(); result.err != nil { + log.WithRequest(r).WithError(result.err).Error() } }, "") } diff --git 
a/workhorse/internal/ai_assist/duoworkflow/runner.go b/workhorse/internal/ai_assist/duoworkflow/runner.go index 21fd62c52659af..698e0302c6ece8 100644 --- a/workhorse/internal/ai_assist/duoworkflow/runner.go +++ b/workhorse/internal/ai_assist/duoworkflow/runner.go @@ -45,13 +45,29 @@ type workflowStream interface { } type runner struct { - rails *api.API - token string - originalReq *http.Request - conn websocketConn - wf workflowStream - client *Client - sendMu sync.Mutex + rails *api.API + token string + originalReq *http.Request + conn websocketConn + wf workflowStream + client *Client + sendMu sync.Mutex + ifWebsocketDisconnected string +} + +// Channel represents the connection between two nodes. +type Channel string + +const ( + // ClientAndWorkhorse is for the websocket connection between client and Workhorse. + ClientAndWorkhorse Channel = "client-workhorse" + // WorkhorseAndDuoWorkflowService is for the gRPC connection between Workhorse and the Duo Workflow Service. + WorkhorseAndDuoWorkflowService Channel = "workhorse-duoworkflowservice" +) + +type result struct { + channel Channel + err error } func newRunner(conn websocketConn, rails *api.API, r *http.Request, cfg *api.DuoWorkflow) (*runner, error) { @@ -66,47 +82,74 @@ func newRunner(conn websocketConn, rails *api.API, r *http.Request, cfg *api.Duo } return &runner{ - rails: rails, - token: cfg.Headers["x-gitlab-oauth-token"], - originalReq: r, - conn: conn, - wf: wf, - client: client, + rails: rails, + token: cfg.Headers["x-gitlab-oauth-token"], + originalReq: r, + conn: conn, + wf: wf, + client: client, + ifWebsocketDisconnected: cfg.IfWebsocketDisconnected, }, nil } -func (r *runner) Execute(ctx context.Context) error { - errCh := make(chan error, 2) +// IfWebsocketDisconnectedStop (`stop`): +// Workhorse sends a signal to Duo Workflow Service to stop the workflow execution. 
+// After the signal is received, Duo Workflow Service attempts to clean up the workflow entry through the workhorse-DWS connection. +const IfWebsocketDisconnectedStop = "stop" + +func (r *runner) Execute() result { + log.WithRequest(r.originalReq).Info("duoworkflow: Executing runner...") + + resultCh := make(chan result, 2) go func() { + ret := result{err: nil, channel: ClientAndWorkhorse} + for { if err := r.handleWebSocketMessage(); err != nil { - errCh <- err + ret.err = err + resultCh <- ret return } } }() go func() { + ret := result{err: nil, channel: WorkhorseAndDuoWorkflowService} + for { action, err := r.wf.Recv() if err != nil { if err == io.EOF { - errCh <- nil // Expected error when a workflow ends + // Expected error when a workflow ends + resultCh <- ret } else { - errCh <- fmt.Errorf("duoworkflow: failed to read a gRPC message: %v", err) + ret.err = fmt.Errorf("duoworkflow: failed to read a gRPC message: %v", err) + resultCh <- ret } return } - if err := r.handleAgentAction(ctx, action); err != nil { - errCh <- err + if err := r.handleAgentAction(r.originalReq.Context(), action); err != nil { + ret.err = err + resultCh <- ret return } } }() - return <-errCh + // Waiting for the result from one of the channels + ret := <-resultCh + + if ret.err != nil { + log.WithRequest(r.originalReq).WithError(ret.err).WithFields( + log.Fields{"channel": ret.channel}, + ).Error("duoworkflow: connection terminated") + + r.handleChannelError(ret, resultCh) + } + + return ret } func (r *runner) Close() error { @@ -202,6 +245,10 @@ func (r *runner) handleAgentAction(ctx context.Context, action *pb.Action) error } if err = r.conn.WriteMessage(websocket.BinaryMessage, message); err != nil { + if r.ifWebsocketDisconnected != "" { + r.sendClientNotAvailableResponse(action) + return nil + } return fmt.Errorf("handleAgentAction: failed to send WS message: %v", err) } } @@ -209,6 +256,69 @@ func (r *runner) handleAgentAction(ctx context.Context, action *pb.Action) error return nil 
} +func (r *runner) handleChannelError(ret result, resultCh chan result) { + switch ret.channel { + case ClientAndWorkhorse: + r.handleClientAndWorkhorseChannelError(resultCh) + case WorkhorseAndDuoWorkflowService: + // no-op + return + } +} + +func (r *runner) handleClientAndWorkhorseChannelError(resultCh chan result) { + switch r.ifWebsocketDisconnected { + case IfWebsocketDisconnectedStop: + log.WithRequest(r.originalReq).Info("handleClientAndWorkhorseChannelError: Sending stop workflow request...") + + err := r.threadSafeSend(&pb.ClientEvent{ + Response: &pb.ClientEvent_StopWorkflow{ + StopWorkflow: &pb.StopWorkflowRequest{ + Reason: "Websocket disconnected between client and workhorse.", + }, + }, + }) + + if err != nil { + log.WithRequest(r.originalReq).WithError(err).Error("handleClientAndWorkhorseChannelError: Failed to send stop workflow request") + return + } + default: + return + } + + log.WithRequest(r.originalReq).Info("handleClientAndWorkhorseChannelError: Waiting for the other channel result...") + ret := <-resultCh + + if ret.err != nil { + log.WithRequest(r.originalReq).WithError(ret.err).Error("handleClientAndWorkhorseChannelError") + return + } + + log.WithRequest(r.originalReq).Info("handleClientAndWorkhorseChannelError: Done") +} + +func (r *runner) sendClientNotAvailableResponse(action *pb.Action) { + err := r.threadSafeSend(&pb.ClientEvent{ + Response: &pb.ClientEvent_ActionResponse{ + ActionResponse: &pb.ActionResponse{ + RequestID: action.RequestID, + // Response: "", + ResponseType: &pb.ActionResponse_PlainTextResponse{ + PlainTextResponse: &pb.PlainTextResponse{ + Response: "", + Error: "Client is not available to execute this action because they are not reachable. 
Find the other solution.", + }, + }, + }, + }, + }) + + if err != nil { + log.WithRequest(r.originalReq).WithError(err).Error("sendClientNotAvailableResponse: Failed to send client not available response") + } +} + func (r *runner) threadSafeSend(event *pb.ClientEvent) error { r.sendMu.Lock() defer r.sendMu.Unlock() diff --git a/workhorse/internal/ai_assist/duoworkflow/runner_test.go b/workhorse/internal/ai_assist/duoworkflow/runner_test.go index dba71376d8c1fb..46247e03c339ae 100644 --- a/workhorse/internal/ai_assist/duoworkflow/runner_test.go +++ b/workhorse/internal/ai_assist/duoworkflow/runner_test.go @@ -20,6 +20,7 @@ import ( ) type mockWebSocketConn struct { +<<<<<<< HEAD readMessages [][]byte writeMessages [][]byte readIndex int @@ -29,6 +30,15 @@ type mockWebSocketConn struct { writeControlError error setDeadlineError error blockCh chan bool +======= + readMessages [][]byte + writeMessages [][]byte + readIndex int + readError error + writeError error + blockCh chan bool + nextCh chan bool +>>>>>>> 527819e4cd1c (DWS workhorse proxy graceful termination) } func (m *mockWebSocketConn) ReadMessage() (int, []byte, error) { @@ -36,6 +46,10 @@ func (m *mockWebSocketConn) ReadMessage() (int, []byte, error) { <-m.blockCh } + if m.nextCh != nil { + defer func() { m.nextCh <- true }() + } + if m.readError != nil { return 0, nil, m.readError } @@ -139,27 +153,30 @@ func Test_newRunner(t *testing.T) { require.NotNil(t, runner.wf) require.NotNil(t, runner.client) require.Equal(t, apiClient, runner.rails) + require.Empty(t, runner.ifWebsocketDisconnected) runner.Close() } func TestRunner_Execute(t *testing.T) { tests := []struct { - name string - wsMessages [][]byte - recvActions []*pb.Action - writeMsgCount int - sendEventsCount int - expectedErrMsg string - wsBlockCh chan bool - wfBlockCh chan bool + name string + wsMessages [][]byte + recvActions []*pb.Action + writeMsgCount int + sendEventsCount int + expectedErrChannel Channel + expectedErrMsg string + wsBlockCh 
chan bool + wfBlockCh chan bool }{ { - name: "ws messages", - wsMessages: [][]byte{[]byte(`{"type": "test"}`), []byte(`{"type": "test2"}`)}, - wfBlockCh: make(chan bool), - sendEventsCount: 2, - expectedErrMsg: "handleWebSocketMessage: failed to read a WS message: EOF", + name: "ws messages", + wsMessages: [][]byte{[]byte(`{"type": "test"}`), []byte(`{"type": "test2"}`)}, + wfBlockCh: make(chan bool), + sendEventsCount: 2, + expectedErrChannel: ClientAndWorkhorse, + expectedErrMsg: "handleWebSocketMessage: failed to read a WS message: EOF", }, { name: "wf actions", @@ -199,19 +216,21 @@ func TestRunner_Execute(t *testing.T) { Client: &http.Client{}, URL: testURL, }, - token: "test-token", - originalReq: &http.Request{}, - conn: mockConn, - wf: mockWf, + token: "test-token", + originalReq: &http.Request{ + URL: testURL, + }, + conn: mockConn, + wf: mockWf, } - ctx := context.Background() - err := r.Execute(ctx) + result := r.Execute() if tt.expectedErrMsg != "" { - require.EqualError(t, err, tt.expectedErrMsg) + require.Equal(t, tt.expectedErrChannel, result.channel) + require.EqualError(t, result.err, tt.expectedErrMsg) } else { - require.NoError(t, err) + require.NoError(t, result.err) } require.Len(t, mockWf.sendEvents, tt.sendEventsCount) @@ -222,24 +241,27 @@ func TestRunner_Execute(t *testing.T) { func TestRunner_Execute_with_errors(t *testing.T) { tests := []struct { - name string - wsReadError error - wfRecvError error - expectedErrMsg string - wsBlockCh chan bool - wfBlockCh chan bool + name string + wsReadError error + wfRecvError error + expectedErrChannel Channel + expectedErrMsg string + wsBlockCh chan bool + wfBlockCh chan bool }{ { - name: "websocket read error", - wsReadError: errors.New("read error"), - wfBlockCh: make(chan bool), - expectedErrMsg: "handleWebSocketMessage: failed to read a WS message: read error", + name: "websocket read error", + wsReadError: errors.New("read error"), + wfBlockCh: make(chan bool), + expectedErrChannel: 
ClientAndWorkhorse, + expectedErrMsg: "handleWebSocketMessage: failed to read a WS message: read error", }, { - name: "workflow recv error", - wfRecvError: errors.New("recv error"), - wsBlockCh: make(chan bool), - expectedErrMsg: "duoworkflow: failed to read a gRPC message: recv error", + name: "workflow recv error", + wfRecvError: errors.New("recv error"), + wsBlockCh: make(chan bool), + expectedErrChannel: WorkhorseAndDuoWorkflowService, + expectedErrMsg: "duoworkflow: failed to read a gRPC message: recv error", }, { name: "workflow EOF error", @@ -266,19 +288,21 @@ func TestRunner_Execute_with_errors(t *testing.T) { Client: &http.Client{}, URL: testURL, }, - token: "test-token", - originalReq: &http.Request{}, - conn: mockConn, - wf: mockWf, + token: "test-token", + originalReq: &http.Request{ + URL: testURL, + }, + conn: mockConn, + wf: mockWf, } - ctx := context.Background() - err := r.Execute(ctx) + result := r.Execute() if tt.expectedErrMsg != "" { - require.EqualError(t, err, tt.expectedErrMsg) + require.Equal(t, tt.expectedErrChannel, result.channel) + require.EqualError(t, result.err, tt.expectedErrMsg) } else { - require.NoError(t, err) + require.NoError(t, result.err) } }) } @@ -286,27 +310,31 @@ func TestRunner_Execute_with_errors(t *testing.T) { func TestRunner_handleWebSocketMessage(t *testing.T) { tests := []struct { - name string - message []byte - readError error - sendError error - expectedErrMsg string + name string + message []byte + readError error + sendError error + expectedErrChannel Channel + expectedErrMsg string }{ { - name: "read error", - readError: errors.New("read error"), - expectedErrMsg: "handleWebSocketMessage: failed to read a WS message: read error", + name: "read error", + readError: errors.New("read error"), + expectedErrChannel: ClientAndWorkhorse, + expectedErrMsg: "handleWebSocketMessage: failed to read a WS message: read error", }, { - name: "invalid json", - message: []byte("invalid json"), - expectedErrMsg: 
"handleWebSocketMessage: failed to unmarshal a WS message: proto:", + name: "invalid json", + message: []byte("invalid json"), + expectedErrChannel: ClientAndWorkhorse, + expectedErrMsg: "handleWebSocketMessage: failed to unmarshal a WS message: proto:", }, { - name: "send error", - message: []byte(`{"type": "test"}`), - sendError: errors.New("send error"), - expectedErrMsg: "handleWebSocketMessage: failed to write a gRPC message: send error", + name: "send error", + message: []byte(`{"type": "test"}`), + sendError: errors.New("send error"), + expectedErrChannel: ClientAndWorkhorse, + expectedErrMsg: "handleWebSocketMessage: failed to write a gRPC message: send error", }, { name: "send EOF error", @@ -337,10 +365,12 @@ func TestRunner_handleWebSocketMessage(t *testing.T) { Client: &http.Client{}, URL: testURL, }, - token: "test-token", - originalReq: &http.Request{}, - conn: mockConn, - wf: mockWf, + token: "test-token", + originalReq: &http.Request{ + URL: testURL, + }, + conn: mockConn, + wf: mockWf, } err := r.handleWebSocketMessage() @@ -357,13 +387,14 @@ func TestRunner_handleWebSocketMessage(t *testing.T) { func TestRunner_handleAgentAction(t *testing.T) { tests := []struct { - name string - action *pb.Action - wsWriteError error - wfSendError error - expectedErrMsg string - shouldCallWS bool - shouldCallWF bool + name string + action *pb.Action + wsWriteError error + wfSendError error + expectedErrChannel Channel + expectedErrMsg string + shouldCallWS bool + shouldCallWF bool }{ { name: "successful HTTP request action", @@ -389,8 +420,9 @@ func TestRunner_handleAgentAction(t *testing.T) { }, }, }, - wfSendError: errors.New("workflow send failed"), - expectedErrMsg: "handleAgentAction: failed to send gRPC message: workflow send failed", + wfSendError: errors.New("workflow send failed"), + expectedErrChannel: WorkhorseAndDuoWorkflowService, + expectedErrMsg: "handleAgentAction: failed to send gRPC message: workflow send failed", }, { name: "successful non-HTTP 
action forwarded to websocket", @@ -414,8 +446,9 @@ func TestRunner_handleAgentAction(t *testing.T) { }, }, }, - wsWriteError: errors.New("websocket write failed"), - expectedErrMsg: "handleAgentAction: failed to send WS message: websocket write failed", + wsWriteError: errors.New("websocket write failed"), + expectedErrChannel: ClientAndWorkhorse, + expectedErrMsg: "handleAgentAction: failed to send WS message: websocket write failed", }, { name: "action with nil action type", @@ -448,15 +481,19 @@ func TestRunner_handleAgentAction(t *testing.T) { sendError: tt.wfSendError, } + testURL, _ := url.Parse("http://example.com") + r := &runner{ rails: &api.API{ Client: server.Client(), URL: serverURL, }, - token: "test-token", - originalReq: &http.Request{}, - conn: mockConn, - wf: mockWf, + token: "test-token", + originalReq: &http.Request{ + URL: testURL, + }, + conn: mockConn, + wf: mockWf, } ctx := context.Background() @@ -528,6 +565,27 @@ func TestRunner_closeWebSocketConnection(t *testing.T) { }, } +func TestRunner_Execute_with_stop(t *testing.T) { + tests := []struct { + name string + wsReadError error + wfReadError error + recvActions []*pb.Action + expectedErrChannel Channel + expectedErrMsg string + wsBlockCh chan bool + wfBlockCh chan bool + }{ + { + name: "websocket read error", + wsReadError: errors.New("ws read error"), + wfReadError: errors.New("wf read error"), + wfBlockCh: make(chan bool), + expectedErrChannel: ClientAndWorkhorse, + expectedErrMsg: "handleWebSocketMessage: failed to read a WS message: ws read error", + }, + } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { mockConn := &mockWebSocketConn{ @@ -547,6 +605,36 @@ func TestRunner_closeWebSocketConnection(t *testing.T) { } else { require.NoError(t, err) } + readError: tt.wsReadError, + nextCh: tt.wfBlockCh, + } + mockWf := &mockWorkflowStream{ + recvActions: tt.recvActions, + blockCh: tt.wfBlockCh, + } + + testURL, _ := url.Parse("http://example.com") + r := &runner{ + rails: 
&api.API{ + Client: &http.Client{}, + URL: testURL, + }, + token: "test-token", + originalReq: &http.Request{ + URL: testURL, + }, + conn: mockConn, + wf: mockWf, + ifWebsocketDisconnected: "stop", + } + + result := r.Execute() + + require.Equal(t, tt.expectedErrChannel, result.channel) + require.EqualError(t, result.err, tt.expectedErrMsg) + require.Len(t, mockWf.sendEvents, 1, "Expected one workflow event to be sent") + response := mockWf.sendEvents[0].Response.(*pb.ClientEvent_StopWorkflow).StopWorkflow.Reason + require.Equal(t, "Websocket disconnected between client and workhorse.", response) }) } } diff --git a/workhorse/internal/api/api.go b/workhorse/internal/api/api.go index 3026e22beddf75..c414b849b61740 100644 --- a/workhorse/internal/api/api.go +++ b/workhorse/internal/api/api.go @@ -144,9 +144,10 @@ type RemoteObject struct { // DuoWorkflow holds configuration for the Duo Workflow service. type DuoWorkflow struct { - Headers map[string]string - ServiceURI string - Secure bool + Headers map[string]string + ServiceURI string + Secure bool + IfWebsocketDisconnected string } // Response represents a structure containing various GitLab-related environment variables. -- GitLab