diff --git a/internal/cgroups/cgroups_linux_test.go b/internal/cgroups/cgroups_linux_test.go index c3e7e143d7377732c339e08280f08021ff73fa86..80a307a941979afdcf14c8e936b8b2a91faaa14e 100644 --- a/internal/cgroups/cgroups_linux_test.go +++ b/internal/cgroups/cgroups_linux_test.go @@ -10,6 +10,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" ) -func TestNewManager(t *testing.T) { +func TestNewManagerNoop(t *testing.T) { require.IsType(t, &NoopManager{}, NewManager(cgroups.Config{}, testhelper.SharedLogger(t), 1)) } diff --git a/internal/cgroups/handler_linux.go b/internal/cgroups/handler_linux.go new file mode 100644 index 0000000000000000000000000000000000000000..2b92253fb890422d839c2b876075be1333f77e3c --- /dev/null +++ b/internal/cgroups/handler_linux.go @@ -0,0 +1,392 @@ +//go:build linux + +package cgroups + +import ( + "errors" + "fmt" + "io/fs" + "path/filepath" + "strings" + "time" + + "github.com/containerd/cgroups/v3/cgroup1" + "github.com/containerd/cgroups/v3/cgroup2" + "github.com/opencontainers/runtime-spec/specs-go" + "github.com/prometheus/client_golang/prometheus" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" + cgroupscfg "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/cgroups" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" +) + +type ( + // createCgroupFunc is a function that creates a new cgroup. + createCgroupFunc[T any, H any] func(hierarchy H, resources *specs.LinuxResources, path string) (T, error) + // loadCgroupFunc is a function that loads an existing cgroup. + loadCgroupFunc[T any, H any] func(hierarchy H, path string) (T, error) + // addToCgroupFunc is a function that adds a process to an existing cgroup. + addToCgroupFunc[T any] func(control T, pid int) error + // deleteCgroupFunc is a function that deletes a cgroup. + deleteCgroupFunc[T any] func(control T) error +) + +// genericHandler is a cgroup handler that can be instantiated for either cgroups-v1 +// or cgroups-v2. +type genericHandler[T any, H any] struct { + cfg cgroupscfg.Config + logger log.Logger + pid int + supportsClone bool + + // hierarchy is either a cgroup1.Hierarchy or the cgroup2 Mountpoint path. + hierarchy H + createFunc createCgroupFunc[T, H] + loadFunc loadCgroupFunc[T, H] + addFunc addToCgroupFunc[T] + deleteFunc deleteCgroupFunc[T] + + metrics *cgroupsMetrics +} + +func newV1GenericHandler( + cfg cgroupscfg.Config, + logger log.Logger, + pid int, +) *genericHandler[cgroup1.Cgroup, cgroup1.Hierarchy] { + return &genericHandler[cgroup1.Cgroup, cgroup1.Hierarchy]{ + cfg: cfg, + logger: logger, + pid: pid, + supportsClone: false, + hierarchy: func() ([]cgroup1.Subsystem, error) { + return defaultSubsystems(cfg.Mountpoint) + }, + metrics: newV1CgroupsMetrics(), + createFunc: func(hierarchy cgroup1.Hierarchy, resources *specs.LinuxResources, cgroupPath string) (cgroup1.Cgroup, error) { + return cgroup1.New( + cgroup1.StaticPath(cgroupPath), + resources, + cgroup1.WithHiearchy(hierarchy)) + }, + loadFunc: func(hierarchy cgroup1.Hierarchy, cgroupPath string) (cgroup1.Cgroup, error) { + return cgroup1.Load( + cgroup1.StaticPath(cgroupPath), + cgroup1.WithHiearchy(hierarchy), + ) + }, + addFunc: func(control cgroup1.Cgroup, pid int) error { + return control.Add(cgroup1.Process{Pid: pid}) + }, + deleteFunc: func(control cgroup1.Cgroup) error { + return control.Delete() + }, + } +} + +func newV2GenericHandler( + cfg cgroupscfg.Config, + logger log.Logger, + pid int, +) *genericHandler[*cgroup2.Manager, string] { + return &genericHandler[*cgroup2.Manager, string]{ + cfg: cfg, + logger: logger, + pid: pid, + supportsClone: true, + hierarchy: cfg.Mountpoint, + metrics: newV2CgroupsMetrics(), + createFunc: func(mountpoint string, resources *specs.LinuxResources, cgroupPath string) (*cgroup2.Manager, error) { + return cgroup2.NewManager( + mountpoint, + "/"+cgroupPath, + cgroup2.ToResources(resources), + ) + }, + loadFunc: func(mountpoint string, cgroupPath string) (*cgroup2.Manager, error) { + return cgroup2.Load("/"+cgroupPath, cgroup2.WithMountpoint(mountpoint)) + }, + addFunc: func(control *cgroup2.Manager, pid int) error { + return control.AddProc(uint64(pid)) + }, + deleteFunc: func(control *cgroup2.Manager) error { + return control.Delete() + }, + } +} + +func (cvh *genericHandler[T, H]) currentProcessCgroup() string { + return config.GetGitalyProcessTempDir(cvh.cfg.HierarchyRoot, cvh.pid) +} + +func (cvh *genericHandler[T, H]) createCgroup(repoResources *specs.LinuxResources, cgroupPath string) error { + _, err := cvh.createFunc(cvh.hierarchy, repoResources, cgroupPath) + return err +} + +func (cvh *genericHandler[T, H]) addToCgroup(pid int, cgroupPath string) error { + control, err := cvh.loadFunc(cvh.hierarchy, cgroupPath) + if err != nil { + return err + } + + if err := cvh.addFunc(control, pid); err != nil { + // Command could finish so quickly before we can add it to a cgroup, so + // we don't consider it an error. + if strings.Contains(err.Error(), "no such process") { + return nil + } + return fmt.Errorf("failed adding process to cgroup: %w", err) + } + + return nil +} + +func (cvh *genericHandler[T, H]) setupParent(parentResources *specs.LinuxResources) error { + if _, err := cvh.createFunc(cvh.hierarchy, parentResources, cvh.currentProcessCgroup()); err != nil { + return fmt.Errorf("failed creating parent cgroup: %w", err) + } + return nil +} + +func (cvh *genericHandler[T, H]) cleanup() error { + processCgroupPath := cvh.currentProcessCgroup() + + control, err := cvh.loadFunc(cvh.hierarchy, processCgroupPath) + if err != nil { + return err + } + + if err := cvh.deleteFunc(control); err != nil { + return fmt.Errorf("failed cleaning up cgroup %s: %w", processCgroupPath, err) + } + + return nil +} + +func (cvh *genericHandler[T, H]) repoPath(groupID int) string { + return filepath.Join(cvh.currentProcessCgroup(), fmt.Sprintf("repos-%d", groupID)) +} + +func (cvh *genericHandler[T, H]) supportsCloneIntoCgroup() bool { + return cvh.supportsClone +} + +func (cvh *genericHandler[T, H]) stats() (Stats, error) { + processCgroupPath := cvh.currentProcessCgroup() + + control, err := cvh.loadFunc(cvh.hierarchy, processCgroupPath) + if err != nil { + return Stats{}, err + } + + switch c := any(control).(type) { + case cgroup1.Cgroup: + return v1Stats(c, processCgroupPath) + case *cgroup2.Manager: + return v2stats(c, processCgroupPath) + default: + return Stats{}, errors.New("unknown cgroup type") + } +} + +func (cvh *genericHandler[T, H]) collect(repoPath string, ch chan<- prometheus.Metric) { + logger := cvh.logger.WithField("cgroup_path", repoPath) + control, err := cvh.loadFunc(cvh.hierarchy, repoPath) + if err != nil { + logger.WithError(err).Warn("unable to load cgroup controller") + return + } + + switch c := any(control).(type) { + case cgroup1.Cgroup: + v1Collect(c, any(cvh.hierarchy).(cgroup1.Hierarchy), cvh.metrics, repoPath, cvh.logger, ch) + case *cgroup2.Manager: + v2Collect(c, cvh.metrics, repoPath, cvh.logger, ch) + } +} + +func v1Stats(control cgroup1.Cgroup, processCgroupPath string) (Stats, error) { + metrics, err := control.Stat() + if err != nil { + return Stats{}, fmt.Errorf("failed to fetch metrics %s: %w", processCgroupPath, err) + } + + return Stats{ + ParentStats: CgroupStats{ + CPUThrottledCount: metrics.CPU.Throttling.ThrottledPeriods, + CPUThrottledDuration: float64(metrics.CPU.Throttling.ThrottledTime) / float64(time.Second), + MemoryUsage: metrics.Memory.Usage.Usage, + MemoryLimit: metrics.Memory.Usage.Limit, + OOMKills: metrics.MemoryOomControl.OomKill, + UnderOOM: metrics.MemoryOomControl.UnderOom != 0, + }, + }, nil +} + +func v2stats(control *cgroup2.Manager, processCgroupPath string) (Stats, error) { + metrics, err := control.Stat() + if err != nil { + return Stats{}, fmt.Errorf("failed to fetch metrics %s: %w", processCgroupPath, err) + } + + stats := Stats{ + ParentStats: CgroupStats{ + CPUThrottledCount: metrics.CPU.NrThrottled, + CPUThrottledDuration: float64(metrics.CPU.ThrottledUsec) / float64(time.Second), + MemoryUsage: metrics.Memory.Usage, + MemoryLimit: metrics.Memory.UsageLimit, + }, + } + if metrics.MemoryEvents != nil { + stats.ParentStats.OOMKills = metrics.MemoryEvents.OomKill + } + return stats, nil +} + +func v1Collect(control cgroup1.Cgroup, hierarchy cgroup1.Hierarchy, m *cgroupsMetrics, repoPath string, logger log.Logger, ch chan<- prometheus.Metric) { + if metrics, err := control.Stat(); err != nil { + logger.WithError(err).Warn("unable to get cgroup stats") + } else { + memoryMetric := m.memoryReclaimAttemptsTotal.WithLabelValues(repoPath) + memoryMetric.Set(float64(metrics.Memory.Usage.Failcnt)) + ch <- memoryMetric + + cpuUserMetric := m.cpuUsage.WithLabelValues(repoPath, "user") + cpuUserMetric.Set(float64(metrics.CPU.Usage.User)) + ch <- cpuUserMetric + + ch <- prometheus.MustNewConstMetric( + m.cpuCFSPeriods, + prometheus.CounterValue, + float64(metrics.CPU.Throttling.Periods), + repoPath, + ) + + ch <- prometheus.MustNewConstMetric( + m.cpuCFSThrottledPeriods, + prometheus.CounterValue, + float64(metrics.CPU.Throttling.ThrottledPeriods), + repoPath, + ) + + ch <- prometheus.MustNewConstMetric( + m.cpuCFSThrottledTime, + prometheus.CounterValue, + float64(metrics.CPU.Throttling.ThrottledTime)/float64(time.Second), + repoPath, + ) + + cpuKernelMetric := m.cpuUsage.WithLabelValues(repoPath, "kernel") + cpuKernelMetric.Set(float64(metrics.CPU.Usage.Kernel)) + ch <- cpuKernelMetric + } + + if subsystems, err := hierarchy(); err != nil { + logger.WithError(err).Warn("unable to get cgroup hierarchy") + } else { + for _, subsystem := range subsystems { + processes, err := control.Processes(subsystem.Name(), true) + if err != nil { + logger.WithField("subsystem", subsystem.Name()). + WithError(err). + Warn("unable to get process list") + continue + } + + procsMetric := m.procs.WithLabelValues(repoPath, string(subsystem.Name())) + procsMetric.Set(float64(len(processes))) + ch <- procsMetric + } + } +} + +func v2Collect(control *cgroup2.Manager, m *cgroupsMetrics, repoPath string, logger log.Logger, ch chan<- prometheus.Metric) { + if metrics, err := control.Stat(); err != nil { + logger.WithError(err).Warn("unable to get cgroup stats") + } else { + cpuUserMetric := m.cpuUsage.WithLabelValues(repoPath, "user") + cpuUserMetric.Set(float64(metrics.CPU.UserUsec)) + ch <- cpuUserMetric + + ch <- prometheus.MustNewConstMetric( + m.cpuCFSPeriods, + prometheus.CounterValue, + float64(metrics.CPU.NrPeriods), + repoPath, + ) + + ch <- prometheus.MustNewConstMetric( + m.cpuCFSThrottledPeriods, + prometheus.CounterValue, + float64(metrics.CPU.NrThrottled), + repoPath, + ) + + ch <- prometheus.MustNewConstMetric( + m.cpuCFSThrottledTime, + prometheus.CounterValue, + float64(metrics.CPU.ThrottledUsec)/float64(time.Second), + repoPath, + ) + + cpuKernelMetric := m.cpuUsage.WithLabelValues(repoPath, "kernel") + cpuKernelMetric.Set(float64(metrics.CPU.SystemUsec)) + ch <- cpuKernelMetric + } + + if subsystems, err := control.Controllers(); err != nil { + logger.WithError(err).Warn("unable to get cgroup hierarchy") + } else { + processes, err := control.Procs(true) + if err != nil { + logger.WithError(err). + Warn("unable to get process list") + return + } + + for _, subsystem := range subsystems { + procsMetric := m.procs.WithLabelValues(repoPath, subsystem) + procsMetric.Set(float64(len(processes))) + ch <- procsMetric + } + } +} + +func defaultSubsystems(root string) ([]cgroup1.Subsystem, error) { + subsystems := []cgroup1.Subsystem{ + cgroup1.NewMemory(root, cgroup1.OptionalSwap()), + cgroup1.NewCpu(root), + } + + return subsystems, nil +} + +func pruneOldCgroupsV1(cfg cgroupscfg.Config, logger log.Logger) { + if err := config.PruneOldGitalyProcessDirectories( + logger, + filepath.Join(cfg.Mountpoint, "memory", + cfg.HierarchyRoot), + ); err != nil { + logger.WithError(err).Error("failed to clean up memory cgroups") + } + + if err := config.PruneOldGitalyProcessDirectories( + logger, + filepath.Join(cfg.Mountpoint, "cpu", + cfg.HierarchyRoot), + ); err != nil { + logger.WithError(err).Error("failed to clean up cpu cgroups") + } +} + +func pruneOldCgroupsV2(cfg cgroupscfg.Config, logger log.Logger) { + if err := config.PruneOldGitalyProcessDirectories( + logger, + filepath.Join(cfg.Mountpoint, cfg.HierarchyRoot), + ); err != nil { + var pathError *fs.PathError + if !errors.As(err, &pathError) { + logger.WithError(err).Error("failed to clean up cpu cgroups") + } + } +} diff --git a/internal/cgroups/handler_linux_test.go b/internal/cgroups/handler_linux_test.go new file mode 100644 index 0000000000000000000000000000000000000000..401151fb3028543882f9bed66734e42719626a7b --- /dev/null +++ b/internal/cgroups/handler_linux_test.go @@ -0,0 +1,898 @@ +//go:build linux + +package cgroups + +import ( + "fmt" + "io/fs" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "testing" + + cgrps "github.com/containerd/cgroups/v3" + "github.com/containerd/cgroups/v3/cgroup1" + "github.com/containerd/cgroups/v3/cgroup2" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/cgroups" + "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "golang.org/x/exp/slices" +) + +func defaultCgroupsConfig() cgroups.Config { + return cgroups.Config{ + HierarchyRoot: "gitaly", + Repositories: cgroups.Repositories{ + Count: 3, + MemoryBytes: 1024000, + CPUShares: 256, + CPUQuotaUs: 200, + }, + } +} + +func TestNewManager(t *testing.T) { + cfg := cgroups.Config{Repositories: cgroups.Repositories{Count: 10}} + + manager := newCgroupManagerWithMode(cfg, testhelper.SharedLogger(t), 1, cgrps.Legacy) + require.IsType(t, &genericHandler[cgroup1.Cgroup, cgroup1.Hierarchy]{}, manager.handler) + manager = newCgroupManagerWithMode(cfg, testhelper.SharedLogger(t), 1, cgrps.Hybrid) + require.IsType(t, &genericHandler[cgroup1.Cgroup, cgroup1.Hierarchy]{}, manager.handler) + manager = newCgroupManagerWithMode(cfg, testhelper.SharedLogger(t), 1, cgrps.Unified) + require.IsType(t, &genericHandler[*cgroup2.Manager, string]{}, manager.handler) + manager = newCgroupManagerWithMode(cfg, testhelper.SharedLogger(t), 1, cgrps.Unavailable) + require.Nil(t, manager) +} + +type expectedCgroup struct { + wantMemoryBytes int + wantCPUShares int + wantCPUQuotaUs int + wantCFSPeriod int + wantCPUWeight int + wantCPUMax string +} + +func TestSetup_ParentCgroups(t *testing.T) { + tests := []struct { + name string + cfg cgroups.Config + expectedV1 expectedCgroup + expectedV2 expectedCgroup + }{ + { + name: "all config specified", + cfg: cgroups.Config{ + MemoryBytes: 102400, + CPUShares: 256, + CPUQuotaUs: 200, + }, + expectedV1: expectedCgroup{ + wantMemoryBytes: 102400, + wantCPUShares: 256, + wantCPUQuotaUs: 200, + wantCFSPeriod: int(cfsPeriodUs), + }, + expectedV2: expectedCgroup{ + wantMemoryBytes: 102400, + wantCPUWeight: 256, + wantCPUMax: "200 100000", + }, + }, + { + name: "only memory limit set", + cfg: cgroups.Config{ + MemoryBytes: 102400, + }, + expectedV1: expectedCgroup{ + wantMemoryBytes: 102400, + }, + expectedV2: expectedCgroup{ + wantMemoryBytes: 102400, + }, + }, + { + name: "only cpu shares set", + cfg: cgroups.Config{ + CPUShares: 512, + }, + expectedV1: expectedCgroup{ + wantCPUShares: 512, + }, + expectedV2: expectedCgroup{ + wantCPUWeight: 512, + }, + }, + { + name: "only cpu quota set", + cfg: cgroups.Config{ + CPUQuotaUs: 200, + }, + expectedV1: expectedCgroup{ + wantCPUQuotaUs: 200, + wantCFSPeriod: int(cfsPeriodUs), + }, + expectedV2: expectedCgroup{ + wantCPUMax: "200 100000", + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + for _, version := range []int{1, 2} { + version := version + t.Run("cgroups-v"+strconv.Itoa(version), func(t *testing.T) { + t.Parallel() + + mock := newMock(t, version) + + pid := 1 + + cfg := tt.cfg + cfg.HierarchyRoot = "gitaly" + cfg.Mountpoint = mock.rootPath() + + manager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) + mock.setupMockCgroupFiles(t, manager, []uint{}) + + require.False(t, manager.Ready()) + require.NoError(t, manager.Setup()) + require.True(t, manager.Ready()) + + cgroupPath := filepath.Join("gitaly", fmt.Sprintf("gitaly-%d", pid)) + if version == 1 { + requireCgroupComponents(t, version, mock.rootPath(), cgroupPath, tt.expectedV1) + } else { + requireCgroupComponents(t, version, mock.rootPath(), cgroupPath, tt.expectedV2) + } + }) + } + }) + } +} + +func TestRepoCgroups(t *testing.T) { + tests := []struct { + name string + cfg cgroups.Repositories + expectedV1 expectedCgroup + expectedV2 expectedCgroup + }{ + { + name: "all config specified", + cfg: defaultCgroupsConfig().Repositories, + expectedV1: expectedCgroup{ + wantMemoryBytes: 1024000, + wantCPUShares: 256, + wantCPUQuotaUs: 200, + wantCFSPeriod: int(cfsPeriodUs), + }, + expectedV2: expectedCgroup{ + wantMemoryBytes: 1024000, + wantCPUWeight: 256, + wantCPUMax: "200 100000", + }, + }, + { + name: "only memory limit set", + cfg: cgroups.Repositories{ + MemoryBytes: 1024000, + }, + expectedV1: expectedCgroup{ + wantMemoryBytes: 1024000, + }, + expectedV2: expectedCgroup{ + wantMemoryBytes: 1024000, + }, + }, + { + name: "only cpu shares set", + cfg: cgroups.Repositories{ + CPUShares: 512, + }, + expectedV1: expectedCgroup{ + wantCPUShares: 512, + }, + expectedV2: expectedCgroup{ + wantCPUWeight: 512, + }, + }, + { + name: "only cpu quota set", + cfg: cgroups.Repositories{ + CPUQuotaUs: 100, + }, + expectedV1: expectedCgroup{ + wantCPUQuotaUs: 100, + wantCFSPeriod: int(cfsPeriodUs), + }, + expectedV2: expectedCgroup{ + wantCPUMax: "100 100000", + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + for _, version := range []int{1, 2} { + version := version + t.Run("cgroups-v"+strconv.Itoa(version), func(t *testing.T) { + t.Parallel() + + mock := newMock(t, version) + + pid := 1 + cfg := defaultCgroupsConfig() + cfg.Repositories = tt.cfg + cfg.Repositories.Count = 3 + cfg.HierarchyRoot = "gitaly" + cfg.Mountpoint = mock.rootPath() + + manager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) + + // Validate no shards have been created. We deliberately do not call + // `setupMockCgroupFiles()` here to confirm that the cgroup controller + // is creating repository directories in the correct location. + requireShards(t, version, mock, manager, pid) + + groupID := calcGroupID(cmdArgs, cfg.Repositories.Count) + + mock.setupMockCgroupFiles(t, manager, []uint{groupID}) + + require.False(t, manager.Ready()) + require.NoError(t, manager.Setup()) + require.True(t, manager.Ready()) + + ctx := testhelper.Context(t) + + // Create a command to force Gitaly to create the repo cgroup. + cmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...) + require.NoError(t, cmd.Run()) + _, err := manager.AddCommand(cmd) + require.NoError(t, err) + + requireShards(t, version, mock, manager, pid, groupID) + + var expected expectedCgroup + if version == 1 { + expected = tt.expectedV1 + } else { + expected = tt.expectedV2 + } + + for shard := uint(0); shard < cfg.Repositories.Count; shard++ { + // The negative case where no directory should exist is asserted + // by `requireShards()`. + if shard == groupID { + cgRelPath := filepath.Join( + "gitaly", fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", shard), + ) + + requireCgroupComponents(t, version, mock.rootPath(), cgRelPath, expected) + } + } + }) + } + }) + } +} + +func TestAddCommand(t *testing.T) { + for _, version := range []int{1, 2} { + t.Run("cgroups-v"+strconv.Itoa(version), func(t *testing.T) { + mock := newMock(t, version) + + config := defaultCgroupsConfig() + config.Repositories.Count = 10 + config.Repositories.MemoryBytes = 1024 + config.Repositories.CPUShares = 16 + config.HierarchyRoot = "gitaly" + config.Mountpoint = mock.rootPath() + + pid := 1 + manager1 := mock.newCgroupManager(config, testhelper.SharedLogger(t), pid) + mock.setupMockCgroupFiles(t, manager1, []uint{}) + require.NoError(t, manager1.Setup()) + + ctx := testhelper.Context(t) + + cmd2 := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...) + require.NoError(t, cmd2.Run()) + + groupID := calcGroupID(cmd2.Args, config.Repositories.Count) + + manager2 := mock.newCgroupManager(config, testhelper.SharedLogger(t), pid) + + t.Run("without overridden key", func(t *testing.T) { + _, err := manager2.AddCommand(cmd2) + require.NoError(t, err) + requireShards(t, version, mock, manager2, pid, groupID) + + for _, path := range mock.repoPaths(pid, groupID) { + procsPath := filepath.Join(path, "cgroup.procs") + content := readCgroupFile(t, procsPath) + + cmdPid, err := strconv.Atoi(string(content)) + require.NoError(t, err) + + require.Equal(t, cmd2.Process.Pid, cmdPid) + } + }) + + t.Run("with overridden key", func(t *testing.T) { + overriddenGroupID := calcGroupID([]string{"foobar"}, config.Repositories.Count) + + _, err := manager2.AddCommand(cmd2, WithCgroupKey("foobar")) + require.NoError(t, err) + requireShards(t, version, mock, manager2, pid, groupID, overriddenGroupID) + + for _, path := range mock.repoPaths(pid, overriddenGroupID) { + procsPath := filepath.Join(path, "cgroup.procs") + content := readCgroupFile(t, procsPath) + + cmdPid, err := strconv.Atoi(string(content)) + require.NoError(t, err) + + require.Equal(t, cmd2.Process.Pid, cmdPid) + } + }) + }) + } +} + +func TestCleanup(t *testing.T) { + for _, version := range []int{1, 2} { + t.Run("cgroups-v"+strconv.Itoa(version), func(t *testing.T) { + mock := newMock(t, version) + + pid := 1 + cfg := defaultCgroupsConfig() + cfg.Mountpoint = mock.rootPath() + + manager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) + mock.setupMockCgroupFiles(t, manager, []uint{0, 1, 2}) + + require.NoError(t, manager.Setup()) + require.NoError(t, manager.Cleanup()) + + for i := uint(0); i < 3; i++ { + for _, path := range mock.repoPaths(pid, i) { + require.NoDirExists(t, path) + } + } + }) + } +} + +func TestMetrics(t *testing.T) { + tests := []struct { + name string + metricsEnabled bool + pid int + expectV1 string + expectV2 string + }{ + { + name: "metrics enabled: true", + metricsEnabled: true, + pid: 1, + expectV1: `# HELP gitaly_cgroup_cpu_usage_total CPU Usage of Cgroup +# TYPE gitaly_cgroup_cpu_usage_total gauge +gitaly_cgroup_cpu_usage_total{path="%s",type="kernel"} 0 +gitaly_cgroup_cpu_usage_total{path="%s",type="user"} 0 +# HELP gitaly_cgroup_memory_reclaim_attempts_total Number of memory usage hits limits +# TYPE gitaly_cgroup_memory_reclaim_attempts_total gauge +gitaly_cgroup_memory_reclaim_attempts_total{path="%s"} 2 +# HELP gitaly_cgroup_procs_total Total number of procs +# TYPE gitaly_cgroup_procs_total gauge +gitaly_cgroup_procs_total{path="%s",subsystem="cpu"} 1 +gitaly_cgroup_procs_total{path="%s",subsystem="memory"} 1 +# HELP gitaly_cgroup_cpu_cfs_periods_total Number of elapsed enforcement period intervals +# TYPE gitaly_cgroup_cpu_cfs_periods_total counter +gitaly_cgroup_cpu_cfs_periods_total{path="%s"} 10 +# HELP gitaly_cgroup_cpu_cfs_throttled_periods_total Number of throttled period intervals +# TYPE gitaly_cgroup_cpu_cfs_throttled_periods_total counter +gitaly_cgroup_cpu_cfs_throttled_periods_total{path="%s"} 20 +# HELP gitaly_cgroup_cpu_cfs_throttled_seconds_total Total time duration the Cgroup has been throttled +# TYPE gitaly_cgroup_cpu_cfs_throttled_seconds_total counter +gitaly_cgroup_cpu_cfs_throttled_seconds_total{path="%s"} 0.001 +`, + expectV2: `# HELP gitaly_cgroup_cpu_cfs_periods_total Number of elapsed enforcement period intervals +# TYPE gitaly_cgroup_cpu_cfs_periods_total counter +gitaly_cgroup_cpu_cfs_periods_total{path="%s"} 10 +# HELP gitaly_cgroup_cpu_cfs_throttled_periods_total Number of throttled period intervals +# TYPE gitaly_cgroup_cpu_cfs_throttled_periods_total counter +gitaly_cgroup_cpu_cfs_throttled_periods_total{path="%s"} 20 +# HELP gitaly_cgroup_cpu_cfs_throttled_seconds_total Total time duration the Cgroup has been throttled +# TYPE gitaly_cgroup_cpu_cfs_throttled_seconds_total counter +gitaly_cgroup_cpu_cfs_throttled_seconds_total{path="%s"} 0.001 +# HELP gitaly_cgroup_cpu_usage_total CPU Usage of Cgroup +# TYPE gitaly_cgroup_cpu_usage_total gauge +gitaly_cgroup_cpu_usage_total{path="%s",type="kernel"} 0 +gitaly_cgroup_cpu_usage_total{path="%s",type="user"} 0 +# HELP gitaly_cgroup_procs_total Total number of procs +# TYPE gitaly_cgroup_procs_total gauge +gitaly_cgroup_procs_total{path="%s",subsystem="cpu"} 1 +gitaly_cgroup_procs_total{path="%s",subsystem="cpuset"} 1 +gitaly_cgroup_procs_total{path="%s",subsystem="memory"} 1 +`, + }, + { + name: "metrics enabled: false", + metricsEnabled: false, + pid: 2, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + for _, version := range []int{1, 2} { + version := version + t.Run("cgroups-v"+strconv.Itoa(version), func(t *testing.T) { + t.Parallel() + mock := newMock(t, version) + + config := defaultCgroupsConfig() + config.Repositories.Count = 1 + config.Repositories.MemoryBytes = 1048576 + config.Repositories.CPUShares = 16 + config.Mountpoint = mock.rootPath() + config.MetricsEnabled = tt.metricsEnabled + + manager1 := mock.newCgroupManager(config, testhelper.SharedLogger(t), tt.pid) + + var mockFiles []mockCgroupFile + if version == 1 { + mockFiles = append(mockFiles, mockCgroupFile{"memory.failcnt", "2"}) + } + mock.setupMockCgroupFiles(t, manager1, []uint{0}, mockFiles...) + require.NoError(t, manager1.Setup()) + + ctx := testhelper.Context(t) + + cmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...) + require.NoError(t, cmd.Start()) + _, err := manager1.AddCommand(cmd) + require.NoError(t, err) + + gitCmd1 := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...) + require.NoError(t, gitCmd1.Start()) + _, err = manager1.AddCommand(gitCmd1) + require.NoError(t, err) + + gitCmd2 := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...) + require.NoError(t, gitCmd2.Start()) + _, err = manager1.AddCommand(gitCmd2) + require.NoError(t, err) + defer func() { + require.NoError(t, gitCmd2.Wait()) + }() + + require.NoError(t, cmd.Wait()) + require.NoError(t, gitCmd1.Wait()) + + repoCgroupPath := filepath.Join(manager1.currentProcessCgroup(), "repos-0") + + var expected *strings.Reader + if version == 1 { + expected = strings.NewReader(strings.ReplaceAll(tt.expectV1, "%s", repoCgroupPath)) + } else { + expected = strings.NewReader(strings.ReplaceAll(tt.expectV2, "%s", repoCgroupPath)) + } + assert.NoError(t, testutil.CollectAndCompare(manager1, expected)) + }) + } + }) + } +} + +func TestPruneOldCgroups(t *testing.T) { + t.Parallel() + + testCases := []struct { + desc string + cfg cgroups.Config + expectedPruned bool + // setup returns a pid + setup func(t *testing.T, cfg cgroups.Config, mock mockCgroup) int + }{ + { + desc: "process belongs to another user", + cfg: cgroups.Config{ + HierarchyRoot: "gitaly", + Repositories: cgroups.Repositories{ + Count: 10, + MemoryBytes: 10 * 1024 * 1024, + CPUShares: 1024, + }, + }, + setup: func(t *testing.T, cfg cgroups.Config, mock mockCgroup) int { + pid := 1 + cgroupManager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) + mock.setupMockCgroupFiles(t, cgroupManager, []uint{0, 1, 2}) + require.NoError(t, cgroupManager.Setup()) + + return pid + }, + expectedPruned: true, + }, + { + desc: "no hierarchy root", + cfg: cgroups.Config{ + HierarchyRoot: "", + Repositories: cgroups.Repositories{ + Count: 10, + MemoryBytes: 10 * 1024 * 1024, + CPUShares: 1024, + }, + }, + setup: func(t *testing.T, cfg cgroups.Config, mock mockCgroup) int { + pid := 1 + cgroupManager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) + mock.setupMockCgroupFiles(t, cgroupManager, []uint{0, 1, 2}) + require.NoError(t, cgroupManager.Setup()) + return 1 + }, + expectedPruned: false, + }, + { + desc: "pid of finished process", + cfg: cgroups.Config{ + HierarchyRoot: "gitaly", + Repositories: cgroups.Repositories{ + Count: 10, + MemoryBytes: 10 * 1024 * 1024, + CPUShares: 1024, + }, + }, + setup: func(t *testing.T, cfg cgroups.Config, mock mockCgroup) int { + cmd := exec.Command("ls") + require.NoError(t, cmd.Run()) + pid := cmd.Process.Pid + + cgroupManager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) + mock.setupMockCgroupFiles(t, cgroupManager, []uint{0, 1, 2}) + + if mock.version() == 1 { + memoryRoot := filepath.Join( + cfg.Mountpoint, + "memory", + cfg.HierarchyRoot, + "memory.limit_in_bytes", + ) + require.NoError(t, os.WriteFile(memoryRoot, []byte{}, fs.ModeAppend)) + } else { + require.NoError(t, cgroupManager.Setup()) + + memoryFile := filepath.Join( + cfg.Mountpoint, + cfg.HierarchyRoot, + "memory.limit_in_bytes", + ) + require.NoError(t, os.WriteFile(memoryFile, []byte{}, fs.ModeAppend)) + } + + return pid + }, + expectedPruned: true, + }, + { + desc: "pid of running process", + cfg: cgroups.Config{ + HierarchyRoot: "gitaly", + Repositories: cgroups.Repositories{ + Count: 10, + MemoryBytes: 10 * 1024 * 1024, + CPUShares: 1024, + }, + }, + setup: func(t *testing.T, cfg cgroups.Config, mock mockCgroup) int { + pid := os.Getpid() + + cgroupManager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) + mock.setupMockCgroupFiles(t, cgroupManager, []uint{0, 1, 2}) + require.NoError(t, cgroupManager.Setup()) + + return pid + }, + expectedPruned: false, + }, + { + desc: "gitaly-0 directory is deleted", + cfg: cgroups.Config{ + HierarchyRoot: "gitaly", + Repositories: cgroups.Repositories{ + Count: 10, + MemoryBytes: 10 * 1024 * 1024, + CPUShares: 1024, + }, + }, + setup: func(t *testing.T, cfg cgroups.Config, mock mockCgroup) int { + cgroupManager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), 0) + mock.setupMockCgroupFiles(t, cgroupManager, []uint{0, 1, 2}) + require.NoError(t, cgroupManager.Setup()) + + return 0 + }, + expectedPruned: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + t.Run("cgroups-v1", func(t *testing.T) { + mock := newMock(t, 1) + tc.cfg.Mountpoint = mock.rootPath() + + memoryRoot := filepath.Join( + tc.cfg.Mountpoint, + "memory", + tc.cfg.HierarchyRoot, + ) + cpuRoot := filepath.Join( + tc.cfg.Mountpoint, + "cpu", + tc.cfg.HierarchyRoot, + ) + + require.NoError(t, os.MkdirAll(cpuRoot, perm.PublicDir)) + require.NoError(t, os.MkdirAll(memoryRoot, perm.PublicDir)) + + pid := tc.setup(t, tc.cfg, mock) + + logger := testhelper.NewLogger(t) + hook := testhelper.AddLoggerHook(logger) + + mock.pruneOldCgroups(tc.cfg, logger) + + // create cgroups directories with a different pid + oldGitalyProcessMemoryDir := filepath.Join( + memoryRoot, + fmt.Sprintf("gitaly-%d", pid), + ) + oldGitalyProcesssCPUDir := filepath.Join( + cpuRoot, + fmt.Sprintf("gitaly-%d", pid), + ) + + if tc.expectedPruned { + require.NoDirExists(t, oldGitalyProcessMemoryDir) + require.NoDirExists(t, oldGitalyProcesssCPUDir) + } else { + require.DirExists(t, oldGitalyProcessMemoryDir) + require.DirExists(t, oldGitalyProcesssCPUDir) + require.Empty(t, hook.AllEntries()) + } + }) + + t.Run("cgroups-v2", func(t *testing.T) { + mock := newMock(t, 2) + tc.cfg.Mountpoint = mock.rootPath() + + root := filepath.Join( + tc.cfg.Mountpoint, + tc.cfg.HierarchyRoot, + ) + require.NoError(t, os.MkdirAll(root, perm.PublicDir)) + + pid := tc.setup(t, tc.cfg, mock) + + logger := testhelper.NewLogger(t) + mock.pruneOldCgroups(tc.cfg, logger) + + // create cgroups directories with a different pid + oldGitalyProcessDir := filepath.Join( + root, + fmt.Sprintf("gitaly-%d", pid), + ) + + if tc.expectedPruned { + require.NoDirExists(t, oldGitalyProcessDir) + } else { + require.DirExists(t, oldGitalyProcessDir) + } + }) + }) + } +} + +func TestStats(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + version int + mockFiles []mockCgroupFile + expectedStats Stats + }{ + { + desc: "empty statistics", + version: 1, + mockFiles: []mockCgroupFile{ + {"memory.limit_in_bytes", "0"}, + {"memory.usage_in_bytes", "0"}, + {"memory.oom_control", ""}, + {"cpu.stat", ""}, + }, + expectedStats: Stats{}, + }, + { + desc: "cgroupfs recorded some stats", + version: 1, + mockFiles: []mockCgroupFile{ + {"memory.limit_in_bytes", "2000000000"}, + {"memory.usage_in_bytes", "1234000000"}, + {"memory.oom_control", `oom_kill_disable 1 +under_oom 1 +oom_kill 3`}, + {"cpu.stat", `nr_periods 10 +nr_throttled 50 +throttled_time 1000000`}, // 0.001 seconds + }, + expectedStats: Stats{ + ParentStats: CgroupStats{ + CPUThrottledCount: 50, + CPUThrottledDuration: 0.001, + MemoryUsage: 1234000000, + MemoryLimit: 2000000000, + OOMKills: 3, + UnderOOM: true, + }, + }, + }, + { + desc: "empty statistics", + version: 2, + mockFiles: []mockCgroupFile{ + {"memory.current", "0"}, + {"memory.max", "0"}, + {"cpu.stat", ""}, + }, + expectedStats: Stats{}, + }, + { + desc: "cgroupfs recorded some stats", + version: 2, + mockFiles: []mockCgroupFile{ + {"memory.max", "2000000000"}, + {"memory.current", "1234000000"}, + {"memory.events", `low 1 +high 2 +max 3 +oom 4 +oom_kill 5`}, + {"nr_throttled", "50"}, + {"throttled_usec", "1000000"}, + {"cpu.stat", `nr_periods 10 +nr_throttled 50 +throttled_usec 1000000`}, // 0.001 seconds + }, + expectedStats: Stats{ + ParentStats: CgroupStats{ + CPUThrottledCount: 50, + CPUThrottledDuration: 0.001, + MemoryUsage: 1234000000, + MemoryLimit: 2000000000, + OOMKills: 5, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + mock := newMock(t, tc.version) + + config := defaultCgroupsConfig() + config.Repositories.Count = 1 + config.Repositories.MemoryBytes = 2000000000 + config.Repositories.CPUShares = 16 + config.Mountpoint = mock.rootPath() + + manager := mock.newCgroupManager(config, testhelper.SharedLogger(t), 1) + + mock.setupMockCgroupFiles(t, manager, []uint{0}, tc.mockFiles...) + require.NoError(t, manager.Setup()) + + stats, err := manager.Stats() + require.NoError(t, err) + require.Equal(t, tc.expectedStats, stats) + }) + } +} + +func requireCgroupComponents(t *testing.T, version int, root string, cgroupPath string, expected expectedCgroup) { + t.Helper() + + if version == 1 { + memoryLimitPath := filepath.Join(root, "memory", cgroupPath, "memory.limit_in_bytes") + requireCgroupWithInt(t, memoryLimitPath, expected.wantMemoryBytes) + + cpuSharesPath := filepath.Join(root, "cpu", cgroupPath, "cpu.shares") + requireCgroupWithInt(t, cpuSharesPath, expected.wantCPUShares) + + cpuCFSQuotaPath := filepath.Join(root, "cpu", cgroupPath, "cpu.cfs_quota_us") + requireCgroupWithInt(t, cpuCFSQuotaPath, expected.wantCPUQuotaUs) + + cpuCFSPeriodPath := filepath.Join(root, "cpu", cgroupPath, "cpu.cfs_period_us") + requireCgroupWithInt(t, cpuCFSPeriodPath, expected.wantCFSPeriod) + } else { + memoryMaxPath := filepath.Join(root, cgroupPath, "memory.max") + requireCgroupWithInt(t, memoryMaxPath, expected.wantMemoryBytes) + + cpuWeightPath := filepath.Join(root, cgroupPath, "cpu.weight") + requireCgroupWithInt(t, cpuWeightPath, calculateWantCPUWeight(expected.wantCPUWeight)) + + cpuMaxPath := filepath.Join(root, cgroupPath, "cpu.max") + requireCgroupWithString(t, cpuMaxPath, expected.wantCPUMax) + } +} + +func readCgroupFile(t *testing.T, path string) []byte { + t.Helper() + + // The cgroups package defaults to permission 0 as it expects the file to be existing (the kernel creates the file) + // and its testing override the permission private variable to something sensible, hence we have to chmod ourselves + // so we can read the file. + require.NoError(t, os.Chmod(path, perm.PublicFile)) + + return testhelper.MustReadFile(t, path) +} + +func requireCgroupWithInt(t *testing.T, cgroupFile string, want int) { + t.Helper() + + if want <= 0 { + return + } + + require.Equal(t, + string(readCgroupFile(t, cgroupFile)), + strconv.Itoa(want), + ) +} + +func requireCgroupWithString(t *testing.T, cgroupFile string, want string) { + t.Helper() + + if want == "" { + return + } + require.Equal(t, + string(readCgroupFile(t, cgroupFile)), + want, + ) +} + +func calculateWantCPUWeight(wantCPUWeight int) int { + if wantCPUWeight == 0 { + return 0 + } + return 1 + ((wantCPUWeight-2)*9999)/262142 +} + +func requireShards(t *testing.T, version int, mock mockCgroup, mgr *CGroupManager, pid int, expectedShards ...uint) { + for shard := uint(0); shard < mgr.cfg.Repositories.Count; shard++ { + shouldExist := slices.Contains(expectedShards, shard) + + cgroupPath := filepath.Join("gitaly", + fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", shard)) + cgLock := mgr.status.getLock(cgroupPath) + require.Equal(t, shouldExist, cgLock.isCreated()) + + for _, diskPath := range mock.repoPaths(pid, shard) { + if shouldExist { + require.DirExists(t, diskPath) + } else { + require.NoDirExists(t, diskPath) + } + } + } +} diff --git a/internal/cgroups/manager_linux.go b/internal/cgroups/manager_linux.go index f55a7c58f69911bd0923ac61c45fa8a70cc8a8f6..da10c15d51ea4a64b509bb5f04ec2f8b0de899f0 100644 --- a/internal/cgroups/manager_linux.go +++ b/internal/cgroups/manager_linux.go @@ -92,9 +92,9 @@ func newCgroupManagerWithMode(cfg cgroupscfg.Config, logger log.Logger, pid int, var handler cgroupHandler switch mode { case cgrps.Legacy, cgrps.Hybrid: - handler = newV1Handler(cfg, logger, pid) + handler = newV1GenericHandler(cfg, logger, pid) case cgrps.Unified: - handler = newV2Handler(cfg, logger, pid) + handler = newV2GenericHandler(cfg, logger, pid) logger.Warn("Gitaly now includes experimental support for CgroupV2. Please proceed with caution and use this experimental feature at your own risk") default: logger.Warn("Gitaly has encountered an issue while trying to detect the version of the system's cgroup. As a result, all subsequent commands will be executed without cgroup support. Please check the system's cgroup configuration and try again") diff --git a/internal/cgroups/mock_linux_test.go b/internal/cgroups/mock_linux_test.go index ac54418a0b0838b49caed4b831840f7d7c5c4262..16f7c19004977c34dfd1d9b06ba61edf5d8e9624 100644 --- a/internal/cgroups/mock_linux_test.go +++ b/internal/cgroups/mock_linux_test.go @@ -35,12 +35,41 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" ) -type mockCgroup struct { +type mockCgroup interface { + setupMockCgroupFiles( + t *testing.T, + manager *CGroupManager, + shards []uint, + inputContent ...mockCgroupFile, + ) + newCgroupManager(cfg cgroupscfg.Config, logger log.Logger, pid int) *CGroupManager + pruneOldCgroups(cfg cgroupscfg.Config, logger log.Logger) + // rootPath returns the mock's root directory. + rootPath() string + // repoPaths returns the full disk path for each subcomponent, e.g. memory, cpu, of + // a repository cgroup. On v2 this is a single path. + repoPaths(pid int, repoID uint) []string + // version returns the cgroup version number, 1 or 2. + version() int +} + +func newMock(t *testing.T, version int) mockCgroup { + var mock mockCgroup + if version == 1 { + mock = newMockV1(t) + } else { + mock = newMockV2(t) + } + + return mock +} + +type mockCgroupV1 struct { root string subsystems []cgroup1.Subsystem } -func newMock(t *testing.T) *mockCgroup { +func newMockV1(t *testing.T) *mockCgroupV1 { t.Helper() root := testhelper.TempDir(t) @@ -52,7 +81,7 @@ func newMock(t *testing.T) *mockCgroup { require.NoError(t, os.MkdirAll(filepath.Join(root, string(s.Name())), perm.SharedDir)) } - return &mockCgroup{ + return &mockCgroupV1{ root: root, subsystems: subsystems, } @@ -93,7 +122,7 @@ nr_throttled 20 throttled_time 1000000`, } -func (m *mockCgroup) setupMockCgroupFiles( +func (m *mockCgroupV1) setupMockCgroupFiles( t *testing.T, manager *CGroupManager, shards []uint, @@ -142,14 +171,34 @@ func (m *mockCgroup) setupMockCgroupFiles( } } -func (m *mockCgroup) newCgroupManager(cfg cgroupscfg.Config, logger log.Logger, pid int) *CGroupManager { +func (m *mockCgroupV1) newCgroupManager(cfg cgroupscfg.Config, logger log.Logger, pid int) *CGroupManager { return newCgroupManagerWithMode(cfg, logger, pid, cgrps.Legacy) } -func (m *mockCgroup) pruneOldCgroups(cfg cgroupscfg.Config, logger log.Logger) { +func (m *mockCgroupV1) pruneOldCgroups(cfg cgroupscfg.Config, logger log.Logger) { pruneOldCgroupsWithMode(cfg, logger, cgrps.Legacy) } +func (m *mockCgroupV1) rootPath() string { + return m.root +} + +func (m *mockCgroupV1) repoPaths(pid int, repoID uint) []string { + paths := make([]string, 0, len(m.subsystems)) + + for _, s := range m.subsystems { + path := filepath.Join(m.root, string(s.Name()), "gitaly", + fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", repoID)) + paths = append(paths, path) + } + + return paths +} + +func (m *mockCgroupV1) version() int { + return 1 +} + type mockCgroupV2 struct { root string } @@ -221,3 +270,18 @@ func (m *mockCgroupV2) newCgroupManager(cfg cgroupscfg.Config, logger log.Logger func (m *mockCgroupV2) pruneOldCgroups(cfg cgroupscfg.Config, logger log.Logger) { pruneOldCgroupsWithMode(cfg, logger, cgrps.Unified) } + +func (m *mockCgroupV2) rootPath() string { + return m.root +} + +func (m *mockCgroupV2) repoPaths(pid int, repoID uint) []string { + path := filepath.Join(m.root, "gitaly", + fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", repoID)) + + return []string{path} +} + +func (m *mockCgroupV2) version() int { + return 2 +} diff --git a/internal/cgroups/v1_linux.go b/internal/cgroups/v1_linux.go deleted file mode 100644 index 53f6cb5a432516539881b86f364f1e442871e766..0000000000000000000000000000000000000000 --- a/internal/cgroups/v1_linux.go +++ /dev/null @@ -1,237 +0,0 @@ -//go:build linux - -package cgroups - -import ( - "fmt" - "path/filepath" - "strings" - "time" - - "github.com/containerd/cgroups/v3/cgroup1" - specs "github.com/opencontainers/runtime-spec/specs-go" - "github.com/prometheus/client_golang/prometheus" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" - cgroupscfg "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/cgroups" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" -) - -type cgroupV1Handler struct { - cfg cgroupscfg.Config - logger log.Logger - hierarchy func() ([]cgroup1.Subsystem, error) - - *cgroupsMetrics - pid int -} - -func newV1Handler(cfg cgroupscfg.Config, logger log.Logger, pid int) *cgroupV1Handler { - return &cgroupV1Handler{ - cfg: cfg, - logger: logger, - pid: pid, - hierarchy: func() ([]cgroup1.Subsystem, error) { - return defaultSubsystems(cfg.Mountpoint) - }, - cgroupsMetrics: newV1CgroupsMetrics(), - } -} - -func (cvh *cgroupV1Handler) setupParent(parentResources *specs.LinuxResources) error { - if _, err := cgroup1.New( - cgroup1.StaticPath(cvh.currentProcessCgroup()), - parentResources, - cgroup1.WithHiearchy(cvh.hierarchy), - ); err != nil { - return fmt.Errorf("failed creating parent cgroup: %w", err) - } - return nil -} - -func (cvh *cgroupV1Handler) createCgroup(reposResources *specs.LinuxResources, cgroupPath string) error { - _, err := cgroup1.New( - cgroup1.StaticPath(cgroupPath), - reposResources, - cgroup1.WithHiearchy(cvh.hierarchy), - ) - - return err -} - -func (cvh *cgroupV1Handler) addToCgroup(pid int, cgroupPath string) error { - control, err := cvh.loadCgroup(cgroupPath) - if err != nil { - return err - } - - if err := control.Add(cgroup1.Process{Pid: pid}); err != nil { - // Command could finish so quickly before we can add it to a cgroup, so - // we don't consider it an error. - if strings.Contains(err.Error(), "no such process") { - return nil - } - return fmt.Errorf("failed adding process to cgroup: %w", err) - } - - return nil -} - -func (cvh *cgroupV1Handler) loadCgroup(cgroupPath string) (cgroup1.Cgroup, error) { - control, err := cgroup1.Load( - cgroup1.StaticPath(cgroupPath), - cgroup1.WithHiearchy(cvh.hierarchy), - ) - if err != nil { - return nil, fmt.Errorf("failed loading %s cgroup: %w", cgroupPath, err) - } - return control, nil -} - -func (cvh *cgroupV1Handler) collect(repoPath string, ch chan<- prometheus.Metric) { - logger := cvh.logger.WithField("cgroup_path", repoPath) - control, err := cvh.loadCgroup(repoPath) - if err != nil { - logger.WithError(err).Warn("unable to load cgroup controller") - return - } - - if metrics, err := control.Stat(); err != nil { - logger.WithError(err).Warn("unable to get cgroup stats") - } else { - memoryMetric := cvh.memoryReclaimAttemptsTotal.WithLabelValues(repoPath) - memoryMetric.Set(float64(metrics.Memory.Usage.Failcnt)) - ch <- memoryMetric - - cpuUserMetric := cvh.cpuUsage.WithLabelValues(repoPath, "user") - cpuUserMetric.Set(float64(metrics.CPU.Usage.User)) - ch <- cpuUserMetric - - ch <- prometheus.MustNewConstMetric( - cvh.cpuCFSPeriods, - prometheus.CounterValue, - float64(metrics.CPU.Throttling.Periods), - repoPath, - ) - - ch <- prometheus.MustNewConstMetric( - cvh.cpuCFSThrottledPeriods, - prometheus.CounterValue, - float64(metrics.CPU.Throttling.ThrottledPeriods), - repoPath, - ) - - ch <- prometheus.MustNewConstMetric( - cvh.cpuCFSThrottledTime, - prometheus.CounterValue, - float64(metrics.CPU.Throttling.ThrottledTime)/float64(time.Second), - repoPath, - ) - - cpuKernelMetric := cvh.cpuUsage.WithLabelValues(repoPath, "kernel") - cpuKernelMetric.Set(float64(metrics.CPU.Usage.Kernel)) - ch <- cpuKernelMetric - } - - if subsystems, err := cvh.hierarchy(); err != nil { - logger.WithError(err).Warn("unable to get cgroup hierarchy") - } else { - for _, subsystem := range subsystems { - processes, err := control.Processes(subsystem.Name(), true) - if err != nil { - logger.WithField("subsystem", subsystem.Name()). - WithError(err). - Warn("unable to get process list") - continue - } - - procsMetric := cvh.procs.WithLabelValues(repoPath, string(subsystem.Name())) - procsMetric.Set(float64(len(processes))) - ch <- procsMetric - } - } -} - -func (cvh *cgroupV1Handler) cleanup() error { - processCgroupPath := cvh.currentProcessCgroup() - - control, err := cvh.loadCgroup(processCgroupPath) - if err != nil { - return err - } - - if err := control.Delete(); err != nil { - return fmt.Errorf("failed cleaning up cgroup %s: %w", processCgroupPath, err) - } - - return nil -} - -func (cvh *cgroupV1Handler) repoPath(groupID int) string { - return filepath.Join(cvh.currentProcessCgroup(), fmt.Sprintf("repos-%d", groupID)) -} - -func (cvh *cgroupV1Handler) currentProcessCgroup() string { - return config.GetGitalyProcessTempDir(cvh.cfg.HierarchyRoot, cvh.pid) -} - -func (cvh *cgroupV1Handler) stats() (Stats, error) { - processCgroupPath := cvh.currentProcessCgroup() - - control, err := cvh.loadCgroup(processCgroupPath) - if err != nil { - return Stats{}, err - } - - metrics, err := control.Stat() - if err != nil { - return Stats{}, fmt.Errorf("failed to fetch metrics %s: %w", processCgroupPath, err) - } - - return Stats{ - ParentStats: CgroupStats{ - CPUThrottledCount: metrics.CPU.Throttling.ThrottledPeriods, - CPUThrottledDuration: float64(metrics.CPU.Throttling.ThrottledTime) / float64(time.Second), - MemoryUsage: metrics.Memory.Usage.Usage, - MemoryLimit: metrics.Memory.Usage.Limit, - OOMKills: metrics.MemoryOomControl.OomKill, - UnderOOM: metrics.MemoryOomControl.UnderOom != 0, - Anon: metrics.Memory.RSS, - ActiveAnon: metrics.Memory.ActiveAnon, - InactiveAnon: metrics.Memory.InactiveAnon, - File: metrics.Memory.Cache, - ActiveFile: metrics.Memory.ActiveFile, - InactiveFile: metrics.Memory.InactiveFile, - }, - }, nil -} - -func (cvh *cgroupV1Handler) supportsCloneIntoCgroup() bool { - return false -} - -func defaultSubsystems(root string) ([]cgroup1.Subsystem, error) { - subsystems := []cgroup1.Subsystem{ - cgroup1.NewMemory(root, cgroup1.OptionalSwap()), - cgroup1.NewCpu(root), - } - - return subsystems, nil -} - -func pruneOldCgroupsV1(cfg cgroupscfg.Config, logger log.Logger) { - if err := config.PruneOldGitalyProcessDirectories( - logger, - filepath.Join(cfg.Mountpoint, "memory", - cfg.HierarchyRoot), - ); err != nil { - logger.WithError(err).Error("failed to clean up memory cgroups") - } - - if err := config.PruneOldGitalyProcessDirectories( - logger, - filepath.Join(cfg.Mountpoint, "cpu", - cfg.HierarchyRoot), - ); err != nil { - logger.WithError(err).Error("failed to clean up cpu cgroups") - } -} diff --git a/internal/cgroups/v1_linux_test.go b/internal/cgroups/v1_linux_test.go deleted file mode 100644 index 38d81293a438a89baf45754c48ec513515e3db6e..0000000000000000000000000000000000000000 --- a/internal/cgroups/v1_linux_test.go +++ /dev/null @@ -1,713 +0,0 @@ -//go:build linux - -package cgroups - -import ( - "fmt" - "io/fs" - "os" - "os/exec" - "path/filepath" - "strconv" - "strings" - "testing" - - cgrps "github.com/containerd/cgroups/v3" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/cgroups" - "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" - "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" - "golang.org/x/exp/slices" -) - -func defaultCgroupsConfig() cgroups.Config { - return cgroups.Config{ - HierarchyRoot: "gitaly", - Repositories: cgroups.Repositories{ - Count: 3, - MemoryBytes: 1024000, - CPUShares: 256, - CPUQuotaUs: 200, - }, - } -} - -func TestNewManagerV1(t *testing.T) { - cfg := cgroups.Config{Repositories: cgroups.Repositories{Count: 10}} - - manager := newCgroupManagerWithMode(cfg, testhelper.SharedLogger(t), 1, cgrps.Legacy) - require.IsType(t, &cgroupV1Handler{}, manager.handler) - manager = newCgroupManagerWithMode(cfg, testhelper.SharedLogger(t), 1, cgrps.Hybrid) - require.IsType(t, &cgroupV1Handler{}, manager.handler) - manager = newCgroupManagerWithMode(cfg, testhelper.SharedLogger(t), 1, cgrps.Unavailable) - require.Nil(t, manager) -} - -func TestSetup_ParentCgroups(t *testing.T) { - tests := []struct { - name string - cfg cgroups.Config - wantMemoryBytes int - wantCPUShares int - wantCPUQuotaUs int - wantCFSPeriod int - }{ - { - name: "all config specified", - cfg: cgroups.Config{ - MemoryBytes: 102400, - CPUShares: 256, - CPUQuotaUs: 200, - }, - wantMemoryBytes: 102400, - wantCPUShares: 256, - wantCPUQuotaUs: 200, - wantCFSPeriod: int(cfsPeriodUs), - }, - { - name: "only memory limit set", - cfg: cgroups.Config{ - MemoryBytes: 102400, - }, - wantMemoryBytes: 102400, - }, - { - name: "only cpu shares set", - cfg: cgroups.Config{ - CPUShares: 512, - }, - wantCPUShares: 512, - }, - { - name: "only cpu quota set", - cfg: cgroups.Config{ - CPUQuotaUs: 200, - }, - wantCPUQuotaUs: 200, - wantCFSPeriod: int(cfsPeriodUs), - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - mock := newMock(t) - pid := 1 - tt.cfg.HierarchyRoot = "gitaly" - tt.cfg.Mountpoint = mock.root - - v1Manager := mock.newCgroupManager(tt.cfg, testhelper.SharedLogger(t), pid) - require.False(t, v1Manager.Ready()) - require.NoError(t, v1Manager.Setup()) - require.True(t, v1Manager.Ready()) - - memoryLimitPath := filepath.Join( - mock.root, "memory", "gitaly", fmt.Sprintf("gitaly-%d", pid), "memory.limit_in_bytes", - ) - requireCgroup(t, memoryLimitPath, tt.wantMemoryBytes) - - cpuSharesPath := filepath.Join( - mock.root, "cpu", "gitaly", fmt.Sprintf("gitaly-%d", pid), "cpu.shares", - ) - requireCgroup(t, cpuSharesPath, tt.wantCPUShares) - - cpuCFSQuotaPath := filepath.Join( - mock.root, "cpu", "gitaly", fmt.Sprintf("gitaly-%d", pid), "cpu.cfs_quota_us", - ) - requireCgroup(t, cpuCFSQuotaPath, tt.wantCPUQuotaUs) - - cpuCFSPeriodPath := filepath.Join( - mock.root, "cpu", "gitaly", fmt.Sprintf("gitaly-%d", pid), "cpu.cfs_period_us", - ) - requireCgroup(t, cpuCFSPeriodPath, tt.wantCFSPeriod) - }) - } -} - -func TestSetup_RepoCgroups(t *testing.T) { - tests := []struct { - name string - cfg cgroups.Repositories - wantMemoryBytes int - wantCPUShares int - wantCPUQuotaUs int - wantCFSPeriod int - }{ - { - name: "all config specified", - cfg: defaultCgroupsConfig().Repositories, - wantMemoryBytes: 1024000, - wantCPUShares: 256, - wantCPUQuotaUs: 200, - wantCFSPeriod: int(cfsPeriodUs), - }, - { - name: "only memory limit set", - cfg: cgroups.Repositories{ - MemoryBytes: 1024000, - }, - wantMemoryBytes: 1024000, - }, - { - name: "only cpu shares set", - cfg: cgroups.Repositories{ - CPUShares: 512, - }, - wantCPUShares: 512, - }, - { - name: "only cpu quota set", - cfg: cgroups.Repositories{ - CPUQuotaUs: 100, - }, - wantCPUQuotaUs: 100, - wantCFSPeriod: int(cfsPeriodUs), - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - mock := newMock(t) - - pid := 1 - cfg := defaultCgroupsConfig() - cfg.Repositories = tt.cfg - cfg.Repositories.Count = 3 - cfg.HierarchyRoot = "gitaly" - cfg.Mountpoint = mock.root - - v1Manager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) - - require.False(t, v1Manager.Ready()) - require.NoError(t, v1Manager.Setup()) - require.True(t, v1Manager.Ready()) - - ctx := testhelper.Context(t) - - // Validate no shards have been created. We deliberately do not call - // `setupMockCgroupFiles()` here to confirm that the cgroup controller - // is creating repository directories in the correct location. - requireShardsV1(t, mock, v1Manager, pid) - - // Create a command to force Gitaly to create the repo cgroup. - cmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...) - require.NoError(t, cmd.Run()) - _, err := v1Manager.AddCommand(cmd) - require.NoError(t, err) - - groupID := calcGroupID(cmd.Args, cfg.Repositories.Count) - requireShardsV1(t, mock, v1Manager, pid, groupID) - - for i := 0; i < 3; i++ { - cgroupExists := uint(i) == groupID - - memoryLimitPath := filepath.Join( - mock.root, "memory", "gitaly", fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", i), "memory.limit_in_bytes", - ) - if cgroupExists { - requireCgroup(t, memoryLimitPath, tt.wantMemoryBytes) - } else { - require.NoFileExists(t, memoryLimitPath) - } - - cpuSharesPath := filepath.Join( - mock.root, "cpu", "gitaly", fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", i), "cpu.shares", - ) - if cgroupExists { - requireCgroup(t, cpuSharesPath, tt.wantCPUShares) - } else { - require.NoFileExists(t, cpuSharesPath) - } - - cpuCFSQuotaPath := filepath.Join( - mock.root, "cpu", "gitaly", fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", i), "cpu.cfs_quota_us", - ) - if cgroupExists { - requireCgroup(t, cpuCFSQuotaPath, tt.wantCPUQuotaUs) - } else { - require.NoFileExists(t, cpuCFSQuotaPath) - } - - cpuCFSPeriodPath := filepath.Join( - mock.root, "cpu", "gitaly", fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", i), "cpu.cfs_period_us", - ) - if cgroupExists { - requireCgroup(t, cpuCFSPeriodPath, tt.wantCFSPeriod) - } else { - require.NoFileExists(t, cpuCFSPeriodPath) - } - } - }) - } -} - -func TestAddCommand(t *testing.T) { - mock := newMock(t) - - config := defaultCgroupsConfig() - config.Repositories.Count = 10 - config.Repositories.MemoryBytes = 1024 - config.Repositories.CPUShares = 16 - config.HierarchyRoot = "gitaly" - config.Mountpoint = mock.root - - pid := 1 - groupID := calcGroupID(cmdArgs, config.Repositories.Count) - - v1Manager1 := mock.newCgroupManager(config, testhelper.SharedLogger(t), pid) - require.NoError(t, v1Manager1.Setup()) - ctx := testhelper.Context(t) - - cmd2 := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...) - require.NoError(t, cmd2.Run()) - - v1Manager2 := mock.newCgroupManager(config, testhelper.SharedLogger(t), pid) - - t.Run("without overridden key", func(t *testing.T) { - _, err := v1Manager2.AddCommand(cmd2) - require.NoError(t, err) - requireShardsV1(t, mock, v1Manager2, pid, groupID) - - for _, s := range mock.subsystems { - path := filepath.Join(mock.root, string(s.Name()), "gitaly", - fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", groupID), "cgroup.procs") - content := readCgroupFile(t, path) - - cmdPid, err := strconv.Atoi(string(content)) - require.NoError(t, err) - - require.Equal(t, cmd2.Process.Pid, cmdPid) - } - }) - - t.Run("with overridden key", func(t *testing.T) { - overridenGroupID := calcGroupID([]string{"foobar"}, config.Repositories.Count) - - _, err := v1Manager2.AddCommand(cmd2, WithCgroupKey("foobar")) - require.NoError(t, err) - - for _, s := range mock.subsystems { - path := filepath.Join(mock.root, string(s.Name()), "gitaly", - fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", overridenGroupID), "cgroup.procs") - content := readCgroupFile(t, path) - - cmdPid, err := strconv.Atoi(string(content)) - require.NoError(t, err) - - require.Equal(t, cmd2.Process.Pid, cmdPid) - } - }) -} - -func TestCleanup(t *testing.T) { - mock := newMock(t) - - pid := 1 - cfg := defaultCgroupsConfig() - cfg.Mountpoint = mock.root - - v1Manager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) - - require.NoError(t, v1Manager.Setup()) - require.NoError(t, v1Manager.Cleanup()) - - for i := 0; i < 3; i++ { - memoryPath := filepath.Join(mock.root, "memory", "gitaly", fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", i)) - cpuPath := filepath.Join(mock.root, "cpu", "gitaly", fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", i)) - - require.NoDirExists(t, memoryPath) - require.NoDirExists(t, cpuPath) - } -} - -func TestMetrics(t *testing.T) { - tests := []struct { - name string - metricsEnabled bool - pid int - expect string - }{ - { - name: "metrics enabled: true", - metricsEnabled: true, - pid: 1, - expect: `# HELP gitaly_cgroup_cpu_usage_total CPU Usage of Cgroup -# TYPE gitaly_cgroup_cpu_usage_total gauge -gitaly_cgroup_cpu_usage_total{path="%s",type="kernel"} 0 -gitaly_cgroup_cpu_usage_total{path="%s",type="user"} 0 -# HELP gitaly_cgroup_memory_reclaim_attempts_total Number of memory usage hits limits -# TYPE gitaly_cgroup_memory_reclaim_attempts_total gauge -gitaly_cgroup_memory_reclaim_attempts_total{path="%s"} 2 -# HELP gitaly_cgroup_procs_total Total number of procs -# TYPE gitaly_cgroup_procs_total gauge -gitaly_cgroup_procs_total{path="%s",subsystem="cpu"} 1 -gitaly_cgroup_procs_total{path="%s",subsystem="memory"} 1 -# HELP gitaly_cgroup_cpu_cfs_periods_total Number of elapsed enforcement period intervals -# TYPE gitaly_cgroup_cpu_cfs_periods_total counter -gitaly_cgroup_cpu_cfs_periods_total{path="%s"} 10 -# HELP gitaly_cgroup_cpu_cfs_throttled_periods_total Number of throttled period intervals -# TYPE gitaly_cgroup_cpu_cfs_throttled_periods_total counter -gitaly_cgroup_cpu_cfs_throttled_periods_total{path="%s"} 20 -# HELP gitaly_cgroup_cpu_cfs_throttled_seconds_total Total time duration the Cgroup has been throttled -# TYPE gitaly_cgroup_cpu_cfs_throttled_seconds_total counter -gitaly_cgroup_cpu_cfs_throttled_seconds_total{path="%s"} 0.001 -`, - }, - { - name: "metrics enabled: false", - metricsEnabled: false, - pid: 2, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - mock := newMock(t) - - config := defaultCgroupsConfig() - config.Repositories.Count = 1 - config.Repositories.MemoryBytes = 1048576 - config.Repositories.CPUShares = 16 - config.Mountpoint = mock.root - config.MetricsEnabled = tt.metricsEnabled - - v1Manager1 := mock.newCgroupManager(config, testhelper.SharedLogger(t), tt.pid) - - groupID := calcGroupID(cmdArgs, config.Repositories.Count) - - mock.setupMockCgroupFiles(t, v1Manager1, []uint{groupID}, mockCgroupFile{"memory.failcnt", "2"}) - require.NoError(t, v1Manager1.Setup()) - - ctx := testhelper.Context(t) - - cmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...) - require.NoError(t, cmd.Start()) - _, err := v1Manager1.AddCommand(cmd) - require.NoError(t, err) - - gitCmd1 := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...) - require.NoError(t, gitCmd1.Start()) - _, err = v1Manager1.AddCommand(gitCmd1) - require.NoError(t, err) - - gitCmd2 := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...) - require.NoError(t, gitCmd2.Start()) - _, err = v1Manager1.AddCommand(gitCmd2) - require.NoError(t, err) - - requireShardsV1(t, mock, v1Manager1, tt.pid, groupID) - - defer func() { - require.NoError(t, gitCmd2.Wait()) - }() - - require.NoError(t, cmd.Wait()) - require.NoError(t, gitCmd1.Wait()) - - repoCgroupPath := filepath.Join(v1Manager1.currentProcessCgroup(), "repos-0") - - expected := strings.NewReader(strings.ReplaceAll(tt.expect, "%s", repoCgroupPath)) - assert.NoError(t, testutil.CollectAndCompare(v1Manager1, expected)) - }) - } -} - -func TestPruneOldCgroups(t *testing.T) { - t.Parallel() - - testCases := []struct { - desc string - cfg cgroups.Config - expectedPruned bool - // setup returns a pid - setup func(t *testing.T, cfg cgroups.Config, mock *mockCgroup) int - }{ - { - desc: "process belongs to another user", - cfg: cgroups.Config{ - HierarchyRoot: "gitaly", - Repositories: cgroups.Repositories{ - Count: 10, - MemoryBytes: 10 * 1024 * 1024, - CPUShares: 1024, - }, - }, - setup: func(t *testing.T, cfg cgroups.Config, mock *mockCgroup) int { - pid := 1 - cgroupManager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) - require.NoError(t, cgroupManager.Setup()) - - return pid - }, - expectedPruned: true, - }, - { - desc: "no hierarchy root", - cfg: cgroups.Config{ - HierarchyRoot: "", - Repositories: cgroups.Repositories{ - Count: 10, - MemoryBytes: 10 * 1024 * 1024, - CPUShares: 1024, - }, - }, - setup: func(t *testing.T, cfg cgroups.Config, mock *mockCgroup) int { - pid := 1 - cgroupManager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) - require.NoError(t, cgroupManager.Setup()) - return 1 - }, - expectedPruned: false, - }, - { - desc: "pid of finished process", - cfg: cgroups.Config{ - HierarchyRoot: "gitaly", - Repositories: cgroups.Repositories{ - Count: 10, - MemoryBytes: 10 * 1024 * 1024, - CPUShares: 1024, - }, - }, - setup: func(t *testing.T, cfg cgroups.Config, mock *mockCgroup) int { - cmd := exec.Command("ls") - require.NoError(t, cmd.Run()) - pid := cmd.Process.Pid - - cgroupManager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) - require.NoError(t, cgroupManager.Setup()) - - memoryRoot := filepath.Join( - cfg.Mountpoint, - "memory", - cfg.HierarchyRoot, - "memory.limit_in_bytes", - ) - require.NoError(t, os.WriteFile(memoryRoot, []byte{}, fs.ModeAppend)) - - return pid - }, - expectedPruned: true, - }, - { - desc: "pid of running process", - cfg: cgroups.Config{ - HierarchyRoot: "gitaly", - Repositories: cgroups.Repositories{ - Count: 10, - MemoryBytes: 10 * 1024 * 1024, - CPUShares: 1024, - }, - }, - setup: func(t *testing.T, cfg cgroups.Config, mock *mockCgroup) int { - pid := os.Getpid() - - cgroupManager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) - require.NoError(t, cgroupManager.Setup()) - - return pid - }, - expectedPruned: false, - }, - { - desc: "gitaly-0 directory is deleted", - cfg: cgroups.Config{ - HierarchyRoot: "gitaly", - Repositories: cgroups.Repositories{ - Count: 10, - MemoryBytes: 10 * 1024 * 1024, - CPUShares: 1024, - }, - }, - setup: func(t *testing.T, cfg cgroups.Config, mock *mockCgroup) int { - cgroupManager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), 0) - require.NoError(t, cgroupManager.Setup()) - - return 0 - }, - expectedPruned: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - mock := newMock(t) - tc.cfg.Mountpoint = mock.root - - memoryRoot := filepath.Join( - tc.cfg.Mountpoint, - "memory", - tc.cfg.HierarchyRoot, - ) - cpuRoot := filepath.Join( - tc.cfg.Mountpoint, - "cpu", - tc.cfg.HierarchyRoot, - ) - - require.NoError(t, os.MkdirAll(cpuRoot, perm.PublicDir)) - require.NoError(t, os.MkdirAll(memoryRoot, perm.PublicDir)) - - pid := tc.setup(t, tc.cfg, mock) - - logger := testhelper.NewLogger(t) - hook := testhelper.AddLoggerHook(logger) - - mock.pruneOldCgroups(tc.cfg, logger) - - // create cgroups directories with a different pid - oldGitalyProcessMemoryDir := filepath.Join( - memoryRoot, - fmt.Sprintf("gitaly-%d", pid), - ) - oldGitalyProcesssCPUDir := filepath.Join( - cpuRoot, - fmt.Sprintf("gitaly-%d", pid), - ) - - if tc.expectedPruned { - require.NoDirExists(t, oldGitalyProcessMemoryDir) - require.NoDirExists(t, oldGitalyProcesssCPUDir) - } else { - require.DirExists(t, oldGitalyProcessMemoryDir) - require.DirExists(t, oldGitalyProcesssCPUDir) - require.Empty(t, hook.AllEntries()) - } - }) - } -} - -func TestStatsV1(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - desc string - mockFiles []mockCgroupFile - expectedStats Stats - }{ - { - desc: "empty statistics", - mockFiles: []mockCgroupFile{ - {"memory.limit_in_bytes", "0"}, - {"memory.usage_in_bytes", "0"}, - {"memory.oom_control", ""}, - {"cpu.stat", ""}, - }, - expectedStats: Stats{}, - }, - { - desc: "cgroupfs recorded some stats", - mockFiles: []mockCgroupFile{ - {"memory.limit_in_bytes", "2000000000"}, - {"memory.usage_in_bytes", "1234000000"}, - {"memory.oom_control", `oom_kill_disable 1 -under_oom 1 -oom_kill 3`}, - {"cpu.stat", `nr_periods 10 -nr_throttled 50 -throttled_time 1000000`}, // 0.001 seconds - {"memory.stat", `cache 235000000 -rss 234000000 -inactive_anon 200000000 -active_anon 34000000 -inactive_file 100000000 -active_file 135000000`}, - }, - expectedStats: Stats{ - ParentStats: CgroupStats{ - CPUThrottledCount: 50, - CPUThrottledDuration: 0.001, - MemoryUsage: 1234000000, - MemoryLimit: 2000000000, - OOMKills: 3, - UnderOOM: true, - Anon: 234000000, - ActiveAnon: 34000000, - InactiveAnon: 200000000, - File: 235000000, - ActiveFile: 135000000, - InactiveFile: 100000000, - }, - }, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - mock := newMock(t) - - config := defaultCgroupsConfig() - config.Repositories.Count = 1 - config.Repositories.MemoryBytes = 2000000000 - config.Repositories.CPUShares = 16 - config.Mountpoint = mock.root - - v1Manager := mock.newCgroupManager(config, testhelper.SharedLogger(t), 1) - - mock.setupMockCgroupFiles(t, v1Manager, []uint{}, tc.mockFiles...) - require.NoError(t, v1Manager.Setup()) - - stats, err := v1Manager.Stats() - require.NoError(t, err) - require.Equal(t, tc.expectedStats, stats) - }) - } -} - -func requireShardsV1(t *testing.T, mock *mockCgroup, mgr *CGroupManager, pid int, expectedShards ...uint) { - t.Helper() - - for shard := uint(0); shard < mgr.cfg.Repositories.Count; shard++ { - for _, s := range mock.subsystems { - cgroupPath := filepath.Join("gitaly", - fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", shard)) - diskPath := filepath.Join(mock.root, string(s.Name()), cgroupPath) - - if slices.Contains(expectedShards, shard) { - require.DirExists(t, diskPath) - - cgLock := mgr.status.getLock(cgroupPath) - require.True(t, cgLock.isCreated()) - } else { - require.NoDirExists(t, diskPath) - - // Confirm we pre-populated this map entry. - _, lockInserted := mgr.status.m[cgroupPath] - require.True(t, lockInserted) - } - } - } -} - -func requireCgroup(t *testing.T, cgroupFile string, want int) { - t.Helper() - - if want <= 0 { - // If files doesn't exist kernel will create it with default values - require.NoFileExistsf(t, cgroupFile, "cgroup file should not exist: %q", cgroupFile) - return - } - - require.Equal(t, - string(readCgroupFile(t, cgroupFile)), - strconv.Itoa(want), - ) -} - -func readCgroupFile(t *testing.T, path string) []byte { - t.Helper() - - // The cgroups package defaults to permission 0 as it expects the file to be existing (the kernel creates the file) - // and its testing override the permission private variable to something sensible, hence we have to chmod ourselves - // so we can read the file. - require.NoError(t, os.Chmod(path, perm.PublicFile)) - - return testhelper.MustReadFile(t, path) -} diff --git a/internal/cgroups/v2_linux.go b/internal/cgroups/v2_linux.go deleted file mode 100644 index 396cf104fdc8764f19b207f903f35bf5a3fb6905..0000000000000000000000000000000000000000 --- a/internal/cgroups/v2_linux.go +++ /dev/null @@ -1,220 +0,0 @@ -//go:build linux - -package cgroups - -import ( - "errors" - "fmt" - "io/fs" - "path/filepath" - "strings" - "time" - - "github.com/containerd/cgroups/v3/cgroup2" - "github.com/opencontainers/runtime-spec/specs-go" - "github.com/prometheus/client_golang/prometheus" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" - cgroupscfg "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/cgroups" - "gitlab.com/gitlab-org/gitaly/v16/internal/kernel" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" -) - -type cgroupV2Handler struct { - cfg cgroupscfg.Config - logger log.Logger - - *cgroupsMetrics - pid int - cloneIntoCgroup bool -} - -func newV2Handler(cfg cgroupscfg.Config, logger log.Logger, pid int) *cgroupV2Handler { - cloneIntoCgroup, err := kernel.IsAtLeast(kernel.Version{Major: 5, Minor: 7}) - if err != nil { - // Log the error for now as we're only rolling out functionality behind feature flag. - logger.WithError(err).Error("failed detecting kernel version, CLONE_INTO_CGROUP support disabled") - } - - return &cgroupV2Handler{ - cfg: cfg, - logger: logger, - pid: pid, - cgroupsMetrics: newV2CgroupsMetrics(), - cloneIntoCgroup: cloneIntoCgroup, - } -} - -func (cvh *cgroupV2Handler) setupParent(parentResources *specs.LinuxResources) error { - if _, err := cgroup2.NewManager(cvh.cfg.Mountpoint, "/"+cvh.currentProcessCgroup(), cgroup2.ToResources(parentResources)); err != nil { - return fmt.Errorf("failed creating parent cgroup: %w", err) - } - - return nil -} - -func (cvh *cgroupV2Handler) createCgroup(reposResources *specs.LinuxResources, cgroupPath string) error { - _, err := cgroup2.NewManager( - cvh.cfg.Mountpoint, - "/"+cgroupPath, - cgroup2.ToResources(reposResources), - ) - - return err -} - -func (cvh *cgroupV2Handler) addToCgroup(pid int, cgroupPath string) error { - control, err := cvh.loadCgroup(cgroupPath) - if err != nil { - return err - } - - if err := control.AddProc(uint64(pid)); err != nil { - // Command could finish so quickly before we can add it to a cgroup, so - // we don't consider it an error. - if strings.Contains(err.Error(), "no such process") { - return nil - } - return fmt.Errorf("failed adding process to cgroup: %w", err) - } - - return nil -} - -func (cvh *cgroupV2Handler) loadCgroup(cgroupPath string) (*cgroup2.Manager, error) { - control, err := cgroup2.Load("/"+cgroupPath, cgroup2.WithMountpoint(cvh.cfg.Mountpoint)) - if err != nil { - return nil, fmt.Errorf("failed loading %s cgroup: %w", cgroupPath, err) - } - return control, nil -} - -func (cvh *cgroupV2Handler) collect(repoPath string, ch chan<- prometheus.Metric) { - logger := cvh.logger.WithField("cgroup_path", repoPath) - control, err := cvh.loadCgroup(repoPath) - if err != nil { - logger.WithError(err).Warn("unable to load cgroup controller") - return - } - - if metrics, err := control.Stat(); err != nil { - logger.WithError(err).Warn("unable to get cgroup stats") - } else { - cpuUserMetric := cvh.cpuUsage.WithLabelValues(repoPath, "user") - cpuUserMetric.Set(float64(metrics.CPU.UserUsec)) - ch <- cpuUserMetric - - ch <- prometheus.MustNewConstMetric( - cvh.cpuCFSPeriods, - prometheus.CounterValue, - float64(metrics.CPU.NrPeriods), - repoPath, - ) - - ch <- prometheus.MustNewConstMetric( - cvh.cpuCFSThrottledPeriods, - prometheus.CounterValue, - float64(metrics.CPU.NrThrottled), - repoPath, - ) - - ch <- prometheus.MustNewConstMetric( - cvh.cpuCFSThrottledTime, - prometheus.CounterValue, - float64(metrics.CPU.ThrottledUsec)/float64(time.Second), - repoPath, - ) - - cpuKernelMetric := cvh.cpuUsage.WithLabelValues(repoPath, "kernel") - cpuKernelMetric.Set(float64(metrics.CPU.SystemUsec)) - ch <- cpuKernelMetric - } - - if subsystems, err := control.Controllers(); err != nil { - logger.WithError(err).Warn("unable to get cgroup hierarchy") - } else { - processes, err := control.Procs(true) - if err != nil { - logger.WithError(err). - Warn("unable to get process list") - return - } - - for _, subsystem := range subsystems { - procsMetric := cvh.procs.WithLabelValues(repoPath, subsystem) - procsMetric.Set(float64(len(processes))) - ch <- procsMetric - } - } -} - -func (cvh *cgroupV2Handler) cleanup() error { - processCgroupPath := cvh.currentProcessCgroup() - - control, err := cvh.loadCgroup(processCgroupPath) - if err != nil { - return err - } - - if err := control.Delete(); err != nil { - return fmt.Errorf("failed cleaning up cgroup %s: %w", processCgroupPath, err) - } - - return nil -} - -func (cvh *cgroupV2Handler) repoPath(groupID int) string { - return filepath.Join(cvh.currentProcessCgroup(), fmt.Sprintf("repos-%d", groupID)) -} - -func (cvh *cgroupV2Handler) currentProcessCgroup() string { - return config.GetGitalyProcessTempDir(cvh.cfg.HierarchyRoot, cvh.pid) -} - -func (cvh *cgroupV2Handler) stats() (Stats, error) { - processCgroupPath := cvh.currentProcessCgroup() - - control, err := cvh.loadCgroup(processCgroupPath) - if err != nil { - return Stats{}, err - } - - metrics, err := control.Stat() - if err != nil { - return Stats{}, fmt.Errorf("failed to fetch metrics %s: %w", processCgroupPath, err) - } - - stats := Stats{ - ParentStats: CgroupStats{ - CPUThrottledCount: metrics.CPU.NrThrottled, - CPUThrottledDuration: float64(metrics.CPU.ThrottledUsec) / float64(time.Second), - MemoryUsage: metrics.Memory.Usage, - MemoryLimit: metrics.Memory.UsageLimit, - Anon: metrics.Memory.Anon, - ActiveAnon: metrics.Memory.ActiveAnon, - InactiveAnon: metrics.Memory.InactiveAnon, - File: metrics.Memory.File, - ActiveFile: metrics.Memory.ActiveFile, - InactiveFile: metrics.Memory.InactiveFile, - }, - } - if metrics.MemoryEvents != nil { - stats.ParentStats.OOMKills = metrics.MemoryEvents.OomKill - } - return stats, nil -} - -func (cvh *cgroupV2Handler) supportsCloneIntoCgroup() bool { - return cvh.cloneIntoCgroup -} - -func pruneOldCgroupsV2(cfg cgroupscfg.Config, logger log.Logger) { - if err := config.PruneOldGitalyProcessDirectories( - logger, - filepath.Join(cfg.Mountpoint, cfg.HierarchyRoot), - ); err != nil { - var pathError *fs.PathError - if !errors.As(err, &pathError) { - logger.WithError(err).Error("failed to clean up cpu cgroups") - } - } -} diff --git a/internal/cgroups/v2_linux_test.go b/internal/cgroups/v2_linux_test.go deleted file mode 100644 index e6a788b4eadc12165e7acc9d7fcb9ede0ef20e7b..0000000000000000000000000000000000000000 --- a/internal/cgroups/v2_linux_test.go +++ /dev/null @@ -1,689 +0,0 @@ -//go:build linux - -package cgroups - -import ( - "fmt" - "io/fs" - "os" - "os/exec" - "path/filepath" - "strconv" - "strings" - "testing" - - cgrps "github.com/containerd/cgroups/v3" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/cgroups" - "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" - "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" - "golang.org/x/exp/slices" -) - -func defaultCgroupsV2Config() cgroups.Config { - return cgroups.Config{ - HierarchyRoot: "gitaly", - Repositories: cgroups.Repositories{ - Count: 3, - MemoryBytes: 1024000, - CPUShares: 256, - CPUQuotaUs: 2000, - }, - } -} - -func TestNewManagerV2(t *testing.T) { - cfg := cgroups.Config{Repositories: cgroups.Repositories{Count: 10}} - - manager := newCgroupManagerWithMode(cfg, testhelper.SharedLogger(t), 1, cgrps.Unified) - require.IsType(t, &cgroupV2Handler{}, manager.handler) -} - -func TestSetup_ParentCgroupsV2(t *testing.T) { - tests := []struct { - name string - cfg cgroups.Config - wantMemoryBytes int - wantCPUWeight int - wantCPUMax string - }{ - { - name: "all config specified", - cfg: cgroups.Config{ - MemoryBytes: 102400, - CPUShares: 256, - CPUQuotaUs: 2000, - }, - wantMemoryBytes: 102400, - wantCPUWeight: 256, - wantCPUMax: "2000 100000", - }, - { - name: "only memory limit set", - cfg: cgroups.Config{ - MemoryBytes: 102400, - }, - wantMemoryBytes: 102400, - }, - { - name: "only cpu shares set", - cfg: cgroups.Config{ - CPUShares: 512, - }, - wantCPUWeight: 512, - }, - { - name: "only cpu quota set", - cfg: cgroups.Config{ - CPUQuotaUs: 2000, - }, - wantCPUMax: "2000 100000", - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - mock := newMockV2(t) - - pid := 1 - tt.cfg.HierarchyRoot = "gitaly" - tt.cfg.Mountpoint = mock.root - - v2Manager := mock.newCgroupManager(tt.cfg, testhelper.SharedLogger(t), pid) - mock.setupMockCgroupFiles(t, v2Manager, []uint{}) - - require.False(t, v2Manager.Ready()) - require.NoError(t, v2Manager.Setup()) - require.True(t, v2Manager.Ready()) - - memoryMaxPath := filepath.Join( - mock.root, "gitaly", fmt.Sprintf("gitaly-%d", pid), "memory.max", - ) - requireCgroupWithInt(t, memoryMaxPath, tt.wantMemoryBytes) - - cpuWeightPath := filepath.Join( - mock.root, "gitaly", fmt.Sprintf("gitaly-%d", pid), "cpu.weight", - ) - requireCgroupWithInt(t, cpuWeightPath, calculateWantCPUWeight(tt.wantCPUWeight)) - - cpuMaxPath := filepath.Join( - mock.root, "gitaly", fmt.Sprintf("gitaly-%d", pid), "cpu.max", - ) - requireCgroupWithString(t, cpuMaxPath, tt.wantCPUMax) - }) - } -} - -func TestSetup_RepoCgroupsV2(t *testing.T) { - tests := []struct { - name string - cfg cgroups.Repositories - wantMemoryBytes int - wantCPUWeight int - wantCPUMax string - }{ - { - name: "all config specified", - cfg: defaultCgroupsV2Config().Repositories, - wantMemoryBytes: 1024000, - wantCPUWeight: 256, - wantCPUMax: "2000 100000", - }, - { - name: "only memory limit set", - cfg: cgroups.Repositories{ - Count: 3, - MemoryBytes: 1024000, - }, - wantMemoryBytes: 1024000, - }, - { - name: "only cpu shares set", - cfg: cgroups.Repositories{ - Count: 3, - CPUShares: 512, - }, - wantCPUWeight: 512, - }, - { - name: "only cpu quota set", - cfg: cgroups.Repositories{ - Count: 3, - CPUQuotaUs: 1000, - }, - wantCPUMax: "1000 100000", - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - mock := newMockV2(t) - - pid := 1 - - cfg := defaultCgroupsV2Config() - cfg.Mountpoint = mock.root - cfg.Repositories = tt.cfg - - groupID := calcGroupID(cmdArgs, cfg.Repositories.Count) - - v2Manager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) - - // Validate no shards have been created. We deliberately do not call - // `setupMockCgroupFiles()` here to confirm that the cgroup controller - // is creating repository directories in the correct location. - requireShardsV2(t, mock, v2Manager, pid) - - mock.setupMockCgroupFiles(t, v2Manager, []uint{groupID}) - - require.False(t, v2Manager.Ready()) - require.NoError(t, v2Manager.Setup()) - require.True(t, v2Manager.Ready()) - - ctx := testhelper.Context(t) - - // Create a command to force Gitaly to create the repo cgroup. - cmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...) - require.NoError(t, cmd.Run()) - _, err := v2Manager.AddCommand(cmd) - require.NoError(t, err) - - requireShardsV2(t, mock, v2Manager, pid, groupID) - - for i := 0; i < 3; i++ { - cgroupExists := uint(i) == groupID - - memoryMaxPath := filepath.Join( - mock.root, "gitaly", fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", i), "memory.max", - ) - - if cgroupExists { - requireCgroupWithInt(t, memoryMaxPath, tt.wantMemoryBytes) - } else { - require.NoFileExists(t, memoryMaxPath) - } - - cpuWeightPath := filepath.Join( - mock.root, "gitaly", fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", i), "cpu.weight", - ) - - if cgroupExists { - requireCgroupWithInt(t, cpuWeightPath, calculateWantCPUWeight(tt.wantCPUWeight)) - } else { - require.NoFileExists(t, cpuWeightPath) - } - - cpuMaxPath := filepath.Join( - mock.root, "gitaly", fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", i), "cpu.max", - ) - - if cgroupExists { - requireCgroupWithString(t, cpuMaxPath, tt.wantCPUMax) - } else { - require.NoFileExists(t, cpuMaxPath) - } - } - }) - } -} - -func TestAddCommandV2(t *testing.T) { - mock := newMockV2(t) - - config := defaultCgroupsV2Config() - config.Repositories.Count = 10 - config.Repositories.MemoryBytes = 1024 - config.Repositories.CPUShares = 16 - config.Mountpoint = mock.root - - pid := 1 - groupID := calcGroupID(cmdArgs, config.Repositories.Count) - - v2Manager1 := mock.newCgroupManager(config, testhelper.SharedLogger(t), pid) - mock.setupMockCgroupFiles(t, v2Manager1, []uint{}) - - require.NoError(t, v2Manager1.Setup()) - ctx := testhelper.Context(t) - - cmd2 := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...) - require.NoError(t, cmd2.Run()) - - v2Manager2 := mock.newCgroupManager(config, testhelper.SharedLogger(t), pid) - - t.Run("without overridden key", func(t *testing.T) { - groupID := calcGroupID(cmd2.Args, config.Repositories.Count) - - _, err := v2Manager2.AddCommand(cmd2) - require.NoError(t, err) - requireShardsV2(t, mock, v2Manager2, pid, groupID) - - path := filepath.Join(mock.root, "gitaly", - fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", groupID), "cgroup.procs") - content := readCgroupFile(t, path) - - cmdPid, err := strconv.Atoi(string(content)) - require.NoError(t, err) - - require.Equal(t, cmd2.Process.Pid, cmdPid) - }) - - t.Run("with overridden key", func(t *testing.T) { - overriddenGroupID := calcGroupID([]string{"foobar"}, config.Repositories.Count) - - _, err := v2Manager2.AddCommand(cmd2, WithCgroupKey("foobar")) - require.NoError(t, err) - requireShardsV2(t, mock, v2Manager2, pid, groupID, overriddenGroupID) - - path := filepath.Join(mock.root, "gitaly", - fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", overriddenGroupID), "cgroup.procs") - content := readCgroupFile(t, path) - - cmdPid, err := strconv.Atoi(string(content)) - require.NoError(t, err) - - require.Equal(t, cmd2.Process.Pid, cmdPid) - }) -} - -func TestCleanupV2(t *testing.T) { - mock := newMockV2(t) - - pid := 1 - cfg := defaultCgroupsV2Config() - cfg.Mountpoint = mock.root - - v2Manager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) - mock.setupMockCgroupFiles(t, v2Manager, []uint{0, 1, 2}) - - require.NoError(t, v2Manager.Setup()) - require.NoError(t, v2Manager.Cleanup()) - - for i := 0; i < 3; i++ { - require.NoDirExists(t, filepath.Join(mock.root, "gitaly", fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", i))) - } -} - -func TestMetricsV2(t *testing.T) { - tests := []struct { - name string - metricsEnabled bool - pid int - expect string - }{ - { - name: "metrics enabled: true", - metricsEnabled: true, - pid: 1, - expect: `# HELP gitaly_cgroup_cpu_cfs_periods_total Number of elapsed enforcement period intervals -# TYPE gitaly_cgroup_cpu_cfs_periods_total counter -gitaly_cgroup_cpu_cfs_periods_total{path="%s"} 10 -# HELP gitaly_cgroup_cpu_cfs_throttled_periods_total Number of throttled period intervals -# TYPE gitaly_cgroup_cpu_cfs_throttled_periods_total counter -gitaly_cgroup_cpu_cfs_throttled_periods_total{path="%s"} 20 -# HELP gitaly_cgroup_cpu_cfs_throttled_seconds_total Total time duration the Cgroup has been throttled -# TYPE gitaly_cgroup_cpu_cfs_throttled_seconds_total counter -gitaly_cgroup_cpu_cfs_throttled_seconds_total{path="%s"} 0.001 -# HELP gitaly_cgroup_cpu_usage_total CPU Usage of Cgroup -# TYPE gitaly_cgroup_cpu_usage_total gauge -gitaly_cgroup_cpu_usage_total{path="%s",type="kernel"} 0 -gitaly_cgroup_cpu_usage_total{path="%s",type="user"} 0 -# HELP gitaly_cgroup_procs_total Total number of procs -# TYPE gitaly_cgroup_procs_total gauge -gitaly_cgroup_procs_total{path="%s",subsystem="cpu"} 1 -gitaly_cgroup_procs_total{path="%s",subsystem="cpuset"} 1 -gitaly_cgroup_procs_total{path="%s",subsystem="memory"} 1 -`, - }, - { - name: "metrics enabled: false", - metricsEnabled: false, - pid: 2, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - mock := newMockV2(t) - - config := defaultCgroupsV2Config() - config.Repositories.Count = 1 - config.Repositories.MemoryBytes = 1048576 - config.Repositories.CPUShares = 16 - config.Mountpoint = mock.root - config.MetricsEnabled = tt.metricsEnabled - - groupID := calcGroupID(cmdArgs, config.Repositories.Count) - v2Manager1 := mock.newCgroupManager(config, testhelper.SharedLogger(t), tt.pid) - - mock.setupMockCgroupFiles(t, v2Manager1, []uint{groupID}) - require.NoError(t, v2Manager1.Setup()) - - ctx := testhelper.Context(t) - - cmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...) - require.NoError(t, cmd.Start()) - _, err := v2Manager1.AddCommand(cmd) - require.NoError(t, err) - - gitCmd1 := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...) - require.NoError(t, gitCmd1.Start()) - _, err = v2Manager1.AddCommand(gitCmd1) - require.NoError(t, err) - - gitCmd2 := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...) - require.NoError(t, gitCmd2.Start()) - _, err = v2Manager1.AddCommand(gitCmd2) - require.NoError(t, err) - - requireShardsV2(t, mock, v2Manager1, tt.pid, groupID) - - defer func() { - require.NoError(t, gitCmd2.Wait()) - }() - - require.NoError(t, cmd.Wait()) - require.NoError(t, gitCmd1.Wait()) - - repoCgroupPath := filepath.Join(v2Manager1.currentProcessCgroup(), "repos-0") - - expected := strings.NewReader(strings.ReplaceAll(tt.expect, "%s", repoCgroupPath)) - - assert.NoError(t, testutil.CollectAndCompare(v2Manager1, expected)) - }) - } -} - -func TestPruneOldCgroupsV2(t *testing.T) { - t.Parallel() - - testCases := []struct { - desc string - cfg cgroups.Config - expectedPruned bool - // setup returns a pid - setup func(*testing.T, cgroups.Config, *mockCgroupV2) int - }{ - { - desc: "process belongs to another user", - cfg: cgroups.Config{ - HierarchyRoot: "gitaly", - Repositories: cgroups.Repositories{ - Count: 10, - MemoryBytes: 10 * 1024 * 1024, - CPUShares: 1024, - }, - }, - setup: func(t *testing.T, cfg cgroups.Config, mock *mockCgroupV2) int { - pid := 1 - - cgroupManager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) - mock.setupMockCgroupFiles(t, cgroupManager, []uint{0, 1, 2}) - require.NoError(t, cgroupManager.Setup()) - - return pid - }, - expectedPruned: true, - }, - { - desc: "no hierarchy root", - cfg: cgroups.Config{ - HierarchyRoot: "", - Repositories: cgroups.Repositories{ - Count: 10, - MemoryBytes: 10 * 1024 * 1024, - CPUShares: 1024, - }, - }, - setup: func(t *testing.T, cfg cgroups.Config, mock *mockCgroupV2) int { - pid := 1 - - cgroupManager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) - mock.setupMockCgroupFiles(t, cgroupManager, []uint{0, 1, 2}) - require.NoError(t, cgroupManager.Setup()) - return 1 - }, - expectedPruned: false, - }, - { - desc: "pid of finished process", - cfg: cgroups.Config{ - HierarchyRoot: "gitaly", - Repositories: cgroups.Repositories{ - Count: 10, - MemoryBytes: 10 * 1024 * 1024, - CPUShares: 1024, - }, - }, - setup: func(t *testing.T, cfg cgroups.Config, mock *mockCgroupV2) int { - cmd := exec.Command("ls") - require.NoError(t, cmd.Run()) - pid := cmd.Process.Pid - - cgroupManager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) - mock.setupMockCgroupFiles(t, cgroupManager, []uint{0, 1, 2}) - require.NoError(t, cgroupManager.Setup()) - - memoryFile := filepath.Join( - cfg.Mountpoint, - cfg.HierarchyRoot, - "memory.limit_in_bytes", - ) - require.NoError(t, os.WriteFile(memoryFile, []byte{}, fs.ModeAppend)) - - return pid - }, - expectedPruned: true, - }, - { - desc: "pid of running process", - cfg: cgroups.Config{ - HierarchyRoot: "gitaly", - Repositories: cgroups.Repositories{ - Count: 10, - MemoryBytes: 10 * 1024 * 1024, - CPUShares: 1024, - }, - }, - setup: func(t *testing.T, cfg cgroups.Config, mock *mockCgroupV2) int { - pid := os.Getpid() - - cgroupManager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), pid) - mock.setupMockCgroupFiles(t, cgroupManager, []uint{0, 1, 2}) - require.NoError(t, cgroupManager.Setup()) - - return pid - }, - expectedPruned: false, - }, - { - desc: "gitaly-0 directory is deleted", - cfg: cgroups.Config{ - HierarchyRoot: "gitaly", - Repositories: cgroups.Repositories{ - Count: 10, - MemoryBytes: 10 * 1024 * 1024, - CPUShares: 1024, - }, - }, - setup: func(t *testing.T, cfg cgroups.Config, mock *mockCgroupV2) int { - cgroupManager := mock.newCgroupManager(cfg, testhelper.SharedLogger(t), 0) - mock.setupMockCgroupFiles(t, cgroupManager, []uint{0, 1, 2}) - require.NoError(t, cgroupManager.Setup()) - - return 0 - }, - expectedPruned: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - mock := newMockV2(t) - tc.cfg.Mountpoint = mock.root - - root := filepath.Join( - tc.cfg.Mountpoint, - tc.cfg.HierarchyRoot, - ) - require.NoError(t, os.MkdirAll(root, perm.PublicDir)) - - pid := tc.setup(t, tc.cfg, mock) - - logger := testhelper.NewLogger(t) - mock.pruneOldCgroups(tc.cfg, logger) - - // create cgroups directories with a different pid - oldGitalyProcessDir := filepath.Join( - root, - fmt.Sprintf("gitaly-%d", pid), - ) - - if tc.expectedPruned { - require.NoDirExists(t, oldGitalyProcessDir) - } else { - require.DirExists(t, oldGitalyProcessDir) - } - }) - } -} - -func TestStatsV2(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - desc string - mockFiles []mockCgroupFile - expectedStats Stats - }{ - { - desc: "empty statistics", - mockFiles: []mockCgroupFile{ - {"memory.current", "0"}, - {"memory.max", "0"}, - {"cpu.stat", ""}, - }, - expectedStats: Stats{}, - }, - { - desc: "cgroupfs recorded some stats", - mockFiles: []mockCgroupFile{ - {"memory.max", "2000000000"}, - {"memory.current", "1234000000"}, - {"memory.events", `low 1 -high 2 -max 3 -oom 4 -oom_kill 5`}, - {"nr_throttled", "50"}, - {"throttled_usec", "1000000"}, - {"cpu.stat", `nr_periods 10 -nr_throttled 50 -throttled_usec 1000000`}, // 0.001 seconds - {"memory.stat", `anon 234000000 -file 235000000 -inactive_anon 200000000 -active_anon 34000000 -inactive_file 100000000 -active_file 135000000`}, - }, - expectedStats: Stats{ - ParentStats: CgroupStats{ - CPUThrottledCount: 50, - CPUThrottledDuration: 0.001, - MemoryUsage: 1234000000, - MemoryLimit: 2000000000, - OOMKills: 5, - Anon: 234000000, - ActiveAnon: 34000000, - InactiveAnon: 200000000, - File: 235000000, - ActiveFile: 135000000, - InactiveFile: 100000000, - }, - }, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - mock := newMockV2(t) - - config := defaultCgroupsConfig() - config.Repositories.Count = 1 - config.Repositories.MemoryBytes = 2000000000 - config.Repositories.CPUShares = 16 - config.Mountpoint = mock.root - - v2Manager := mock.newCgroupManager(config, testhelper.SharedLogger(t), 1) - - mock.setupMockCgroupFiles(t, v2Manager, []uint{0}, tc.mockFiles...) - require.NoError(t, v2Manager.Setup()) - - stats, err := v2Manager.Stats() - require.NoError(t, err) - require.Equal(t, tc.expectedStats, stats) - }) - } -} - -func calculateWantCPUWeight(wantCPUWeight int) int { - if wantCPUWeight == 0 { - return 0 - } - return 1 + ((wantCPUWeight-2)*9999)/262142 -} - -func requireShardsV2(t *testing.T, mock *mockCgroupV2, mgr *CGroupManager, pid int, expectedShards ...uint) { - t.Helper() - - for shard := uint(0); shard < mgr.cfg.Repositories.Count; shard++ { - cgroupPath := filepath.Join("gitaly", fmt.Sprintf("gitaly-%d", pid), fmt.Sprintf("repos-%d", shard)) - diskPath := filepath.Join(mock.root, cgroupPath) - - if slices.Contains(expectedShards, shard) { - require.DirExists(t, diskPath) - - cgLock := mgr.status.getLock(cgroupPath) - require.True(t, cgLock.isCreated()) - } else { - require.NoDirExists(t, diskPath) - - // Confirm we pre-populated this map entry. - _, lockInserted := mgr.status.m[cgroupPath] - require.True(t, lockInserted) - } - } -} - -func requireCgroupWithString(t *testing.T, cgroupFile string, want string) { - t.Helper() - - if want == "" { - return - } - require.Equal(t, - string(readCgroupFile(t, cgroupFile)), - want, - ) -} - -func requireCgroupWithInt(t *testing.T, cgroupFile string, want int) { - t.Helper() - - if want <= 0 { - return - } - - require.Equal(t, - string(readCgroupFile(t, cgroupFile)), - strconv.Itoa(want), - ) -}