From 5619808c45ba14958a20eb2446c973e8f0a97ee6 Mon Sep 17 00:00:00 2001
From: Mustafa Bayar
Date: Thu, 6 Nov 2025 13:05:22 +0100
Subject: [PATCH] partition: Partition ID migration

Currently, partitions are addressed using uint64 IDs. When RAFT replicas
are used, each replica keeps an internal ID plus a string-based key,
constructed from storageName + partitionID, that routes to the correct
internal partition ID. This complicates certain flows, where we would
need to be aware of the RAFT status to refer to the correct ID format.
Instead, moving to the composite ID format by default eliminates the
need for a separate routing table for RAFT.

For this refactoring, we need two things:
1. Changing all references to the old uint64 format to strings
2. Adding a migration to convert the existing structure and DB keys

In this commit, we first introduce the migration without the actual
partition ID refactoring. Doing so ensures backward compatibility:

Phase 1 (Current): Deploy migration code in disabled state
- Introduces forward/backward migration logic without activating it
- Establishes rollback safety net

Phase 2 (Next MR): Enable migration to transform partition ID structure
- Introduce the actual partition ID refactoring
- Set flag to true to activate the new partition ID structure
- Migration runs automatically on deployment

Rollback Safety: If issues arise after Phase 2, reverting to Phase 1
will trigger the backward migration to restore the original structure.
---
 internal/cli/gitaly/serve.go                  |  90 +-
 .../partition/partition_id_migration.go       | 850 ++++++++++++++++++
 .../partition/partition_id_migration_test.go  | 559 ++++++++++++
 3 files changed, 1498 insertions(+), 1 deletion(-)
 create mode 100644 internal/gitaly/storage/storagemgr/partition/partition_id_migration.go
 create mode 100644 internal/gitaly/storage/storagemgr/partition/partition_id_migration_test.go

diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 0531be28fa..70b238bb52 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -497,7 +497,7 @@ func run(appCtx *cli.Command, cfg config.Cfg, logger log.Logger) error {
 		node = raftNode
 	}
 
-	if err := replicaPartitionMigration(cfg, locator, dbMgr, logger, assignmentWorkerErrCh); err != nil {
+	if err := partitionIDMigration(cfg, locator, dbMgr, logger, assignmentWorkerErrCh); err != nil {
 		return err
 	}
 
@@ -868,3 +868,91 @@ func replicaPartitionMigration(cfg config.Cfg, locator storage.Locator, dbMgr *d
 	}
 	return nil
 }
+
+// partitionIDMigration performs the partition ID migration for all storages.
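+// As an illustrative sketch (using the example IDs from the migrator's own
+// documentation, not real data): the forward migration moves a partition
+// stored under <partitions>/59/94/12345 to
+// <partitions>/a8/42/a842e11d8535f4506d1c65a940b19ecb5ade7ec76c48b6f407b8644c987fcf5c,
+// where the new ID is sha256(storage.GetRaftPartitionName(storageName, "12345")),
+// and rewrites the matching Badger keys and partition assignments to use it.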
+func partitionIDMigration(cfg config.Cfg, locator storage.Locator, dbMgr *databasemgr.DBManager, logger log.Logger, assignmentWorkerErrCh <-chan error) error {
+	for _, storageCfg := range cfg.Storages {
+		partitionsDir, err := locator.PartitionsDir(storageCfg.Name)
+		if err != nil {
+			return fmt.Errorf("retrieving partitions dir for storage %s: %w", storageCfg.Name, err)
+		}
+
+		db, err := dbMgr.GetDB(storageCfg.Name)
+		if err != nil {
+			return fmt.Errorf("getting db: %w", err)
+		}
+
+		migrator, err := partition.NewIDMigrator(logger, partitionsDir, storageCfg.Name, db)
+		if err != nil {
+			return fmt.Errorf("creating partition ID migrator: %w", err)
+		}
+
+		migrated, err := migrator.CheckMigrationStatus()
+		if err != nil {
+			return fmt.Errorf("partition ID migrator status check: %w", err)
+		}
+
+		var shouldMigrate bool
+		var migrate func() error
+		var actionDesc, completionDesc string
+
+		// Unlike the RAFT migration, we don't need this to be controlled by config,
+		// because this is a structural change transparent to users.
+		//
+		// It is initially hardcoded to false for a controlled rollout.
+		// This two-phase deployment strategy ensures backward compatibility:
+		//
+		// Phase 1 (Current): Deploy migration code in disabled state
+		// - Introduces forward/backward migration logic without activating it
+		// - Establishes rollback safety net
+		//
+		// Phase 2 (Next MR): Enable migration to transform partition ID structure
+		// - Set flag to true to activate the new partition ID structure
+		// - Migration runs automatically on deployment
+		//
+		// Rollback Safety: If issues arise after Phase 2, reverting to Phase 1 code
+		// will trigger the backward migration to restore the original partition structure.
+		partitionMigrationEnabled := false
+		if partitionMigrationEnabled {
+			shouldMigrate = !migrated
+			migrate = migrator.Forward
+			actionDesc = "starting partition ID migration"
+			completionDesc = "completed partition ID migration"
+		} else {
+			shouldMigrate = migrated
+			migrate = migrator.Backward
+			actionDesc = "starting partition ID migration rollback"
+			completionDesc = "completed rollback of partition ID migration"
+		}
+
+		if shouldMigrate {
+			// Block and wait for the assignment worker to complete before migrating
+			if err := <-assignmentWorkerErrCh; err != nil {
+				return fmt.Errorf("partition assignment worker: %w", err)
+			}
+
+			logger.Info(fmt.Sprintf("%s for storage: %s", actionDesc, storageCfg.Name))
+			startTime := time.Now()
+
+			if err := migrate(); err != nil {
+				if partitionMigrationEnabled {
+					return fmt.Errorf("performing partition ID migration: %w", err)
+				}
+				return fmt.Errorf("undoing partition ID migration: %w", err)
+			}
+
+			logger.Info(fmt.Sprintf("%s for storage %s in %v", completionDesc, storageCfg.Name, time.Since(startTime)))
+		}
+
+		if !partitionMigrationEnabled && !shouldMigrate {
+			// The partition ID migration is neither enabled nor was it previously run (no rollback required).
+			// Fall back to replicaPartitionMigration so as not to block RAFT testing.
+			// Once we enable partitionIDMigration, we can phase out replicaPartitionMigration entirely,
+			// as partitionIDMigration encompasses the essential functions of replicaPartitionMigration.
+			if err := replicaPartitionMigration(cfg, locator, dbMgr, logger, assignmentWorkerErrCh); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
diff --git a/internal/gitaly/storage/storagemgr/partition/partition_id_migration.go b/internal/gitaly/storage/storagemgr/partition/partition_id_migration.go
new file mode 100644
index 0000000000..be3f6a4b43
--- /dev/null
+++ b/internal/gitaly/storage/storagemgr/partition/partition_id_migration.go
@@ -0,0 +1,850 @@
+package partition
+
+import (
+	"bytes"
+	"context"
+	"crypto/sha256"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"regexp"
+	"strconv"
+	"strings"
+
+	"github.com/dgraph-io/badger/v4"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/keyvalue"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/log"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/safe"
+)
+
+var (
+	// db key to track the partition ID format migration
+	partitionIDFormatMigrationKey = []byte("partition_id_format_migration_status")
+	// db key prefix to track hash ID to uint64 ID mappings
+	prefixIDMigrationMapping = []byte("id_migration_hash_mapping/")
+	// matches xx/yy/hash (the new path produced by pathToMigrate - a full SHA256 hash, 64 hex chars)
+	hashPartitionPattern = regexp.MustCompile(`^([a-z0-9]{2})/([a-z0-9]{2})/([a-z0-9]{64})$`)
+)
+
+// migrationEntry contains the values to apply for a migration
+type migrationEntry struct {
+	oldKey []byte
+	newKey []byte
+	value  []byte
+}
+
+// IDMigrator handles migrations between partition structures and partition ID formats
+type IDMigrator struct {
+	logger        log.Logger
+	storageName   string
+	partitionsDir string
+	db            keyvalue.Store
+}
+
+// NewIDMigrator creates a new partition ID migrator instance
+func NewIDMigrator(logger log.Logger, absoluteStateDir, storageName string, db keyvalue.Store) (*IDMigrator, error) {
+	partitionsDir, err := getPartitionsDir(absoluteStateDir)
+	if err != nil {
+		return nil, fmt.Errorf("determining partitions directory: %w", err)
+	}
+
+	return &IDMigrator{
+		logger:        logger,
+		storageName:   storageName,
+		partitionsDir: partitionsDir,
+		db:            db,
+	}, nil
+}
+
+// Forward migrates from the legacy to the new partition structure and partition ID format
+func (m *IDMigrator) Forward() error {
+	// First, migrate the partition structure and DB keys to the new partition ID format
+	if err := m.partitionIDMigration(); err != nil {
+		if backwardErr := m.Backward(); backwardErr != nil {
+			return fmt.Errorf("partition ID migration failed: %w, and reversion also failed: %w", err, backwardErr)
+		}
+		return fmt.Errorf("partition ID migration: %w", err)
+	}
+
+	if err := cleanupOldPartitionStructure(m.partitionsDir); err != nil {
+		return fmt.Errorf("cleanup legacy partition structure: %w", err)
+	}
+	m.logger.Info("ID Migration: cleaned up the legacy disk paths for the partitions")
+
+	if err := m.updateMigrationInDB(); err != nil {
+		return fmt.Errorf("update migration status: %w", err)
+	}
+
+	return nil
+}
+
+// Backward handles the reverse migration to restore the legacy structure
+// from the new one.
+// Note: This assumes that the new structure is correctly set up and working.
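+// It is also safe to re-run: hardlinks that already exist in the legacy
+// structure are skipped, and the hash-based directories, the migration
+// status key, and the stored hash mappings are all removed.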
+func (m *IDMigrator) Backward() error {
+	// First, undo partition structure migration
+	if err := m.undoPartitionIDMigration(); err != nil {
+		return fmt.Errorf("undoing partition restructure: %w", err)
+	}
+
+	if err := m.cleanupHashBasedPartitionStructure(); err != nil {
+		return fmt.Errorf("cleanup hash-based partition structure: %w", err)
+	}
+	m.logger.Info("ID Migration: cleaned up the new disk paths for the partitions")
+
+	if err := m.deleteMigrationInDB(); err != nil {
+		return fmt.Errorf("delete migration status: %w", err)
+	}
+
+	// Delete the hash mapping from the database
+	if err := m.deleteHashMapping(); err != nil {
+		return fmt.Errorf("delete hash mapping: %w", err)
+	}
+
+	return nil
+}
+
+// BEFORE MIGRATION:
+//
+// ── partitions
+//    ├── 59                                 # First two chars of hash(partitionID)
+//    │   └── 94                             # Next two chars of hash(partitionID)
+//    │       └── 12345                      # Numeric partitionID
+//    │           └── wal                    # Write-ahead log directory
+//    │               ├── 0000000000000001   # Log sequence number
+//    │               │   ├── MANIFEST
+//    │               │   └── RAFT
+//    │               └── 0000000000000002
+//    │                   ├── MANIFEST
+//    │                   └── RAFT
+//
+// AFTER MIGRATION:
+//
+// ── partitions
+//    ├── 59
+//    │   └── 94
+//    │       └── 12345
+//    │           └── wal
+//    │               ├── 0000000000000001
+//    │               │   ├── MANIFEST
+//    │               │   └── RAFT
+//    │               └── 0000000000000002
+//    │                   ├── MANIFEST
+//    │                   └── RAFT
+//    └── a8
+//        └── 42
+//            └── a842e11d8535f4506d1c65a940b19ecb5ade7ec76c48b6f407b8644c987fcf5c
+//                └── wal
+//                    ├── 0000000000000001
+//                    │   ├── MANIFEST
+//                    │   └── RAFT
+//                    └── 0000000000000002
+//                        ├── MANIFEST
+//                        └── RAFT
+//
+// partitionIDMigration restructures partitions from the legacy directory structure
+// to the new partition ID format. It also updates the badger DB keys.
+func (m *IDMigrator) partitionIDMigration() error {
+	// Track all directories that need to be synced
+	dirsToSync := make(map[string]struct{})
+	// Track the hash to partition ID mapping for the undo operation, as we cannot
+	// reverse the hash to figure out the original partition ID
+	hashToPartitionID := make(map[string]string)
+
+	err := filepath.Walk(m.partitionsDir, func(path string, info fs.FileInfo, err error) error {
+		if err != nil {
+			if os.IsNotExist(err) {
+				return nil
+			}
+			return err
+		}
+
+		// Skip the base path itself
+		if path == m.partitionsDir {
+			return nil
+		}
+
+		// Get the relative path from the partitions directory
+		relPath, err := filepath.Rel(m.partitionsDir, path)
+		if err != nil {
+			return err
+		}
+
+		matches := pathPattern.FindStringSubmatch(relPath)
+		if len(matches) == 0 {
+			// Path doesn't match our pattern, skip it
+			return nil
+		}
+		// It matched; the third capture group is the partitionID
+		partitionID := matches[3]
+		newPartitionID, newWalDir := pathToMigrate(m.storageName, m.partitionsDir, partitionID)
+
+		// Store the mapping for the undo operation
+		hashToPartitionID[newPartitionID] = partitionID
+
+		// Add dir to be synced
+		dirsToSync[newWalDir] = struct{}{}
+
+		// For files and directories beyond the /wal level,
+		// get the components after /wal by removing the matched prefix
+		subPath := strings.TrimPrefix(relPath, matches[0])
+		// Remove the leading separator if present
+		subPath = strings.TrimPrefix(subPath, string(os.PathSeparator))
+		newPath := filepath.Join(newWalDir, subPath)
+		if info.IsDir() {
+			if err := os.MkdirAll(newPath, info.Mode().Perm()); err != nil {
+				return fmt.Errorf("failed to create directory %s: %w", newPath, err)
+			}
+		} else if info.Mode().IsRegular() {
+			if err := os.Link(path, newPath); err != nil {
+				return fmt.Errorf("failed to hardlink file from %s to %s: %w", path, newPath, err)
+			}
+		}
+
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	// Store the hash to partition ID mapping in the DB for the undo operation
+	if err := m.storeHashMapping(hashToPartitionID); err != nil {
+		return fmt.Errorf("store hash mapping: %w", err)
+	}
+
+	syncer := safe.NewSyncer()
+	for dir := range dirsToSync {
+		if err := syncer.SyncRecursive(context.Background(), dir); err != nil {
+			return fmt.Errorf("syncing new replica structure: %w", err)
+		}
+	}
+
+	m.logger.Info(fmt.Sprintf("ID Migration: migrated disk paths for %d partitions", len(hashToPartitionID)))
+
+	// Migrate badger DB keys from uint64 to string format
+	if err := m.migrateBadgerDBKeys(); err != nil {
+		return fmt.Errorf("migrate badger DB keys: %w", err)
+	}
+
+	return nil
+}
+
+// BEFORE MIGRATION:
+//
+// ── partitions
+//    └── a8
+//        └── 42
+//            └── a842e11d8535f4506d1c65a940b19ecb5ade7ec76c48b6f407b8644c987fcf5c
+//                └── wal
+//                    └── 0000000000000001
+//                        ├── MANIFEST
+//                        └── RAFT
+//
+// AFTER MIGRATION:
+//
+// ── partitions
+//    ├── 59
+//    │   └── 94
+//    │       └── 12345
+//    │           └── wal
+//    │               └── 0000000000000001
+//    │                   ├── MANIFEST
+//    │                   └── RAFT
+//    └── a8
+//        └── 42
+//            └── a842e11d8535f4506d1c65a940b19ecb5ade7ec76c48b6f407b8644c987fcf5c
+//                └── wal
+//                    └── 0000000000000001
+//                        ├── MANIFEST
+//                        └── RAFT
+//
+// undoPartitionIDMigration reverses the partition migration by creating hardlinks
+// from the new structure back to the legacy structure. This is the opposite of
+// partitionIDMigration.
+func (m *IDMigrator) undoPartitionIDMigration() error {
+	// Track directories that need to be synced
+	dirsToSync := make(map[string]struct{})
+
+	// Load the hash to partition ID mapping from the database
+	hashToPartitionID, err := m.loadHashMapping()
+	if err != nil {
+		return fmt.Errorf("load hash mapping: %w", err)
+	}
+
+	err = filepath.Walk(m.partitionsDir, func(path string, info fs.FileInfo, err error) error {
+		if err != nil {
+			if os.IsNotExist(err) {
+				// The path disappeared while walking; there is nothing to
+				// relink, and info may be nil here, so skip it.
+				return nil
+			}
+			return err
+		}
+
+		// Skip the base path itself
+		if path == m.partitionsDir {
+			return nil
+		}
+
+		// Get the relative path from partitionsDir
+		relPath, err := filepath.Rel(m.partitionsDir, path)
+		if err != nil {
+			return err
+		}
+
+		// Check if this is a directory in the new hash-based structure
+		matches := hashPartitionPattern.FindStringSubmatch(relPath)
+		if len(matches) == 0 {
+			return nil // Skip if not matching the expected pattern
+		}
+
+		// Extract the hash from the matches
+		hash := matches[3]
+
+		// Look up the partition ID from our map
+		partitionID, ok := hashToPartitionID[hash]
+		if !ok {
+			// If we can't find it in the map, skip this directory
+			return nil
+		}
+
+		oldPartition := storage.ComputePartition(partitionID)
+		oldWalPath := filepath.Join(m.partitionsDir, oldPartition)
+
+		// Add the old WAL path to the directories to sync
+		dirsToSync[oldWalPath] = struct{}{}
+
+		// Use filepath.Walk again to process all subdirectories and files in the WAL directory
+		return filepath.Walk(path, func(subPath string, subInfo fs.FileInfo, err error) error {
+			if err != nil {
+				return err
+			}
+
+			// Skip the WAL directory itself; its legacy counterpart is created
+			// through the MkdirAll calls below
+			if subPath == path {
+				return nil
+			}
+
+			// Get the relative path from the new WAL directory
+			relSubPath, err := filepath.Rel(path, subPath)
+			if err != nil {
+				return fmt.Errorf("failed to get relative path for %s: %w", subPath, err)
+			}
+
+			// Create the corresponding path in the old structure
+			oldSubPath := filepath.Join(oldWalPath, relSubPath)
+
+			if subInfo.IsDir() {
+				// Create directory with same permissions
+				if err := os.MkdirAll(oldSubPath, subInfo.Mode().Perm()); err != nil {
+					return fmt.Errorf("failed to create directory %s: %w", oldSubPath, err)
+				}
+			} else if subInfo.Mode().IsRegular() {
+				// Check if the file already exists (could be a hardlink from the forward migration)
+				if _, statErr := os.Stat(oldSubPath); statErr == nil {
+					// File already exists, skip creating the hardlink
+					return nil
+				} else if !os.IsNotExist(statErr) {
+					return fmt.Errorf("failed to stat %s: %w", oldSubPath, statErr)
+				}
+
+				// Create a hardlink for the file
+				if err := os.Link(subPath, oldSubPath); err != nil {
+					return fmt.Errorf("failed to create hardlink from %s to %s: %w", subPath, oldSubPath, err)
+				}
+			}
+
+			return nil
+		})
+	})
+	if err != nil {
+		return err
+	}
+
+	// Sync all directories at once after all files have been created
+	syncer := safe.NewSyncer()
+	for dir := range dirsToSync {
+		if err := syncer.SyncRecursive(context.Background(), dir); err != nil {
+			return fmt.Errorf("syncing legacy partition structure: %w", err)
+		}
+	}
+
+	m.logger.Info("ID Migration: rolled back partition disk path migration")
+
+	// Undo the badger DB key migration
+	if err := m.undoBadgerDBKeyMigration(hashToPartitionID); err != nil {
+		return fmt.Errorf("undo badger DB key migration: %w", err)
+	}
+
+	m.logger.Info("ID Migration: rolled back partition KV pair migration")
+
+	return nil
+}
+
+func (m *IDMigrator) updateMigrationInDB() error {
+	return m.db.Update(func(txn keyvalue.ReadWriter) error {
+		// Just set any value; the presence of the key is sufficient
+		if err := txn.Set(partitionIDFormatMigrationKey, []byte(nil)); err != nil {
+			return fmt.Errorf("set entry: %w", err)
+		}
+
+		return nil
+	})
+}
+
+// CheckMigrationStatus reports whether the entire migration has completed
+func (m *IDMigrator) CheckMigrationStatus() (bool, error) {
+	var migrated bool
+	err := m.db.View(func(txn keyvalue.ReadWriter) error {
+		_, err := txn.Get(partitionIDFormatMigrationKey)
+		if err != nil {
+			if errors.Is(err, badger.ErrKeyNotFound) {
+				migrated = false
+				return nil
+			}
+			return fmt.Errorf("get: %w", err)
+		}
+		migrated = true
+		return nil
+	})
+
+	return migrated, err
+}
+
+func (m *IDMigrator) deleteMigrationInDB() error {
+	return m.db.Update(func(txn keyvalue.ReadWriter) error {
+		if err := txn.Delete(partitionIDFormatMigrationKey); err != nil {
+			return fmt.Errorf("delete partition ID format migration key: %w", err)
+		}
+
+		return nil
+	})
+}
+
+// migrateBadgerDBKeys migrates all badger DB keys from uint64 to string format
+func (m *IDMigrator) migrateBadgerDBKeys() error {
+	return m.db.Update(func(txn keyvalue.ReadWriter) error {
+		// Create iterator to scan all partition-related keys
+		iter := txn.NewIterator(keyvalue.IteratorOptions{
+			Prefix: []byte(storagemgr.PrefixPartition),
+		})
+		defer iter.Close()
+
+		var keysToMigrate []migrationEntry
+
+		for iter.Rewind(); iter.Valid(); iter.Next() {
+			item := iter.Item()
+			oldKey := item.Key()
+
+			splitOldKey := bytes.SplitN(oldKey, []byte("/"), 3)
+			if len(splitOldKey) < 3 {
+				continue // Skip malformed keys
+			}
+
+			partitionIDBytes := splitOldKey[1]
+			// Only migrate if the partition ID is exactly 8 bytes (legacy uint64 format)
+			if len(partitionIDBytes) != 8 {
+				continue // Already migrated or not in legacy format
+			}
+
+			keySuffix := splitOldKey[2] // Everything after the 8-byte partition ID
+
+			// Parse as uint64 (legacy format)
+			val := binary.BigEndian.Uint64(partitionIDBytes)
+			partitionID := strconv.FormatUint(val, 10)
+
+			// Get the value
+			value, err := item.ValueCopy(nil)
+			if err != nil {
+				return fmt.Errorf("copy value for key %q: %w", oldKey, err)
+			}
+
+			keysToMigrate = append(keysToMigrate, migrationEntry{
+				oldKey: append([]byte(nil), oldKey...),
+				newKey: createHashedPartitionKey(m.storageName, partitionID, keySuffix),
+				value:  value,
+			})
+		}
+
+		// Migrate the keys using batch writer
+		batch := m.db.NewWriteBatch()
+		defer batch.Cancel()
+
+		for _, km := range keysToMigrate {
+			// Set the new key
+			if err := batch.Set(km.newKey, km.value); err != nil {
+				return fmt.Errorf("set new key %q: %w", km.newKey, err)
+			}
+
+			// Delete the old key
+			if err := batch.Delete(km.oldKey); err != nil {
+				return fmt.Errorf("delete old key %q: %w", km.oldKey, err)
+			}
+		}
+
+		// Also handle partition assignments
+		assignmentIter := txn.NewIterator(keyvalue.IteratorOptions{
+			Prefix: []byte("partition_assignment/"),
+		})
+		defer assignmentIter.Close()
+
+		var assignmentsToUpdate []struct {
+			key   []byte
+			value []byte
+		}
+
+		for assignmentIter.Rewind(); assignmentIter.Valid(); assignmentIter.Next() {
+			item := assignmentIter.Item()
+			key := item.Key()
+
+			oldValue, err := item.ValueCopy(nil)
+			if err != nil {
+				return fmt.Errorf("copy assignment value for key %q: %w", key, err)
+			}
+
+			// If the value is 8 bytes, it is the legacy uint64 format
+			if len(oldValue) == 8 {
+				val := binary.BigEndian.Uint64(oldValue)
+				newPartitionID := getHashedPartitionID(m.storageName, strconv.FormatUint(val, 10))
+				newValue := []byte(newPartitionID)
+
+				assignmentsToUpdate = append(assignmentsToUpdate, struct {
+					key   []byte
+					value []byte
+				}{
+					key:   append([]byte(nil), key...),
+					value: newValue,
+				})
+			}
+		}
+
+		for _, assignment := range assignmentsToUpdate {
+			if err := batch.Set(assignment.key, assignment.value); err != nil {
+				return fmt.Errorf("update assignment value for key %q: %w", assignment.key, err)
+			}
+		}
+
+		if err := batch.Flush(); err != nil {
+			return fmt.Errorf("flush migration batch: %w", err)
+		}
+
+		m.logger.Info(fmt.Sprintf("ID Migration: migrated %d items (%d keys, %d assignments)",
+			len(keysToMigrate)+len(assignmentsToUpdate),
+			len(keysToMigrate),
+			len(assignmentsToUpdate)))
+
+		return nil
+	})
+}
+
+// undoBadgerDBKeyMigration reverses the badger DB key migration from hash back to uint64 format
+func (m *IDMigrator) undoBadgerDBKeyMigration(hashToPartitionID map[string]string) error {
+	return m.db.Update(func(txn keyvalue.ReadWriter) error {
+		// Create iterator to scan all partition-related keys
+		iter := txn.NewIterator(keyvalue.IteratorOptions{
+			Prefix: []byte(storagemgr.PrefixPartition),
+		})
+		defer iter.Close()
+
+		// Collect keys to migrate (can't modify while iterating)
+		var keysToMigrate []migrationEntry
+
+		for iter.Rewind(); iter.Valid(); iter.Next() {
+			item := iter.Item()
+			oldKey := item.Key()
+
+			splitOldKey := bytes.SplitN(oldKey, []byte("/"), 3)
+			if len(splitOldKey) < 3 {
+				continue // Skip malformed keys
+			}
+
+			hashBytes := splitOldKey[1]
+			// Check if this is already in the legacy format (8 bytes).
+			// This can happen if the undo migration runs successfully,
+			// but fails at the cleanup stage.
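+			// Keys have the shape <PrefixPartition><partition ID>/<suffix>: the
+			// legacy ID segment is exactly 8 big-endian bytes, whereas the hashed
+			// format uses a 64-character hex digest (see createLegacyPartitionKey
+			// and createHashedPartitionKey).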
+			if len(hashBytes) == 8 {
+				continue
+			}
+
+			keySuffix := splitOldKey[2]
+
+			// Look up the partition ID from the hash
+			partitionID, ok := hashToPartitionID[string(hashBytes)]
+			if !ok {
+				return fmt.Errorf("missing ID mapping for %s", hashBytes)
+			}
+
+			partitionKey, err := createLegacyPartitionKey(partitionID, keySuffix)
+			if err != nil {
+				return fmt.Errorf("create legacy partition key: %w", err)
+			}
+
+			// Get the value
+			value, err := item.ValueCopy(nil)
+			if err != nil {
+				return fmt.Errorf("copy value for key %q: %w", oldKey, err)
+			}
+
+			keysToMigrate = append(keysToMigrate, migrationEntry{
+				oldKey: append([]byte(nil), oldKey...),
+				newKey: partitionKey,
+				value:  value,
+			})
+		}
+
+		// Migrate the keys using batch writer
+		batch := m.db.NewWriteBatch()
+		defer batch.Cancel()
+
+		for _, km := range keysToMigrate {
+			// Set the new key
+			if err := batch.Set(km.newKey, km.value); err != nil {
+				return fmt.Errorf("set legacy key %q: %w", km.newKey, err)
+			}
+
+			// Delete the old key
+			if err := batch.Delete(km.oldKey); err != nil {
+				return fmt.Errorf("delete new key %q: %w", km.oldKey, err)
+			}
+		}
+
+		// Reverse migrate partition assignment values from string back to uint64 format
+		assignmentIter := txn.NewIterator(keyvalue.IteratorOptions{
+			Prefix: []byte("partition_assignment/"),
+		})
+		defer assignmentIter.Close()
+
+		var assignmentsToUpdate []struct {
+			key   []byte
+			value []byte
+		}
+
+		for assignmentIter.Rewind(); assignmentIter.Valid(); assignmentIter.Next() {
+			item := assignmentIter.Item()
+			key := item.Key()
+
+			oldValue, err := item.ValueCopy(nil)
+			if err != nil {
+				return fmt.Errorf("copy assignment value for key %q: %w", key, err)
+			}
+
+			// The value is a hash (64 hex chars)
+			hash := string(oldValue)
+			if len(hash) != 64 {
+				continue // Skip non-hash values
+			}
+
+			// Look up the partition ID from the hash
+			partitionID, ok := hashToPartitionID[hash]
+			if !ok {
+				return fmt.Errorf("missing hash mapping for %s during rollback", hash)
+			}
+
+			// Convert partition ID to uint64
+			val, parseErr := strconv.ParseUint(partitionID, 10, 64)
+			if parseErr != nil {
+				continue // Skip if partition ID is not a valid uint64
+			}
+
+			// Convert back to uint64 binary format
+			marshaled := make([]byte, 8)
+			binary.BigEndian.PutUint64(marshaled, val)
+
+			assignmentsToUpdate = append(assignmentsToUpdate, struct {
+				key   []byte
+				value []byte
+			}{
+				key:   append([]byte(nil), key...),
+				value: marshaled,
+			})
+		}
+
+		for _, assignment := range assignmentsToUpdate {
+			if err := batch.Set(assignment.key, assignment.value); err != nil {
+				return fmt.Errorf("update assignment value for key %q: %w", assignment.key, err)
+			}
+		}
+
+		if err := batch.Flush(); err != nil {
+			return fmt.Errorf("flush batch: %w", err)
+		}
+
+		return nil
+	})
+}
+
+// storeHashMapping stores the hash to partition ID mapping in the database
+func (m *IDMigrator) storeHashMapping(hashToPartitionID map[string]string) error {
+	batch := m.db.NewWriteBatch()
+	defer batch.Cancel()
+
+	for hash, partitionID := range hashToPartitionID {
+		key := append(prefixIDMigrationMapping, []byte(hash)...)
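+		// Each mapping is stored as id_migration_hash_mapping/<64-hex-hash>
+		// with the original decimal partition ID as the value, so Backward
+		// can recover the IDs without having to reverse the hash.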
+		if err := batch.Set(key, []byte(partitionID)); err != nil {
+			return fmt.Errorf("store hash mapping for %s: %w", hash, err)
+		}
+	}
+
+	if err := batch.Flush(); err != nil {
+		return fmt.Errorf("flush batch: %w", err)
+	}
+
+	return nil
+}
+
+// loadHashMapping loads the hash to partition ID mapping from the database
+func (m *IDMigrator) loadHashMapping() (map[string]string, error) {
+	hashToPartitionID := make(map[string]string)
+	err := m.db.View(func(txn keyvalue.ReadWriter) error {
+		iter := txn.NewIterator(keyvalue.IteratorOptions{
+			Prefix: prefixIDMigrationMapping,
+		})
+		defer iter.Close()
+
+		for iter.Rewind(); iter.Valid(); iter.Next() {
+			item := iter.Item()
+			key := item.Key()
+
+			hash := strings.TrimPrefix(string(key), string(prefixIDMigrationMapping))
+
+			value, err := item.ValueCopy(nil)
+			if err != nil {
+				return fmt.Errorf("copy value for hash %s: %w", hash, err)
+			}
+
+			hashToPartitionID[hash] = string(value)
+		}
+
+		return nil
+	})
+
+	return hashToPartitionID, err
+}
+
+// deleteHashMapping removes all hash mapping entries from the database
+func (m *IDMigrator) deleteHashMapping() error {
+	batch := m.db.NewWriteBatch()
+	defer batch.Cancel()
+
+	if err := m.db.View(func(txn keyvalue.ReadWriter) error {
+		iter := txn.NewIterator(keyvalue.IteratorOptions{
+			Prefix: prefixIDMigrationMapping,
+		})
+		defer iter.Close()
+
+		for iter.Rewind(); iter.Valid(); iter.Next() {
+			key := append([]byte(nil), iter.Item().Key()...)
+			if err := batch.Delete(key); err != nil {
+				return fmt.Errorf("delete hash mapping key %q: %w", key, err)
+			}
+		}
+
+		return nil
+	}); err != nil {
+		return fmt.Errorf("iterate migration mapping keys: %w", err)
+	}
+
+	if err := batch.Flush(); err != nil {
+		return fmt.Errorf("flush batch: %w", err)
+	}
+
+	return nil
+}
+
+// cleanupHashBasedPartitionStructure removes the hash-based partition structure after undoing the migration
+func (m *IDMigrator) cleanupHashBasedPartitionStructure() error {
+	// Walk through the new structure and remove directories that match the hash pattern
+	dirsToRemove := make(map[string]struct{})
+	syncer := safe.NewSyncer()
+
+	err := filepath.Walk(m.partitionsDir, func(path string, info fs.FileInfo, err error) error {
+		if err != nil {
+			if os.IsNotExist(err) {
+				return nil
+			}
+			return err
+		}
+
+		// Skip the base path itself
+		if path == m.partitionsDir {
+			return nil
+		}
+
+		// Only look at directories
+		if !info.IsDir() {
+			return nil
+		}
+
+		relPath, err := filepath.Rel(m.partitionsDir, path)
+		if err != nil {
+			return err
+		}
+
+		// Look for directories in the hash-based structure format: /xx/yy/hash
+		matches := hashPartitionPattern.FindStringSubmatch(relPath)
+		if len(matches) > 0 && relPath == matches[0] {
+			dirsToRemove[path] = struct{}{}
+
+			// Skip processing this directory's contents
+			return filepath.SkipDir
+		}
+
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	// Now remove all identified directories
+	for dir := range dirsToRemove {
+		if err := os.RemoveAll(dir); err != nil {
+			return fmt.Errorf("failed to remove hash-based directory structure %s: %w", dir, err)
+		}
+
+		// Sync immediate parent
+		if err := syncer.SyncParent(context.Background(), dir); err != nil {
+			return fmt.Errorf("syncing deleted files: %w", err)
+		}
+	}
+
+	return nil
+}
+
+func getHashedPartitionID(storageName, partitionID string) string {
+	targetPartitionName := storage.GetRaftPartitionName(storageName, partitionID)
+	return fmt.Sprintf("%x", sha256.Sum256([]byte(targetPartitionName)))
+}
+
+func pathToMigrate(storageName, partitionsBase, partitionID string) (newPartitionID string, absolutePath string) {
+	hash := getHashedPartitionID(storageName, partitionID)
+	hashedPath := fmt.Sprintf("%s/%s/%s", hash[:2], hash[2:4], hash)
+
+	return hash,
+		filepath.Join(
+			partitionsBase,
+			hashedPath,
+			"wal",
+		)
+}
+
+// createLegacyPartitionKey creates a partition key in the legacy uint64 format
+func createLegacyPartitionKey(partitionID string, keySuffix []byte) ([]byte, error) {
+	// Convert partition ID to uint64
+	val, parseErr := strconv.ParseUint(partitionID, 10, 64)
+	if parseErr != nil {
+		return nil, fmt.Errorf("partition ID %q is in the wrong format: %w", partitionID, parseErr)
+	}
+
+	marshaled := make([]byte, 8)
+	binary.BigEndian.PutUint64(marshaled, val)
+	return fmt.Appendf(nil, "%s%s/%s", storagemgr.PrefixPartition, marshaled, keySuffix), nil
+}
+
+// createHashedPartitionKey creates a partition key in the new hashed ID format
+func createHashedPartitionKey(storageName, partitionID string, keySuffix []byte) []byte {
+	hashedID := getHashedPartitionID(storageName, partitionID)
+	return fmt.Appendf(nil, "%s%s/%s", storagemgr.PrefixPartition, hashedID, keySuffix)
+}
diff --git a/internal/gitaly/storage/storagemgr/partition/partition_id_migration_test.go b/internal/gitaly/storage/storagemgr/partition/partition_id_migration_test.go
new file mode 100644
index 0000000000..3aa3eea8a3
--- /dev/null
+++ b/internal/gitaly/storage/storagemgr/partition/partition_id_migration_test.go
@@ -0,0 +1,559 @@
+package partition
+
+import (
+	"encoding/binary"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
+	"testing"
+
+	"github.com/dgraph-io/badger/v4"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/keyvalue"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/mode"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testcfg"
+)
+
+func TestPartitionIDMigrator_Forward(t *testing.T) {
+	t.Parallel()
+
+	t.Run("successful migration", func(t *testing.T) {
+		t.Parallel()
+		// Create a new temp directory for this test
+		tempDir := testhelper.TempDir(t)
+		partitionsDir := filepath.Join(tempDir, "partitions")
+		cfg := testcfg.Build(t, testcfg.WithStorages("testStorage"))
+
+		logger := testhelper.SharedLogger(t)
+		ctx := testhelper.Context(t)
+
+		// Create partitions directory
+		err := os.MkdirAll(filepath.Join(tempDir, "partitions"), mode.Directory)
+		require.NoError(t, err, "Failed to create partitions directory")
+
+		_, db := getTestDBManager(t, ctx, cfg, logger)
+
+		// Set up old format DB keys for both partitions
+		setupLegacyFormatDBKeys(t, db, "123")
+		setupLegacyFormatDBKeys(t, db, "456")
+
+		migrator, err := NewIDMigrator(logger, partitionsDir, cfg.Storages[0].Name, db)
+		require.NoError(t, err)
+
+		// Set up the old partition structure using the helper
+		setupDirectory(t, partitionsDir, map[string]bool{
+			"xx/yy/123/wal/0000000000000001/RAFT": false,
+			"zz/yy/456/wal/0000000000000001/RAFT": false,
+		})
+
+		// Run the migration
+		require.NoError(t, migrator.Forward())
+
+		// Verify DB keys were migrated for both partitions
+		verifyNewFormatDBKeys(t, db, cfg.Storages[0].Name, "123")
+		verifyNewFormatDBKeys(t, db, cfg.Storages[0].Name, "456")
+
+		// Verify the old structure is gone and the new structure exists
+		testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{
+			"/":      {Mode: mode.Directory},
+			"/xx":    {Mode: mode.Directory},
+			"/xx/yy": {Mode: mode.Directory}, // parent of partition 123 still remains
+			"/zz":    {Mode: mode.Directory},
+			"/zz/yy": {Mode: mode.Directory}, // parent of partition 456 still remains
+			"/43":    {Mode: mode.Directory},
+			"/43/02": {Mode: mode.Directory},
+			"/43/02/4302ef58917c73c82721690cbf90475f82c7863dbdf8c73fe012ce5aed77ef8c":                           {Mode: mode.Directory},
+			"/43/02/4302ef58917c73c82721690cbf90475f82c7863dbdf8c73fe012ce5aed77ef8c/wal":                       {Mode: mode.Directory},
+			"/43/02/4302ef58917c73c82721690cbf90475f82c7863dbdf8c73fe012ce5aed77ef8c/wal/0000000000000001":      {Mode: mode.Directory},
+			"/43/02/4302ef58917c73c82721690cbf90475f82c7863dbdf8c73fe012ce5aed77ef8c/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content},
+			"/7e":    {Mode: mode.Directory},
+			"/7e/8d": {Mode: mode.Directory},
+			"/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a":                           {Mode: mode.Directory},
+			"/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal":                       {Mode: mode.Directory},
+			"/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001":      {Mode: mode.Directory},
+			"/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content},
+		})
+
+		assertPartitionIDMigrationKeyExists(t, migrator.db)
+		migrated, err := migrator.CheckMigrationStatus()
+		require.NoError(t, err)
+		require.True(t, migrated)
+	})
+
+	t.Run("can be run multiple times", func(t *testing.T) {
+		t.Parallel()
+
+		// Create a new temp directory for this test
+		tempDir := testhelper.TempDir(t)
+		partitionsDir := filepath.Join(tempDir, "partitions")
+
+		cfg := testcfg.Build(t, testcfg.WithStorages("testStorage"))
+
+		logger := testhelper.SharedLogger(t)
+		ctx := testhelper.Context(t)
+
+		// Create partitions directory
+		err := os.MkdirAll(filepath.Join(tempDir, "partitions"), mode.Directory)
+		require.NoError(t, err, "Failed to create partitions directory")
+
+		_, db := getTestDBManager(t, ctx, cfg, logger)
+
+		setupLegacyFormatDBKeys(t, db, "123")
+
+		migrator, err := NewIDMigrator(logger, partitionsDir, cfg.Storages[0].Name, db)
+		require.NoError(t, err)
+
+		// Set up a structure with a read-only directory to cause a cleanup error
+		setupDirectory(t, partitionsDir, map[string]bool{
+			"xx/yy/123/wal/0000000000000001": true, // partition directory
+		})
+
+		// Make the parent directory read-only to cause an error during cleanup
+		legacyPartitionPath := filepath.Join(partitionsDir, "xx/yy")
+		err = os.Chmod(legacyPartitionPath, 0o500) // read-only
+		require.NoError(t, err)
+
+		// Run the migration - it should complete the migration but fail during cleanup
+		require.Error(t, migrator.Forward())
+
+		// Restore permissions for cleanup
+		assert.NoError(t, os.Chmod(legacyPartitionPath, mode.Directory))
+
+		// Since cleanup failed, the old structure still exists
+		testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{
+			"/":          {Mode: mode.Directory},
+			"/xx":        {Mode: mode.Directory},
+			"/xx/yy":     {Mode: mode.Directory},
+			"/xx/yy/123": {Mode: mode.Directory}, // expected not to be cleaned up
+			"/7e":        {Mode: mode.Directory},
+			"/7e/8d":     {Mode: mode.Directory},
+			"/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a":                      {Mode: mode.Directory},
+			"/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal":                  {Mode: mode.Directory},
+			"/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001": {Mode: mode.Directory},
+		})
+		assertPartitionIDMigrationKeyAbsent(t, migrator.db)
+		migrated, err := migrator.CheckMigrationStatus()
+		require.NoError(t, err)
+		require.False(t, migrated)
+		require.NoError(t, migrator.Forward())
+
+		// Verify DB keys were migrated
+		verifyNewFormatDBKeys(t, db, cfg.Storages[0].Name, "123")
+
+		// /xx/yy/123 got cleaned up
+		testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{
+			"/":      {Mode: mode.Directory},
+			"/xx":    {Mode: mode.Directory},
+			"/xx/yy": {Mode: mode.Directory},
+			"/7e":    {Mode: mode.Directory},
+			"/7e/8d": {Mode: mode.Directory},
+			"/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a":                      {Mode: mode.Directory},
+			"/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal":                  {Mode: mode.Directory},
+			"/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001": {Mode: mode.Directory},
+		})
+		assertPartitionIDMigrationKeyExists(t, migrator.db)
+		migrated, err = migrator.CheckMigrationStatus()
+		require.NoError(t, err)
+		require.True(t, migrated)
+	})
+
+	t.Run("rollback on BadgerDB migration failure", func(t *testing.T) {
+		t.Parallel()
+
+		// Create a new temp directory for this test
+		tempDir := testhelper.TempDir(t)
+		partitionsDir := filepath.Join(tempDir, "partitions")
+		cfg := testcfg.Build(t, testcfg.WithStorages("testStorage"))
+
+		logger := testhelper.SharedLogger(t)
+		ctx := testhelper.Context(t)
+
+		// Create partitions directory
+		err := os.MkdirAll(filepath.Join(tempDir, "partitions"), mode.Directory)
+		require.NoError(t, err, "Failed to create partitions directory")
+
+		_, db := getTestDBManager(t, ctx, cfg, logger)
+
+		// Set up old format DB keys
+		setupLegacyFormatDBKeys(t, db, "123")
+
+		// Set up the old partition structure
+		setupDirectory(t, partitionsDir, map[string]bool{
+			"a6/65/123/wal/0000000000000001/RAFT": false,
+		})
+
+		// Wrap the DB to inject a failure during the batch flush in migrateBadgerDBKeys.
+		// We skip the first flush, which comes from storeHashMapping.
+		mockDB := &mockDBWithFailure{
+			Store:            db,
+			failOnBatchFlush: true,
+			targetFlushCount: 2,
+		}
+		migrator, err := NewIDMigrator(logger, partitionsDir, cfg.Storages[0].Name, mockDB)
+		require.NoError(t, err)
+
+		// Run the migration - the filesystem migration will succeed but the BadgerDB migration will fail.
+		// This should trigger Backward() to roll back the filesystem changes.
+		err = migrator.Forward()
+		require.Error(t, err)
+		require.NotContains(t, err.Error(), "reversion also failed", "rollback should succeed")
+
+		// Verify that the filesystem was rolled back to the old structure
+		testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{
+			"/":      {Mode: mode.Directory},
+			"/7e":    {Mode: mode.Directory},
+			"/7e/8d": {Mode: mode.Directory},
+			"/a6":    {Mode: mode.Directory},
+			"/a6/65": {Mode: mode.Directory},
+			"/a6/65/123":     {Mode: mode.Directory},
+			"/a6/65/123/wal": {Mode: mode.Directory},
+			"/a6/65/123/wal/0000000000000001":      {Mode: mode.Directory},
+			"/a6/65/123/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content},
+		})
+
+		// Verify DB keys are still in legacy format
+		verifyLegacyFormatDBKeys(t, db, cfg.Storages[0].Name, "123")
+
+		// Verify migration status is not set
+		assertPartitionIDMigrationKeyAbsent(t, migrator.db)
+		migrated, err := migrator.CheckMigrationStatus()
+		require.NoError(t, err)
+		require.False(t, migrated)
+
+		// Trying again should succeed
+		require.NoError(t, migrator.Forward())
+
+		// Verify DB keys were migrated
+		verifyNewFormatDBKeys(t, db, cfg.Storages[0].Name, "123")
+
+		testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{
+			"/":      {Mode: mode.Directory},
+			"/a6":    {Mode: mode.Directory},
+			"/a6/65": {Mode: mode.Directory},
+			"/7e":    {Mode: mode.Directory},
+			"/7e/8d": {Mode: mode.Directory},
+			"/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a":                           {Mode: mode.Directory},
+			"/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal":                       {Mode: mode.Directory},
+			"/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001":      {Mode: mode.Directory},
+			"/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content},
+		})
+		assertPartitionIDMigrationKeyExists(t, migrator.db)
+		migrated, err = migrator.CheckMigrationStatus()
+		require.NoError(t, err)
+		require.True(t, migrated)
+	})
+}
+
+func TestPartitionIDMigrator_Backward(t *testing.T) {
+	t.Parallel()
+
+	t.Run("successful migration", func(t *testing.T) {
+		t.Parallel()
+		// Create a new temp directory for this test
+		tempDir := testhelper.TempDir(t)
+		partitionsDir := filepath.Join(tempDir, "partitions")
+		cfg := testcfg.Build(t, testcfg.WithStorages("testStorage"))
+		logger := testhelper.SharedLogger(t)
+		ctx := testhelper.Context(t)
+
+		// Create partitions directory
+		err := os.MkdirAll(filepath.Join(tempDir, "partitions"), mode.Directory)
+		require.NoError(t, err, "Failed to create partitions directory")
+
+		_, db := getTestDBManager(t, ctx, cfg, logger)
+
+		// Set up new format DB keys for both partitions
+		setupNewFormatDBKeys(t, db, cfg.Storages[0].Name, "123")
+		setupNewFormatDBKeys(t, db, cfg.Storages[0].Name, "456")
+
+		migrator, err := NewIDMigrator(logger, partitionsDir, cfg.Storages[0].Name, db)
+		require.NoError(t, err)
+		// Set up the new partition structure using the helper
+		setupDirectory(t, partitionsDir, map[string]bool{
+			"xx/yy/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001/RAFT": false,
+			"zz/yy/4302ef58917c73c82721690cbf90475f82c7863dbdf8c73fe012ce5aed77ef8c/wal/0000000000000001/RAFT": false,
+		})
+
+		// Run the migration
+		err = migrator.Backward()
+		require.NoError(t, err)
+
+		// Verify DB keys were reverted for both partitions
+		verifyLegacyFormatDBKeys(t, db, cfg.Storages[0].Name, "123")
+		verifyLegacyFormatDBKeys(t, db, cfg.Storages[0].Name, "456")
+
+		testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{
+			"/":      {Mode: mode.Directory},
+			"/xx":    {Mode: mode.Directory},
+			"/xx/yy": {Mode: mode.Directory},
+			"/zz":    {Mode: mode.Directory},
+			"/zz/yy": {Mode: mode.Directory},
+			"/b3":    {Mode: mode.Directory},
+			"/b3/a8": {Mode: mode.Directory},
+			"/b3/a8/456":     {Mode: mode.Directory},
+			"/b3/a8/456/wal": {Mode: mode.Directory},
+			"/b3/a8/456/wal/0000000000000001":      {Mode: mode.Directory},
+			"/b3/a8/456/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content},
+			"/a6":    {Mode: mode.Directory},
+			"/a6/65": {Mode: mode.Directory},
+			"/a6/65/123":     {Mode: mode.Directory},
+			"/a6/65/123/wal": {Mode: mode.Directory},
+			"/a6/65/123/wal/0000000000000001":      {Mode: mode.Directory},
+			"/a6/65/123/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content},
+		})
+		assertPartitionIDMigrationKeyAbsent(t, migrator.db)
+	})
+
+	t.Run("can be run multiple times", func(t *testing.T) {
+		t.Parallel()
+
+		// Create a new temp directory for this test
+		tempDir := testhelper.TempDir(t)
+		partitionsDir := filepath.Join(tempDir, "partitions")
+		cfg := testcfg.Build(t, testcfg.WithStorages("testStorage"))
+		logger := testhelper.SharedLogger(t)
+		ctx := testhelper.Context(t)
+
+		// Create partitions directory
+		err := os.MkdirAll(filepath.Join(tempDir, "partitions"), mode.Directory)
+		require.NoError(t, err, "Failed to create partitions directory")
+
+		_, db := getTestDBManager(t, ctx, cfg, logger)
+
+		// Set up new format DB keys
+		setupNewFormatDBKeys(t, db, cfg.Storages[0].Name, "123")
+
+		migrator, err := NewIDMigrator(logger, partitionsDir, cfg.Storages[0].Name, db)
+		require.NoError(t, err)
+		// Set up a structure with a read-only directory to cause a cleanup error
+		structure := map[string]bool{
+			"qq/yy/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001/RAFT": false, // file inside the partition's WAL directory
+		}
+
+		// Creates all the parent dirs in the path specified
+		setupDirectory(t, partitionsDir, structure)
+
+		// Make the parent directory read-only to cause an error during cleanup
+		newPartitionPath := filepath.Join(partitionsDir, "qq/yy")
+		require.NoError(t, os.Chmod(newPartitionPath, 0o500)) // read-only
+
+		// Run the migration - it should complete the migration but fail during cleanup
+		require.Error(t, migrator.Backward())
+
+		_, err = os.Stat(filepath.Join(partitionsDir, "qq/yy/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a"))
+		assert.NoError(t, err, "Dir should exist as it didn't get cleaned up")
+
+		// Restore permissions for cleanup
+		assert.NoError(t, os.Chmod(newPartitionPath, mode.Directory))
+		require.NoError(t, migrator.Backward())
+
+		// Verify DB keys were reverted
+		verifyLegacyFormatDBKeys(t, db, cfg.Storages[0].Name, "123")
+
+		testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{
+			"/":      {Mode: mode.Directory},
+			"/qq":    {Mode: mode.Directory},
+			"/qq/yy": {Mode: mode.Directory},
+			"/a6":    {Mode: mode.Directory},
+			"/a6/65": {Mode: mode.Directory},
+			"/a6/65/123":     {Mode: mode.Directory},
+			"/a6/65/123/wal": {Mode: mode.Directory},
+			"/a6/65/123/wal/0000000000000001":      {Mode: mode.Directory},
+			"/a6/65/123/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content},
+		})
+		assertPartitionIDMigrationKeyAbsent(t, migrator.db)
+	})
+}
+
+// assertPartitionIDMigrationKeyExists checks if the partition ID format migration key exists in the database
+func assertPartitionIDMigrationKeyExists(t *testing.T, db keyvalue.Store) {
+	require.NoError(t, db.View(func(txn keyvalue.ReadWriter) error {
+		item, err := txn.Get(partitionIDFormatMigrationKey)
+		require.NoError(t, err)
+		require.NotNil(t, item) // Ensure we got a valid item
+		return nil
+	}))
+}
+
+// assertPartitionIDMigrationKeyAbsent checks if the partition ID format migration key is absent from the database
+func assertPartitionIDMigrationKeyAbsent(t *testing.T, db keyvalue.Store) {
+	require.NoError(t, db.View(func(txn keyvalue.ReadWriter) error {
+		_, getErr := txn.Get(partitionIDFormatMigrationKey)
+		require.ErrorIs(t, getErr, badger.ErrKeyNotFound)
+		return nil
+	}))
+}
+
+// setupLegacyFormatDBKeys creates DB keys in the legacy uint64 format for testing
+func setupLegacyFormatDBKeys(t *testing.T, db keyvalue.Store, partitionID string) {
+	t.Helper()
+
+	require.NoError(t, db.Update(func(txn keyvalue.ReadWriter) error {
+		// Create a legacy format partition key (uint64)
+		legacyPartitionKey, err := createLegacyPartitionKey(partitionID, []byte("test_key"))
+		require.NoError(t, err)
+		require.NoError(t, txn.Set(legacyPartitionKey, []byte("test_value")))
+
+		val, err := strconv.ParseUint(partitionID, 10, 64)
+		require.NoError(t, err)
+		marshaled := make([]byte, 8)
+		binary.BigEndian.PutUint64(marshaled, val)
+
+		// Set a partition assignment key with a legacy format value (use a unique key per partition)
+		assignmentKey := []byte("partition_assignment/test_repo_" + partitionID)
+		require.NoError(t, txn.Set(assignmentKey, marshaled))
+
+		return nil
+	}))
+}
+
+// setupNewFormatDBKeys creates DB keys in the new hash format for testing
+func setupNewFormatDBKeys(t *testing.T, db keyvalue.Store, storageName, partitionID string) {
+	t.Helper()
+
+	require.NoError(t, db.Update(func(txn keyvalue.ReadWriter) error {
+		// Create a new format partition key (hash)
+		newPartitionKey := createHashedPartitionKey(storageName, partitionID, []byte("test_key"))
+		require.NoError(t, txn.Set(newPartitionKey, []byte("test_value")))
+
+		// Set a partition assignment key with a new format value (use a unique key per partition)
+		hashedPartitionID := getHashedPartitionID(storageName, partitionID)
+		assignmentKey := []byte("partition_assignment/test_repo_" + partitionID)
+		require.NoError(t, txn.Set(assignmentKey, []byte(hashedPartitionID)))
+
+		// Store the hash mapping for the undo operation
+		hashMappingKey := append(prefixIDMigrationMapping, []byte(hashedPartitionID)...)
+		require.NoError(t, txn.Set(hashMappingKey, []byte(partitionID)))
+
+		return nil
+	}))
+}
+
+// verifyNewFormatDBKeys verifies that DB keys have been migrated to the new hash format
+func verifyNewFormatDBKeys(t *testing.T, db keyvalue.Store, storageName, partitionID string) {
+	t.Helper()
+
+	require.NoError(t, db.View(func(txn keyvalue.ReadWriter) error {
+		// Verify new format partition key exists (hash-based)
+		newPartitionKey := createHashedPartitionKey(storageName, partitionID, []byte("test_key"))
+
+		item, err := txn.Get(newPartitionKey)
+		require.NoError(t, err, "new format partition key should exist")
+
+		value, err := item.ValueCopy(nil)
+		require.NoError(t, err)
+		require.Equal(t, "test_value", string(value))
+
+		// Verify legacy format partition key does not exist
+		legacyPartitionKey, err := createLegacyPartitionKey(partitionID, []byte("test_key"))
+		require.NoError(t, err)
+
+		_, err = txn.Get(legacyPartitionKey)
+		require.ErrorIs(t, err, badger.ErrKeyNotFound, "legacy format partition key should not exist")
+
+		// Verify partition assignment value is in new hash format (use unique key per partition)
+		assignmentKey := []byte("partition_assignment/test_repo_" + partitionID)
+		item, err = txn.Get(assignmentKey)
+		require.NoError(t, err)
+
+		hashedPartitionID := getHashedPartitionID(storageName, partitionID)
+		assignmentValue, err := item.ValueCopy(nil)
+		require.NoError(t, err)
+		require.Equal(t, hashedPartitionID, string(assignmentValue), "assignment value should be in new hash format")
+
+		// Verify hash mapping exists
+		hashMappingKey := append(prefixIDMigrationMapping, []byte(hashedPartitionID)...)
+		item, err = txn.Get(hashMappingKey)
+		require.NoError(t, err, "hash mapping should exist")
+
+		mappingValue, err := item.ValueCopy(nil)
+		require.NoError(t, err)
+		require.Equal(t, partitionID, string(mappingValue), "hash mapping should map to original partition ID")
+
+		return nil
+	}))
+}
+
+// verifyLegacyFormatDBKeys verifies that DB keys have been reverted to the legacy uint64 format
+func verifyLegacyFormatDBKeys(t *testing.T, db keyvalue.Store, storageName, partitionID string) {
+	t.Helper()
+
+	require.NoError(t, db.View(func(txn keyvalue.ReadWriter) error {
+		// Verify legacy format partition key exists
+		legacyPartitionKey, err := createLegacyPartitionKey(partitionID, []byte("test_key"))
+		require.NoError(t, err)
+
+		item, err := txn.Get(legacyPartitionKey)
+		require.NoError(t, err, "legacy format partition key should exist")
+
+		value, err := item.ValueCopy(nil)
+		require.NoError(t, err)
+		require.Equal(t, "test_value", string(value))
+
+		// Verify new format partition key does not exist (hash-based)
+		newPartitionKey := createHashedPartitionKey(storageName, partitionID, []byte("test_key"))
+
+		_, err = txn.Get(newPartitionKey)
+		require.ErrorIs(t, err, badger.ErrKeyNotFound, "new format partition key should not exist")
+
+		// Verify partition assignment value is in legacy format (use unique key per partition)
+		assignmentKey := []byte("partition_assignment/test_repo_" + partitionID)
+		item, err = txn.Get(assignmentKey)
+		require.NoError(t, err)
+
+		val, err := strconv.ParseUint(partitionID, 10, 64)
+		require.NoError(t, err)
+		marshaled := make([]byte, 8)
+		binary.BigEndian.PutUint64(marshaled, val)
+
+		assignmentValue, err := item.ValueCopy(nil)
+		require.NoError(t, err)
+		require.Equal(t, marshaled, assignmentValue, "assignment value should be in legacy uint64 format")
+
+		// Verify hash mapping does not exist
+		hashMappingKey := append(prefixIDMigrationMapping, []byte(getHashedPartitionID(storageName, partitionID))...)
+		_, err = txn.Get(hashMappingKey)
+		require.ErrorIs(t, err, badger.ErrKeyNotFound, "hash mapping should not exist after backward migration")
+
+		return nil
+	}))
+}
+
+// mockDBWithFailure wraps a real DB and injects failures at specific points
+type mockDBWithFailure struct {
+	keyvalue.Store
+	failOnBatchFlush bool
+	batchFlushCount  int
+	targetFlushCount int  // Fail on the Nth flush
+	hasFailed        bool // Track if we've already failed once
+}
+
+func (m *mockDBWithFailure) NewWriteBatch() keyvalue.WriteBatch {
+	realBatch := m.Store.NewWriteBatch()
+	return &mockBatchWithFailure{
+		WriteBatch:  realBatch,
+		failOnFlush: m.failOnBatchFlush,
+		parent:      m,
+	}
+}
+
+type mockBatchWithFailure struct {
+	keyvalue.WriteBatch
+	failOnFlush bool
+	parent      *mockDBWithFailure
+}
+
+func (m *mockBatchWithFailure) Flush() error {
+	if m.failOnFlush && !m.parent.hasFailed {
+		m.parent.batchFlushCount++
+		// Fail on the target flush (e.g., the second flush in migrateBadgerDBKeys)
+		if m.parent.batchFlushCount == m.parent.targetFlushCount {
+			m.parent.hasFailed = true
+			return fmt.Errorf("injected batch flush failure")
+		}
+	}
+	return m.WriteBatch.Flush()
+}
--
GitLab