diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 8d1d6e941184210db3edf52a48a0c471a1d4d0f9..ceb0fabcf54673ab4cd3af83eb28e4542407cdad 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -206,18 +206,20 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor FOR UPDATE SKIP LOCKED ) , candidate AS ( - SELECT id - FROM replication_queue - WHERE id IN ( - SELECT DISTINCT FIRST_VALUE(queue.id) OVER (PARTITION BY lock_id, job->>'change' ORDER BY queue.created_at) + SELECT collapse_to AS id + FROM ( + SELECT + row_number() OVER repo_change = 1 AS first_in_partition, + max(queue.id) OVER repo_change AS collapse_to FROM replication_queue AS queue JOIN lock ON queue.lock_id = lock.id - WHERE queue.state IN ('ready', 'failed' ) + WHERE queue.state IN ('ready', 'failed') AND NOT EXISTS (SELECT 1 FROM replication_queue_job_lock WHERE lock_id = queue.lock_id) - ) - ORDER BY created_at + WINDOW repo_change AS (PARTITION BY lock_id, job->>'change') + ORDER BY created_at + ) AS repo_changes + WHERE first_in_partition LIMIT $3 - FOR UPDATE ) , job AS ( UPDATE replication_queue AS queue @@ -241,8 +243,7 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor WHERE lock.id = tracked.lock_id ) SELECT id, state, created_at, updated_at, lock_id, attempt, job, meta - FROM job - ORDER BY id` + FROM job` rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, nodeStorage, count) if err != nil { return nil, fmt.Errorf("query: %w", err) @@ -272,7 +273,7 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J query := ` WITH existing AS ( - SELECT id, lock_id, updated_at, job + SELECT id, lock_id, created_at, updated_at, job FROM replication_queue WHERE id = ANY($1) AND state = 'in_progress' @@ -295,10 +296,9 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J WHERE existing.id = queue.id OR ( queue.state = 'ready' - AND queue.created_at < existing.updated_at + AND queue.created_at < existing.created_at AND queue.lock_id = existing.lock_id AND queue.job->>'change' = existing.job->>'change' - AND queue.job->>'source_node_storage' = existing.job->>'source_node_storage' ) RETURNING queue.id, queue.lock_id ) diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index c8be612e8c913128887d2ccbce03e59d4c535acb..502ab9c0db200862c31f231665c4ce721e284640 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -300,10 +300,10 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { } // first request to deque - expectedEvents1 := []ReplicationEvent{events[0], events[2], events[4]} + expectedEvents1 := []ReplicationEvent{events[3], events[2], events[4]} expectedJobLocks1 := []JobLockRow{ - {JobID: events[0].ID, LockID: "praefect|gitaly-1|/project/path-1"}, {JobID: events[2].ID, LockID: "praefect|gitaly-1|/project/path-1"}, + {JobID: events[3].ID, LockID: "praefect|gitaly-1|/project/path-1"}, {JobID: events[4].ID, LockID: "praefect|gitaly-1|/project/path-2"}, } @@ -393,7 +393,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test // there is only one single lock for all fetched events because of their 'repo' and 'target' combination {ID: "praefect|gitaly-1|/project/path-1", Acquired: true}, }) - requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}}) + requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 2, LockID: "praefect|gitaly-1|/project/path-1"}}) var eventsType2 []ReplicationEvent for i := 0; i < 2; i++ { @@ -410,8 +410,8 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test {ID: "praefect|gitaly-1|/project/path-2", Acquired: true}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}, - {JobID: 3, LockID: "praefect|gitaly-1|/project/path-2"}, + {JobID: 2, LockID: "praefect|gitaly-1|/project/path-1"}, + {JobID: 4, LockID: "praefect|gitaly-1|/project/path-2"}, }) } @@ -527,15 +527,15 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { {ID: events[6].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: events[0].ID, LockID: events[0].LockID}, - {JobID: events[2].ID, LockID: events[2].LockID}, + {JobID: events[3].ID, LockID: events[3].LockID}, {JobID: events[4].ID, LockID: events[4].LockID}, + {JobID: events[5].ID, LockID: events[5].LockID}, }) // release lock for events of second type - acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{events[2].ID}) + acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{events[5].ID}) require.NoError(t, err) - require.Equal(t, []uint64{3}, acknowledge1) + require.Equal(t, []uint64{6}, acknowledge1) requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: true}, {ID: events[2].LockID, Acquired: false}, @@ -543,7 +543,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { {ID: events[6].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: events[0].ID, LockID: events[0].LockID}, + {JobID: events[3].ID, LockID: events[3].LockID}, {JobID: events[4].ID, LockID: events[4].LockID}, }) @@ -557,9 +557,9 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { {ID: events[6].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: events[0].ID, LockID: events[0].LockID}, - {JobID: events[2].ID, LockID: events[2].LockID}, + {JobID: events[3].ID, LockID: events[3].LockID}, {JobID: events[4].ID, LockID: events[4].LockID}, + {JobID: events[5].ID, LockID: events[5].LockID}, }) // creation of the new event that is equal to those already dequeue and processed @@ -568,9 +568,9 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { _, err = queue.Enqueue(ctx, eventType1) require.NoError(t, err) - acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[0].ID, events[4].ID}) + acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[3].ID, events[4].ID}) require.NoError(t, err) - require.Equal(t, []uint64{events[0].ID, events[4].ID}, acknowledge2) + require.Equal(t, []uint64{events[3].ID, events[4].ID}, acknowledge2) requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: false}, {ID: events[2].LockID, Acquired: true}, @@ -578,7 +578,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { {ID: events[6].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: events[2].ID, LockID: events[2].LockID}, + {JobID: events[5].ID, LockID: events[5].LockID}, }) dequeuedEvents3, err := queue.Dequeue(ctx, "praefect", "gitaly-2", 3) @@ -591,13 +591,13 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { {ID: events[6].LockID, Acquired: true}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: events[2].ID, LockID: events[2].LockID}, + {JobID: events[5].ID, LockID: events[5].LockID}, {JobID: events[6].ID, LockID: events[6].LockID}, }) - acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[2].ID, events[6].ID}) + acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[5].ID, events[6].ID}) require.NoError(t, err) - require.Equal(t, []uint64{events[2].ID, events[6].ID}, acknowledged3) + require.Equal(t, []uint64{events[5].ID, events[6].ID}, acknowledged3) requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: false}, {ID: events[2].LockID, Acquired: false}, @@ -1147,7 +1147,7 @@ func requireJobLocks(t *testing.T, ctx context.Context, db glsql.DB, expected [] for i := range actual { actual[i].TriggeredAt = time.Time{} } - require.ElementsMatch(t, expected, actual) + require.Equal(t, expected, actual) } func fetchJobLocks(t *testing.T, ctx context.Context, db glsql.DB) []JobLockRow {