diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 0531be28fa4fc4b51fd8adfa6dab70fbe332dc16..70b238bb527356aae5af161f204b0aa4b53b2bb0 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -497,7 +497,7 @@ func run(appCtx *cli.Command, cfg config.Cfg, logger log.Logger) error {
 		node = raftNode
 	}
 
-	if err := replicaPartitionMigration(cfg, locator, dbMgr, logger, assignmentWorkerErrCh); err != nil {
+	if err := partitionIDMigration(cfg, locator, dbMgr, logger, assignmentWorkerErrCh); err != nil {
 		return err
 	}
 
@@ -868,3 +868,91 @@ func replicaPartitionMigration(cfg config.Cfg, locator storage.Locator, dbMgr *d
 	}
 	return nil
 }
+
+// partitionIDMigration performs the partition ID migration for all storages.
+func partitionIDMigration(cfg config.Cfg, locator storage.Locator, dbMgr *databasemgr.DBManager, logger log.Logger, assignmentWorkerErrCh <-chan error) error {
+	for _, storageCfg := range cfg.Storages {
+		partitionsDir, err := locator.PartitionsDir(storageCfg.Name)
+		if err != nil {
+			return fmt.Errorf("retrieving partitions dir for storage %s: %w", storageCfg.Name, err)
+		}
+
+		db, err := dbMgr.GetDB(storageCfg.Name)
+		if err != nil {
+			return fmt.Errorf("getting db: %w", err)
+		}
+
+		migrator, err := partition.NewIDMigrator(logger, partitionsDir, storageCfg.Name, db)
+		if err != nil {
+			return fmt.Errorf("creating partition ID migrator: %w", err)
+		}
+
+		migrated, err := migrator.CheckMigrationStatus()
+		if err != nil {
+			return fmt.Errorf("partition ID migrator status check: %w", err)
+		}
+
+		var shouldMigrate bool
+		var migrate func() error
+		var actionDesc, completionDesc string
+
+		// Unlike the RAFT migration, we don't need this to be controlled by config,
+		// because this is a structural change that is transparent to users.
+		//
+		// It is initially hardcoded to false for a controlled rollout.
+		// This two-phase deployment strategy ensures backward compatibility:
+		//
+		// Phase 1 (Current): Deploy migration code in disabled state
+		// - Introduces forward/backward migration logic without activating it
+		// - Establishes a rollback safety net
+		//
+		// Phase 2 (Next MR): Enable migration to transform the partition ID structure
+		// - Set the flag to true to activate the new partition ID structure
+		// - Migration runs automatically on deployment
+		//
+		// Rollback safety: if issues arise after Phase 2, reverting to the Phase 1 code
+		// will trigger the backward migration to restore the original partition structure.
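+		//
+		// The resulting behavior for each flag/status combination below:
+		//
+		//	flag=false, not yet migrated: fall back to replicaPartitionMigration
+		//	flag=false, already migrated: run Backward() to roll back
+		//	flag=true,  not yet migrated: run Forward() to migrate
+		//	flag=true,  already migrated: no-op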
+		partitionMigrationEnabled := false
+		if partitionMigrationEnabled {
+			shouldMigrate = !migrated
+			migrate = migrator.Forward
+			actionDesc = "starting partition ID migration"
+			completionDesc = "completed partition ID migration"
+		} else {
+			shouldMigrate = migrated
+			migrate = migrator.Backward
+			actionDesc = "starting partition ID migration rollback"
+			completionDesc = "completed partition ID migration rollback"
+		}
+
+		if shouldMigrate {
+			// Block and wait for the assignment worker to complete before migrating
+			if err := <-assignmentWorkerErrCh; err != nil {
+				return fmt.Errorf("partition assignment worker: %w", err)
+			}
+
+			logger.Info(fmt.Sprintf("%s for storage: %s", actionDesc, storageCfg.Name))
+			startTime := time.Now()
+
+			if err := migrate(); err != nil {
+				if partitionMigrationEnabled {
+					return fmt.Errorf("performing partition ID migration: %w", err)
+				}
+				return fmt.Errorf("undoing partition ID migration: %w", err)
+			}
+
+			logger.Info(fmt.Sprintf("%s for storage %s in %v", completionDesc, storageCfg.Name, time.Since(startTime)))
+		}
+
+		if !partitionMigrationEnabled && !shouldMigrate {
+			// Partition ID migration is neither enabled nor previously run, so no rollback is required.
+			// Fall back to replicaPartitionMigration so RAFT testing is not blocked.
+			// Once we enable partitionIDMigration, we can phase out replicaPartitionMigration entirely,
+			// as partitionIDMigration encompasses the essential functions of replicaPartitionMigration.
+			if err := replicaPartitionMigration(cfg, locator, dbMgr, logger, assignmentWorkerErrCh); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
diff --git a/internal/gitaly/storage/storagemgr/partition/partition_id_migration.go b/internal/gitaly/storage/storagemgr/partition/partition_id_migration.go
new file mode 100644
index 0000000000000000000000000000000000000000..be3f6a4b4321b4beb52c51e2d2fae16953cac39d
--- /dev/null
+++ b/internal/gitaly/storage/storagemgr/partition/partition_id_migration.go
@@ -0,0 +1,850 @@
+package partition
+
+import (
+	"bytes"
+	"context"
+	"crypto/sha256"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"regexp"
+	"strconv"
+	"strings"
+
+	"github.com/dgraph-io/badger/v4"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/keyvalue"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/log"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/safe"
+)
+
+var (
+	// DB key used to track the partition ID format migration
+	partitionIDFormatMigrationKey = []byte("partition_id_format_migration_status")
+	// DB key prefix used to track hash ID to uint64 ID mappings
+	prefixIDMigrationMapping = []byte("id_migration_hash_mapping/")
+	// matches xx/yy/hash (the new-format path produced by pathToMigrate: a full SHA-256 hash, 64 hex chars)
+	hashPartitionPattern = regexp.MustCompile(`^([a-z0-9]{2})/([a-z0-9]{2})/([a-z0-9]{64})$`)
+)
+
+// migrationEntry contains the key/value data to apply for a single migration step
+type migrationEntry struct {
+	oldKey []byte
+	newKey []byte
+	value  []byte
+}
+
+// IDMigrator handles migrations between partition structures and partition ID formats
+type IDMigrator struct {
+	logger        log.Logger
+	storageName   string
+	partitionsDir string
+	db            keyvalue.Store
+}
+
+// NewIDMigrator creates a new partition ID migrator instance
+func NewIDMigrator(logger log.Logger, absoluteStateDir, storageName string, db keyvalue.Store) (*IDMigrator, error) {
+	partitionsDir, err := getPartitionsDir(absoluteStateDir)
+	
if err != nil {
+		return nil, fmt.Errorf("determining partitions directory: %w", err)
+	}
+
+	return &IDMigrator{
+		logger:        logger,
+		storageName:   storageName,
+		partitionsDir: partitionsDir,
+		db:            db,
+	}, nil
+}
+
+// Forward migrates from the legacy to the new partition structure and partition ID format
+func (m *IDMigrator) Forward() error {
+	// Migrate the partition structure for the Raft replica model
+	if err := m.partitionIDMigration(); err != nil {
+		if backwardErr := m.Backward(); backwardErr != nil {
+			return fmt.Errorf("partition ID migration failed: %w, and reversion also failed: %w", err, backwardErr)
+		}
+		return fmt.Errorf("partition ID migration: %w", err)
+	}
+
+	if err := cleanupOldPartitionStructure(m.partitionsDir); err != nil {
+		return fmt.Errorf("cleanup legacy partition structure: %w", err)
+	}
+	m.logger.Info("ID Migration: cleaned up the legacy disk paths for the partitions")
+
+	if err := m.updateMigrationInDB(); err != nil {
+		return fmt.Errorf("update migration status: %w", err)
+	}
+
+	return nil
+}
+
+// Backward handles the reverse migration to restore the legacy structure
+// from the new one.
+// Note: This assumes that the new structure is correctly set up and working.
+func (m *IDMigrator) Backward() error {
+	// First, undo the partition structure migration
+	if err := m.undoPartitionIDMigration(); err != nil {
+		return fmt.Errorf("undoing partition restructure: %w", err)
+	}
+
+	if err := m.cleanupHashBasedPartitionStructure(); err != nil {
+		return fmt.Errorf("cleanup hash-based partition structure: %w", err)
+	}
+	m.logger.Info("ID Migration: cleaned up the new disk paths for the partitions")
+
+	if err := m.deleteMigrationInDB(); err != nil {
+		return fmt.Errorf("delete migration status: %w", err)
+	}
+
+	// Delete the hash mapping from the database
+	if err := m.deleteHashMapping(); err != nil {
+		return fmt.Errorf("delete hash mapping: %w", err)
+	}
+
+	return nil
+}
+
+// BEFORE MIGRATION:
+//
+// ── partitions
+//    ├── 59                                 # First two chars of hash(partitionID)
+//    │   └── 94                             # Next two chars of hash(partitionID)
+//    │       └── 12345                      # Numeric partitionID
+//    │           └── wal                    # Write-ahead log directory
+//    │               ├── 0000000000000001   # Log sequence number
+//    │               │   ├── MANIFEST
+//    │               │   └── RAFT
+//    │               └── 0000000000000002
+//    │                   ├── MANIFEST
+//    │                   └── RAFT
+//
+// AFTER MIGRATION:
+//
+// ── partitions
+//    ├── 59
+//    │   └── 94
+//    │       └── 12345
+//    │           └── wal
+//    │               ├── 0000000000000001
+//    │               │   ├── MANIFEST
+//    │               │   └── RAFT
+//    │               └── 0000000000000002
+//    │                   ├── MANIFEST
+//    │                   └── RAFT
+//    └── a8
+//        └── 42
+//            └── a842e11d8535f4506d1c65a940b19ecb5ade7ec76c48b6f407b8644c987fcf5c
+//                └── wal
+//                    ├── 0000000000000001
+//                    │   ├── MANIFEST
+//                    │   └── RAFT
+//                    └── 0000000000000002
+//                        ├── MANIFEST
+//                        └── RAFT
+//
+// partitionIDMigration restructures partitions from the legacy directory structure
+// to match the new partition ID format. It also updates the badger DB keys.
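+//
+// For example (illustrative values): partition ID 12345 hashes to the
+// 64-character digest a842e11d… shown above, so its WAL directory gains a
+// hardlinked copy at a8/42/a842e11d…/wal while the legacy copy under
+// 59/94/12345/wal is left in place until cleanup.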
+func (m *IDMigrator) partitionIDMigration() error {
+	// Track all directories that need to be synced
+	dirsToSync := make(map[string]struct{})
+	// Track the hash to partition ID mapping for the undo operation, as we cannot
+	// reverse the hash to figure out the original partition ID
+	hashToPartitionID := make(map[string]string)
+
+	err := filepath.Walk(m.partitionsDir, func(path string, info fs.FileInfo, err error) error {
+		if err != nil {
+			if os.IsNotExist(err) {
+				return nil
+			}
+			return err
+		}
+
+		// Skip the base path itself
+		if path == m.partitionsDir {
+			return nil
+		}
+
+		// Get the relative path from the partitions directory
+		relPath, err := filepath.Rel(m.partitionsDir, path)
+		if err != nil {
+			return err
+		}
+
+		matches := pathPattern.FindStringSubmatch(relPath)
+		if len(matches) == 0 {
+			// Path doesn't match our pattern, skip it
+			return nil
+		}
+		// It matched; the third capture group is the partition ID
+		partitionID := matches[3]
+		newPartitionID, newWalDir := pathToMigrate(m.storageName, m.partitionsDir, partitionID)
+
+		// Store the mapping for the undo operation
+		hashToPartitionID[newPartitionID] = partitionID
+
+		// Add the dir to be synced
+		dirsToSync[newWalDir] = struct{}{}
+
+		// For files and directories beyond the /wal level,
+		// get the components after /wal by removing the matched prefix
+		subPath := strings.TrimPrefix(relPath, matches[0])
+		// Remove the leading separator if present
+		subPath = strings.TrimPrefix(subPath, string(os.PathSeparator))
+		newPath := filepath.Join(newWalDir, subPath)
+		if info.IsDir() {
+			if err := os.MkdirAll(newPath, info.Mode().Perm()); err != nil {
+				return fmt.Errorf("failed to create directory %s: %w", newPath, err)
+			}
+		} else if info.Mode().IsRegular() {
+			if err := os.Link(path, newPath); err != nil {
+				return fmt.Errorf("failed to hardlink file from %s to %s: %w", path, newPath, err)
+			}
+		}
+
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	// Store the hash to partition ID mapping in the DB for the undo operation
+	if err := m.storeHashMapping(hashToPartitionID); err != nil {
+		return fmt.Errorf("store hash mapping: %w", err)
+	}
+
+	syncer := safe.NewSyncer()
+	for dir := range dirsToSync {
+		if err := syncer.SyncRecursive(context.Background(), dir); err != nil {
+			return fmt.Errorf("syncing new replica structure: %w", err)
+		}
+	}
+
+	m.logger.Info(fmt.Sprintf("ID Migration: migrated disk paths for %d partitions", len(hashToPartitionID)))
+
+	// Migrate badger DB keys from the uint64 to the string format
+	if err := m.migrateBadgerDBKeys(); err != nil {
+		return fmt.Errorf("migrate badger DB keys: %w", err)
+	}
+
+	return nil
+}
+
+// BEFORE MIGRATION:
+//
+// ── partitions
+//    └── a8
+//        └── 42
+//            └── a842e11d8535f4506d1c65a940b19ecb5ade7ec76c48b6f407b8644c987fcf5c
+//                └── wal
+//                    └── 0000000000000001
+//                        ├── MANIFEST
+//                        └── RAFT
+//
+// AFTER MIGRATION:
+//
+// ── partitions
+//    ├── 59
+//    │   └── 94
+//    │       └── 12345
+//    │           └── wal
+//    │               └── 0000000000000001
+//    │                   ├── MANIFEST
+//    │                   └── RAFT
+//    └── a8
+//        └── 42
+//            └── a842e11d8535f4506d1c65a940b19ecb5ade7ec76c48b6f407b8644c987fcf5c
+//                └── wal
+//                    └── 0000000000000001
+//                        ├── MANIFEST
+//                        └── RAFT
+//
+// undoPartitionIDMigration reverses the partition migration by creating hardlinks
+// from the new structure back to the legacy structure. This is the opposite of partitionIDMigration.
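+//
+// The prefixIDMigrationMapping entries written during the forward migration are
+// what make this reversal possible: SHA-256 is one-way, so the original numeric
+// partition ID can only be recovered from the stored mapping.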
+func (m *IDMigrator) undoPartitionIDMigration() error {
+	// Track directories that need to be synced
+	dirsToSync := make(map[string]struct{})
+
+	// Load the hash to partition ID mapping from the database
+	hashToPartitionID, err := m.loadHashMapping()
+	if err != nil {
+		return fmt.Errorf("load hash mapping: %w", err)
+	}
+
+	err = filepath.Walk(m.partitionsDir, func(path string, info fs.FileInfo, err error) error {
+		if err != nil {
+			if os.IsNotExist(err) {
+				// info is nil when the walk reports a non-existent path, so there is
+				// nothing to process here; skip it like the forward migration does.
+				return nil
+			}
+			return err
+		}
+
+		// Skip the base path itself
+		if path == m.partitionsDir {
+			return nil
+		}
+
+		// Get the relative path from partitionsDir
+		relPath, err := filepath.Rel(m.partitionsDir, path)
+		if err != nil {
+			return err
+		}
+
+		// Check if this is a directory in the new hash-based structure
+		matches := hashPartitionPattern.FindStringSubmatch(relPath)
+		if len(matches) == 0 {
+			return nil // Skip if not matching the expected pattern
+		}
+
+		// Extract the hash from the matches
+		hash := matches[3]
+
+		// Look up the partition ID from our map
+		partitionID, ok := hashToPartitionID[hash]
+		if !ok {
+			// If we can't find it in the map, skip this directory
+			return nil
+		}
+
+		oldPartition := storage.ComputePartition(partitionID)
+		oldWalPath := filepath.Join(m.partitionsDir, oldPartition)
+
+		// Add the old WAL path to the directories to sync
+		dirsToSync[oldWalPath] = struct{}{}
+
+		// Use filepath.Walk again to process all subdirectories and files in the WAL directory
+		return filepath.Walk(path, func(subPath string, subInfo fs.FileInfo, err error) error {
+			if err != nil {
+				return err
+			}
+
+			// Skip the partition directory itself; only its contents need to be linked back
+			if subPath == path {
+				return nil
+			}
+
+			// Get the relative path from the new WAL directory
+			relSubPath, err := filepath.Rel(path, subPath)
+			if err != nil {
+				return fmt.Errorf("failed to get relative path for %s: %w", subPath, err)
+			}
+
+			// Create the corresponding path in the old structure
+			oldSubPath := filepath.Join(oldWalPath, relSubPath)
+
+			if subInfo.IsDir() {
+				// Create the directory with the same permissions
+				if err := os.MkdirAll(oldSubPath, subInfo.Mode().Perm()); err != nil {
+					return fmt.Errorf("failed to create directory %s: %w", oldSubPath, err)
+				}
+			} else if subInfo.Mode().IsRegular() {
+				// Check if the file already exists (it could be a hardlink from the forward migration)
+				if _, statErr := os.Stat(oldSubPath); statErr == nil {
+					// File already exists, skip creating the hardlink
+					return nil
+				} else if !os.IsNotExist(statErr) {
+					return fmt.Errorf("failed to stat %s: %w", oldSubPath, statErr)
+				}
+
+				// Create a hardlink for the file
+				if err := os.Link(subPath, oldSubPath); err != nil {
+					return fmt.Errorf("failed to create hardlink from %s to %s: %w", subPath, oldSubPath, err)
+				}
+			}
+
+			return nil
+		})
+	})
+	if err != nil {
+		return err
+	}
+
+	// Sync all directories at once after all files have been created
+	syncer := safe.NewSyncer()
+	for dir := range dirsToSync {
+		if err := syncer.SyncRecursive(context.Background(), dir); err != nil {
+			return fmt.Errorf("syncing legacy partition structure: %w", err)
+		}
+	}
+
+	m.logger.Info("ID Migration: rolled back partition disk path migration")
+
+	// Undo the badger DB key migration
+	if err := m.undoBadgerDBKeyMigration(hashToPartitionID); err != nil {
+		return fmt.Errorf("undo badger DB key migration: %w", err)
+	}
+
+	m.logger.Info("ID Migration: rolled back partition KV pair migration")
+
+	return nil
+}
+
+func (m *IDMigrator) updateMigrationInDB() error {
+	
return m.db.Update(func(txn keyvalue.ReadWriter) error {
+		// just set any value as the presence of the key is sufficient
+		if err := txn.Set(partitionIDFormatMigrationKey, []byte(nil)); err != nil {
+			return fmt.Errorf("set entry: %w", err)
+		}
+
+		return nil
+	})
+}
+
+// CheckMigrationStatus reports whether the entire migration has completed
+func (m *IDMigrator) CheckMigrationStatus() (bool, error) {
+	var migrated bool
+	err := m.db.View(func(txn keyvalue.ReadWriter) error {
+		_, err := txn.Get(partitionIDFormatMigrationKey)
+		if err != nil {
+			if errors.Is(err, badger.ErrKeyNotFound) {
+				migrated = false
+				return nil
+			}
+			return fmt.Errorf("get: %w", err)
+		}
+		migrated = true
+		return nil
+	})
+
+	return migrated, err
+}
+
+func (m *IDMigrator) deleteMigrationInDB() error {
+	return m.db.Update(func(txn keyvalue.ReadWriter) error {
+		if err := txn.Delete(partitionIDFormatMigrationKey); err != nil {
+			return fmt.Errorf("delete partition ID format migration key: %w", err)
+		}
+
+		return nil
+	})
+}
+
+// migrateBadgerDBKeys migrates all badger DB keys from the uint64 to the string format
+func (m *IDMigrator) migrateBadgerDBKeys() error {
+	return m.db.Update(func(txn keyvalue.ReadWriter) error {
+		// Create an iterator to scan all partition-related keys
+		iter := txn.NewIterator(keyvalue.IteratorOptions{
+			Prefix: []byte(storagemgr.PrefixPartition),
+		})
+		defer iter.Close()
+
+		var keysToMigrate []migrationEntry
+
+		for iter.Rewind(); iter.Valid(); iter.Next() {
+			item := iter.Item()
+			oldKey := item.Key()
+
+			splitOldKey := bytes.SplitN(oldKey, []byte("/"), 3)
+			if len(splitOldKey) < 3 {
+				continue // Skip malformed keys
+			}
+
+			partitionIDBytes := splitOldKey[1]
+			// Partition IDs in the legacy format are always 8 bytes, so only
+			// migrate if the partition ID is exactly 8 bytes (legacy uint64 format)
+			if len(partitionIDBytes) != 8 {
+				continue // Already migrated or not in the legacy format
+			}
+
+			keySuffix := splitOldKey[2] // Everything after the 8-byte partition ID
+
+			// Parse as uint64 (legacy format)
+			val := binary.BigEndian.Uint64(partitionIDBytes)
+			partitionID := strconv.FormatUint(val, 10)
+
+			// Get the value
+			value, err := item.ValueCopy(nil)
+			if err != nil {
+				return fmt.Errorf("copy value for key %q: %w", oldKey, err)
+			}
+
+			keysToMigrate = append(keysToMigrate, migrationEntry{
+				oldKey: append([]byte(nil), oldKey...),
+				newKey: createHashedPartitionKey(m.storageName, partitionID, keySuffix),
+				value:  value,
+			})
+		}
+
+		// Migrate the keys using a batch writer
+		batch := m.db.NewWriteBatch()
+		defer batch.Cancel()
+
+		for _, km := range keysToMigrate {
+			// Set the new key
+			if err := batch.Set(km.newKey, km.value); err != nil {
+				return fmt.Errorf("set new key %q: %w", km.newKey, err)
+			}
+
+			// Delete the old key
+			if err := batch.Delete(km.oldKey); err != nil {
+				return fmt.Errorf("delete old key %q: %w", km.oldKey, err)
+			}
+		}
+
+		// Also handle partition assignments
+		assignmentIter := txn.NewIterator(keyvalue.IteratorOptions{
+			Prefix: []byte("partition_assignment/"),
+		})
+		defer assignmentIter.Close()
+
+		var assignmentsToUpdate []struct {
+			key   []byte
+			value []byte
+		}
+
+		for assignmentIter.Rewind(); assignmentIter.Valid(); assignmentIter.Next() {
+			item := assignmentIter.Item()
+			key := item.Key()
+
+			oldValue, err := item.ValueCopy(nil)
+			if err != nil {
+				return fmt.Errorf("copy assignment value for key %q: %w", key, err)
+			}
+
+			// If the value is 8 bytes, it is in the legacy uint64 format
+			if len(oldValue) == 8 {
+				val := binary.BigEndian.Uint64(oldValue)
+				
newPartitionID := getHashedPartitionID(m.storageName, strconv.FormatUint(val, 10)) + newValue := []byte(newPartitionID) + + assignmentsToUpdate = append(assignmentsToUpdate, struct { + key []byte + value []byte + }{ + key: append([]byte(nil), key...), + value: newValue, + }) + } + } + + for _, assignment := range assignmentsToUpdate { + if err := batch.Set(assignment.key, assignment.value); err != nil { + return fmt.Errorf("update assignment value for key %q: %w", assignment.key, err) + } + } + + if err := batch.Flush(); err != nil { + return fmt.Errorf("flush assignment batch: %w", err) + } + + m.logger.Info(fmt.Sprintf("ID Migration: migrated %d items (%d keys, %d assignments)", + len(keysToMigrate)+len(assignmentsToUpdate), + len(keysToMigrate), + len(assignmentsToUpdate))) + + return nil + }) +} + +// undoBadgerDBKeyMigration reverses the badger DB key migration from hash back to uint64 format +func (m *IDMigrator) undoBadgerDBKeyMigration(hashToPartitionID map[string]string) error { + return m.db.Update(func(txn keyvalue.ReadWriter) error { + // Create iterator to scan all partition-related keys + iter := txn.NewIterator(keyvalue.IteratorOptions{ + Prefix: []byte(storagemgr.PrefixPartition), + }) + defer iter.Close() + + // Collect keys to migrate (can't modify while iterating) + var keysToMigrate []migrationEntry + + for iter.Rewind(); iter.Valid(); iter.Next() { + item := iter.Item() + oldKey := item.Key() + + splitOldKey := bytes.SplitN(oldKey, []byte("/"), 3) + if len(splitOldKey) < 3 { + continue // Skip malformed keys + } + + hashBytes := splitOldKey[1] + // Check if this is already in legacy format (8 bytes) + // This can happen if the undo migration runs successfully, + // but fails at cleanup stage. + if len(hashBytes) == 8 { + continue + } + + keySuffix := splitOldKey[2] + + // Look up the partition ID from the hash + partitionID, ok := hashToPartitionID[string(hashBytes)] + if !ok { + return fmt.Errorf("missing ID mapping for %s", hashBytes) + } + + partitionKey, err := createLegacyPartitionKey(partitionID, keySuffix) + if err != nil { + return fmt.Errorf("create legacy partition key: %w", err) + } + + // Get the value + value, err := item.ValueCopy(nil) + if err != nil { + return fmt.Errorf("copy value for key %q: %w", oldKey, err) + } + + keysToMigrate = append(keysToMigrate, migrationEntry{ + oldKey: append([]byte(nil), oldKey...), + newKey: partitionKey, + value: value, + }) + } + + // Migrate the keys using batch writer + batch := m.db.NewWriteBatch() + defer batch.Cancel() + + for _, km := range keysToMigrate { + // Set the new key + if err := batch.Set(km.newKey, km.value); err != nil { + return fmt.Errorf("set legacy key %q: %w", km.newKey, err) + } + + // Delete the old key + if err := batch.Delete(km.oldKey); err != nil { + return fmt.Errorf("delete new key %q: %w", km.oldKey, err) + } + } + + // Reverse migrate partition assignment values from string back to uint64 format + assignmentIter := txn.NewIterator(keyvalue.IteratorOptions{ + Prefix: []byte("partition_assignment/"), + }) + defer assignmentIter.Close() + + var assignmentsToUpdate []struct { + key []byte + value []byte + } + + for assignmentIter.Rewind(); assignmentIter.Valid(); assignmentIter.Next() { + item := assignmentIter.Item() + key := item.Key() + + oldValue, err := item.ValueCopy(nil) + if err != nil { + return fmt.Errorf("copy assignment value for key %q: %w", key, err) + } + + // The value is a hash (64 hex chars) + hash := string(oldValue) + if len(hash) != 64 { + continue // Skip non-hash 
values + } + + // Look up the partition ID from the hash + partitionID, ok := hashToPartitionID[hash] + if !ok { + return fmt.Errorf("missing hash mapping for %s during rollback", hash) + } + + // Convert partition ID to uint64 + val, parseErr := strconv.ParseUint(partitionID, 10, 64) + if parseErr != nil { + continue // Skip if partition ID is not a valid uint64 + } + + // Convert back to uint64 binary format + marshaled := make([]byte, 8) + binary.BigEndian.PutUint64(marshaled, val) + + assignmentsToUpdate = append(assignmentsToUpdate, struct { + key []byte + value []byte + }{ + key: append([]byte(nil), key...), + value: marshaled, + }) + } + + for _, assignment := range assignmentsToUpdate { + if err := batch.Set(assignment.key, assignment.value); err != nil { + return fmt.Errorf("update assignment value for key %q: %w", assignment.key, err) + } + } + + if err := batch.Flush(); err != nil { + return fmt.Errorf("flush batch: %w", err) + } + + return nil + }) +} + +// storeHashMapping stores the hash to partition ID mapping in the database +func (m *IDMigrator) storeHashMapping(hashToPartitionID map[string]string) error { + batch := m.db.NewWriteBatch() + defer batch.Cancel() + + for hash, partitionID := range hashToPartitionID { + key := append(prefixIDMigrationMapping, []byte(hash)...) + if err := batch.Set(key, []byte(partitionID)); err != nil { + return fmt.Errorf("store hash mapping for %s: %w", hash, err) + } + } + + if err := batch.Flush(); err != nil { + return fmt.Errorf("flush batch: %w", err) + } + + return nil +} + +// loadHashMapping loads the hash to partition ID mapping from the database +func (m *IDMigrator) loadHashMapping() (map[string]string, error) { + hashToPartitionID := make(map[string]string) + err := m.db.View(func(txn keyvalue.ReadWriter) error { + iter := txn.NewIterator(keyvalue.IteratorOptions{ + Prefix: prefixIDMigrationMapping, + }) + defer iter.Close() + + for iter.Rewind(); iter.Valid(); iter.Next() { + item := iter.Item() + key := item.Key() + + hash := strings.TrimPrefix(string(key), string(prefixIDMigrationMapping)) + + value, err := item.ValueCopy(nil) + if err != nil { + return fmt.Errorf("copy value for hash %s: %w", hash, err) + } + + hashToPartitionID[hash] = string(value) + } + + return nil + }) + + return hashToPartitionID, err +} + +// deleteHashMapping removes all hash mapping entries from the database +func (m *IDMigrator) deleteHashMapping() error { + batch := m.db.NewWriteBatch() + defer batch.Cancel() + + if err := m.db.View(func(txn keyvalue.ReadWriter) error { + iter := txn.NewIterator(keyvalue.IteratorOptions{ + Prefix: prefixIDMigrationMapping, + }) + defer iter.Close() + + for iter.Rewind(); iter.Valid(); iter.Next() { + key := append([]byte(nil), iter.Item().Key()...) 
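+			// The copy above is needed because a Badger iterator's Key() slice is
+			// only valid until the next iteration, while the delete is applied
+			// later through the write batch.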
+ if err := batch.Delete(key); err != nil { + return fmt.Errorf("delete hash mapping key %q: %w", key, err) + } + } + + return nil + }); err != nil { + return fmt.Errorf("iterate migration mapping keys: %w", err) + } + + if err := batch.Flush(); err != nil { + return fmt.Errorf("flush batch: %w", err) + } + + return nil +} + +// cleanupHashBasedPartitionStructure removes the hash-based partition structure after undoing the migration +func (m *IDMigrator) cleanupHashBasedPartitionStructure() error { + // Walk through the new structure and remove directories that match the hash pattern + dirsToRemove := make(map[string]struct{}) + syncer := safe.NewSyncer() + + err := filepath.Walk(m.partitionsDir, func(path string, info fs.FileInfo, err error) error { + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + + // Skip the base path itself + if path == m.partitionsDir { + return nil + } + + // Only look at directories + if !info.IsDir() { + return nil + } + + relPath, err := filepath.Rel(m.partitionsDir, path) + if err != nil { + return err + } + + // Look for directories in the hash-based structure format: /xx/yy/hash + matches := hashPartitionPattern.FindStringSubmatch(relPath) + if len(matches) > 0 && relPath == matches[0] { + dirsToRemove[path] = struct{}{} + + // Skip processing this directory's contents + return filepath.SkipDir + } + + return nil + }) + if err != nil { + return err + } + + // Now remove all identified directories + for dir := range dirsToRemove { + if err := os.RemoveAll(dir); err != nil { + return fmt.Errorf("failed to remove hash-based directory structure %s: %w", dir, err) + } + + // Sync immediate parent + if err := syncer.SyncParent(context.Background(), dir); err != nil { + return fmt.Errorf("syncing deleted files: %w", err) + } + } + + return nil +} + +func getHashedPartitionID(storageName, partitionID string) string { + targetPartitionName := storage.GetRaftPartitionName(storageName, partitionID) + return fmt.Sprintf("%x", sha256.Sum256([]byte(targetPartitionName))) +} + +func pathToMigrate(storageName, partitionsBase, partitionID string) (newPartitionID string, absolutePath string) { + hash := getHashedPartitionID(storageName, partitionID) + hashedPath := fmt.Sprintf("%s/%s/%s", hash[:2], hash[2:4], hash) + + return hash, + filepath.Join( + partitionsBase, + hashedPath, + "wal", + ) +} + +// createLegacyPartitionKey creates a partition key in the legacy uint64 format +func createLegacyPartitionKey(partitionID string, keySuffix []byte) ([]byte, error) { + // Convert partition ID to uint64 + val, parseErr := strconv.ParseUint(partitionID, 10, 64) + if parseErr != nil { + return nil, fmt.Errorf("partition ID is in wrong format: %s", partitionID) + } + + marshaled := make([]byte, 8) + binary.BigEndian.PutUint64(marshaled, val) + return fmt.Appendf(nil, "%s%s/%s", storagemgr.PrefixPartition, marshaled, keySuffix), nil +} + +// createHashedPartitionKey creates a partition key in the new hashed id format +func createHashedPartitionKey(storageName, partitionID string, keySuffix []byte) []byte { + hashedID := getHashedPartitionID(storageName, partitionID) + return fmt.Appendf(nil, "%s%s/%s", storagemgr.PrefixPartition, hashedID, keySuffix) +} diff --git a/internal/gitaly/storage/storagemgr/partition/partition_id_migration_test.go b/internal/gitaly/storage/storagemgr/partition/partition_id_migration_test.go new file mode 100644 index 0000000000000000000000000000000000000000..3aa3eea8a3e8634f96fbf87b3c66e4efc58c1f79 --- /dev/null +++ 
b/internal/gitaly/storage/storagemgr/partition/partition_id_migration_test.go @@ -0,0 +1,559 @@ +package partition + +import ( + "encoding/binary" + "fmt" + "os" + "path/filepath" + "strconv" + "testing" + + "github.com/dgraph-io/badger/v4" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testcfg" +) + +func TestPartitionIDMigrator_Forward(t *testing.T) { + t.Parallel() + + t.Run("successful migration", func(t *testing.T) { + t.Parallel() + // Create a new temp directory for this test + tempDir := testhelper.TempDir(t) + partitionsDir := filepath.Join(tempDir, "partitions") + cfg := testcfg.Build(t, testcfg.WithStorages("testStorage")) + + logger := testhelper.SharedLogger(t) + ctx := testhelper.Context(t) + + // Create partitions directory + err := os.MkdirAll(filepath.Join(tempDir, "partitions"), mode.Directory) + require.NoError(t, err, "Failed to create partitions directory") + + _, db := getTestDBManager(t, ctx, cfg, logger) + + // Setup old format DB keys for both partitions + setupLegacyFormatDBKeys(t, db, "123") + setupLegacyFormatDBKeys(t, db, "456") + + migrator, err := NewIDMigrator(logger, partitionsDir, cfg.Storages[0].Name, db) + require.NoError(t, err) + + // Setup old partition structure using the helper + setupDirectory(t, partitionsDir, map[string]bool{ + "xx/yy/123/wal/0000000000000001/RAFT": false, + "zz/yy/456/wal/0000000000000001/RAFT": false, + }) + + // Run the migration + require.NoError(t, migrator.Forward()) + + // Verify DB keys were migrated for both partitions + verifyNewFormatDBKeys(t, db, cfg.Storages[0].Name, "123") + verifyNewFormatDBKeys(t, db, cfg.Storages[0].Name, "456") + + // Verify old structure is gone, new structure should exist + testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/xx": {Mode: mode.Directory}, + "/xx/yy": {Mode: mode.Directory}, // parent of partition 123 still remains + "/zz": {Mode: mode.Directory}, + "/zz/yy": {Mode: mode.Directory}, // parent of partition 456 still remains + "/43": {Mode: mode.Directory}, + "/43/02": {Mode: mode.Directory}, + "/43/02/4302ef58917c73c82721690cbf90475f82c7863dbdf8c73fe012ce5aed77ef8c": {Mode: mode.Directory}, + "/43/02/4302ef58917c73c82721690cbf90475f82c7863dbdf8c73fe012ce5aed77ef8c/wal": {Mode: mode.Directory}, + "/43/02/4302ef58917c73c82721690cbf90475f82c7863dbdf8c73fe012ce5aed77ef8c/wal/0000000000000001": {Mode: mode.Directory}, + "/43/02/4302ef58917c73c82721690cbf90475f82c7863dbdf8c73fe012ce5aed77ef8c/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content}, + "/7e": {Mode: mode.Directory}, + "/7e/8d": {Mode: mode.Directory}, + "/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a": {Mode: mode.Directory}, + "/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal": {Mode: mode.Directory}, + "/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001": {Mode: mode.Directory}, + "/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content}, + }) + + assertPartitionIDMigrationKeyExists(t, migrator.db) + migrated, err := migrator.CheckMigrationStatus() + require.NoError(t, err) + require.True(t, migrated) + 
}) + + t.Run("can be run multiple times", func(t *testing.T) { + t.Parallel() + + // Create a new temp directory for this test + tempDir := testhelper.TempDir(t) + partitionsDir := filepath.Join(tempDir, "partitions") + + cfg := testcfg.Build(t, testcfg.WithStorages("testStorage")) + + logger := testhelper.SharedLogger(t) + ctx := testhelper.Context(t) + + // Create partitions directory + err := os.MkdirAll(filepath.Join(tempDir, "partitions"), mode.Directory) + require.NoError(t, err, "Failed to create partitions directory") + + _, db := getTestDBManager(t, ctx, cfg, logger) + + setupLegacyFormatDBKeys(t, db, "123") + + migrator, err := NewIDMigrator(logger, partitionsDir, cfg.Storages[0].Name, db) + require.NoError(t, err) + + // Setup structure with read-only directory to cause cleanup error + setupDirectory(t, partitionsDir, map[string]bool{ + "xx/yy/123/wal/0000000000000001": true, // partition directory + }) + + // Make the directory read-only on the parent to cause error during cleanup + legacyPartitionPath := filepath.Join(partitionsDir, "xx/yy") + err = os.Chmod(legacyPartitionPath, 0o500) // read-only + require.NoError(t, err) + + // Run the migration - should complete the migration but fail during cleanup + require.Error(t, migrator.Forward()) + + // Restore permissions for cleanup + assert.NoError(t, os.Chmod(legacyPartitionPath, mode.Directory)) + + // Since cleanup failed, old structure still exists + testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/xx": {Mode: mode.Directory}, + "/xx/yy": {Mode: mode.Directory}, + "/xx/yy/123": {Mode: mode.Directory}, // expected to not be cleaned up + "/7e": {Mode: mode.Directory}, + "/7e/8d": {Mode: mode.Directory}, + "/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a": {Mode: mode.Directory}, + "/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal": {Mode: mode.Directory}, + "/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001": {Mode: mode.Directory}, + }) + assertPartitionIDMigrationKeyAbsent(t, migrator.db) + migrated, err := migrator.CheckMigrationStatus() + require.NoError(t, err) + require.False(t, migrated) + require.NoError(t, migrator.Forward()) + + // Verify DB keys were migrated + verifyNewFormatDBKeys(t, db, cfg.Storages[0].Name, "123") + + // /xx/yy/123 got cleaned up + testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/xx": {Mode: mode.Directory}, + "/xx/yy": {Mode: mode.Directory}, + "/7e": {Mode: mode.Directory}, + "/7e/8d": {Mode: mode.Directory}, + "/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a": {Mode: mode.Directory}, + "/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal": {Mode: mode.Directory}, + "/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001": {Mode: mode.Directory}, + }) + assertPartitionIDMigrationKeyExists(t, migrator.db) + migrated, err = migrator.CheckMigrationStatus() + require.NoError(t, err) + require.True(t, migrated) + }) + + t.Run("rollback on BadgerDB migration failure", func(t *testing.T) { + t.Parallel() + + // Create a new temp directory for this test + tempDir := testhelper.TempDir(t) + partitionsDir := filepath.Join(tempDir, "partitions") + cfg := testcfg.Build(t, testcfg.WithStorages("testStorage")) + + logger := testhelper.SharedLogger(t) + ctx := testhelper.Context(t) + + // Create 
partitions directory + err := os.MkdirAll(filepath.Join(tempDir, "partitions"), mode.Directory) + require.NoError(t, err, "Failed to create partitions directory") + + _, db := getTestDBManager(t, ctx, cfg, logger) + + // Setup old format DB keys + setupLegacyFormatDBKeys(t, db, "123") + + // Setup old partition structure + setupDirectory(t, partitionsDir, map[string]bool{ + "a6/65/123/wal/0000000000000001/RAFT": false, + }) + + // Wrap the DB to inject failure during batch flush in migrateBadgerDBKeys. + // We skip the first flush which is from the storeHashMapping. + mockDB := &mockDBWithFailure{ + Store: db, + failOnBatchFlush: true, + targetFlushCount: 2, + } + migrator, err := NewIDMigrator(logger, partitionsDir, cfg.Storages[0].Name, mockDB) + require.NoError(t, err) + + // Run the migration - filesystem migration will succeed but BadgerDB migration will fail + // This should trigger Backward() to rollback the filesystem changes + err = migrator.Forward() + require.Error(t, err) + require.NotContains(t, err.Error(), "reversion also failed", "rollback should succeed") + + // Verify that the filesystem was rolled back to the old structure + testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/7e": {Mode: mode.Directory}, + "/7e/8d": {Mode: mode.Directory}, + "/a6": {Mode: mode.Directory}, + "/a6/65": {Mode: mode.Directory}, + "/a6/65/123": {Mode: mode.Directory}, + "/a6/65/123/wal": {Mode: mode.Directory}, + "/a6/65/123/wal/0000000000000001": {Mode: mode.Directory}, + "/a6/65/123/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content}, + }) + + // Verify DB keys are still in legacy format + verifyLegacyFormatDBKeys(t, db, cfg.Storages[0].Name, "123") + + // Verify migration status is not set + assertPartitionIDMigrationKeyAbsent(t, migrator.db) + migrated, err := migrator.CheckMigrationStatus() + require.NoError(t, err) + require.False(t, migrated) + + // Trying again should succeed + require.NoError(t, migrator.Forward()) + + // Verify DB keys were migrated + verifyNewFormatDBKeys(t, db, cfg.Storages[0].Name, "123") + + testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/a6": {Mode: mode.Directory}, + "/a6/65": {Mode: mode.Directory}, + "/7e": {Mode: mode.Directory}, + "/7e/8d": {Mode: mode.Directory}, + "/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a": {Mode: mode.Directory}, + "/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal": {Mode: mode.Directory}, + "/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001": {Mode: mode.Directory}, + "/7e/8d/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content}, + }) + assertPartitionIDMigrationKeyExists(t, migrator.db) + migrated, err = migrator.CheckMigrationStatus() + require.NoError(t, err) + require.True(t, migrated) + }) +} + +func TestPartitionIDMigrator_Backward(t *testing.T) { + t.Parallel() + + t.Run("successful migration", func(t *testing.T) { + t.Parallel() + // Create a new temp directory for this test + tempDir := testhelper.TempDir(t) + partitionsDir := filepath.Join(tempDir, "partitions") + cfg := testcfg.Build(t, testcfg.WithStorages("testStorage")) + logger := testhelper.SharedLogger(t) + ctx := testhelper.Context(t) + + // Create partitions directory + err := os.MkdirAll(filepath.Join(tempDir, "partitions"), mode.Directory) + 
require.NoError(t, err, "Failed to create partitions directory") + + _, db := getTestDBManager(t, ctx, cfg, logger) + + // Setup new format DB keys for both partitions + setupNewFormatDBKeys(t, db, cfg.Storages[0].Name, "123") + setupNewFormatDBKeys(t, db, cfg.Storages[0].Name, "456") + + migrator, err := NewIDMigrator(logger, partitionsDir, cfg.Storages[0].Name, db) + require.NoError(t, err) + // Setup new partition structure using the helper + setupDirectory(t, partitionsDir, map[string]bool{ + "xx/yy/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001/RAFT": false, + "zz/yy/4302ef58917c73c82721690cbf90475f82c7863dbdf8c73fe012ce5aed77ef8c/wal/0000000000000001/RAFT": false, + }) + + // Run the migration + err = migrator.Backward() + require.NoError(t, err) + + // Verify DB keys were reverted for both partitions + verifyLegacyFormatDBKeys(t, db, cfg.Storages[0].Name, "123") + verifyLegacyFormatDBKeys(t, db, cfg.Storages[0].Name, "456") + + testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/xx": {Mode: mode.Directory}, + "/xx/yy": {Mode: mode.Directory}, + "/zz": {Mode: mode.Directory}, + "/zz/yy": {Mode: mode.Directory}, + "/b3": {Mode: mode.Directory}, + "/b3/a8": {Mode: mode.Directory}, + "/b3/a8/456": {Mode: mode.Directory}, + "/b3/a8/456/wal": {Mode: mode.Directory}, + "/b3/a8/456/wal/0000000000000001": {Mode: mode.Directory}, + "/b3/a8/456/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content}, + "/a6": {Mode: mode.Directory}, + "/a6/65": {Mode: mode.Directory}, + "/a6/65/123": {Mode: mode.Directory}, + "/a6/65/123/wal": {Mode: mode.Directory}, + "/a6/65/123/wal/0000000000000001": {Mode: mode.Directory}, + "/a6/65/123/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content}, + }) + assertPartitionIDMigrationKeyAbsent(t, migrator.db) + }) + + t.Run("can be run multiple times", func(t *testing.T) { + t.Parallel() + + // Create a new temp directory for this test + tempDir := testhelper.TempDir(t) + partitionsDir := filepath.Join(tempDir, "partitions") + cfg := testcfg.Build(t, testcfg.WithStorages("testStorage")) + logger := testhelper.SharedLogger(t) + ctx := testhelper.Context(t) + + // Create partitions directory + err := os.MkdirAll(filepath.Join(tempDir, "partitions"), mode.Directory) + require.NoError(t, err, "Failed to create partitions directory") + + _, db := getTestDBManager(t, ctx, cfg, logger) + + // Setup new format DB keys + setupNewFormatDBKeys(t, db, cfg.Storages[0].Name, "123") + + migrator, err := NewIDMigrator(logger, partitionsDir, cfg.Storages[0].Name, db) + require.NoError(t, err) + // Setup structure with read-only directory to cause cleanup error + structure := map[string]bool{ + "qq/yy/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a/wal/0000000000000001/RAFT": false, // partition directory + } + + // Creates all the parent dirs in the path specified + setupDirectory(t, partitionsDir, structure) + + // Make the directory read-only on the parent to cause error during cleanup + newPartitionPath := filepath.Join(partitionsDir, "qq/yy") + require.NoError(t, os.Chmod(newPartitionPath, 0o500)) // read-only + + // Run the migration - should complete the migration but fail during cleanup + require.Error(t, migrator.Backward()) + + _, err = os.Stat(filepath.Join(partitionsDir, "qq/yy/7e8d5c8eba5c5e3f2f0e9a64e0f4537a7892f5991123a79e289af47ed1c7c75a")) + assert.NoError(t, err, "Dir should exist as it didn't get cleaned up") + + // Restore 
permissions for cleanup + assert.NoError(t, os.Chmod(newPartitionPath, mode.Directory)) + require.NoError(t, migrator.Backward()) + + // Verify DB keys were reverted + verifyLegacyFormatDBKeys(t, db, cfg.Storages[0].Name, "123") + + testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/qq": {Mode: mode.Directory}, + "/qq/yy": {Mode: mode.Directory}, + "/a6": {Mode: mode.Directory}, + "/a6/65": {Mode: mode.Directory}, + "/a6/65/123": {Mode: mode.Directory}, + "/a6/65/123/wal": {Mode: mode.Directory}, + "/a6/65/123/wal/0000000000000001": {Mode: mode.Directory}, + "/a6/65/123/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content}, + }) + assertPartitionIDMigrationKeyAbsent(t, migrator.db) + }) +} + +// assertPartitionIDMigrationKeyExists checks if the partition ID format migration key exists in the database +func assertPartitionIDMigrationKeyExists(t *testing.T, db keyvalue.Store) { + require.NoError(t, db.View(func(txn keyvalue.ReadWriter) error { + item, err := txn.Get(partitionIDFormatMigrationKey) + require.NoError(t, err) + require.NotNil(t, item) // Ensure we got a valid item + return nil + })) +} + +// assertPartitionIDMigrationKeyAbsent checks if the partition ID format migration key is absent from the database +func assertPartitionIDMigrationKeyAbsent(t *testing.T, db keyvalue.Store) { + require.NoError(t, db.View(func(txn keyvalue.ReadWriter) error { + _, getErr := txn.Get(partitionIDFormatMigrationKey) + require.ErrorIs(t, getErr, badger.ErrKeyNotFound) + return nil + })) +} + +// setupLegacyFormatDBKeys creates DB keys in the legacy uint64 format for testing +func setupLegacyFormatDBKeys(t *testing.T, db keyvalue.Store, partitionID string) { + t.Helper() + + require.NoError(t, db.Update(func(txn keyvalue.ReadWriter) error { + // Create legacy format partition key (uint64) + legacyPartitionKey, err := createLegacyPartitionKey(partitionID, []byte("test_key")) + require.NoError(t, err) + require.NoError(t, txn.Set(legacyPartitionKey, []byte("test_value"))) + + val, err := strconv.ParseUint(partitionID, 10, 64) + require.NoError(t, err) + marshaled := make([]byte, 8) + binary.BigEndian.PutUint64(marshaled, val) + + // Set a partition assignment key with legacy format value (use unique key per partition) + assignmentKey := []byte("partition_assignment/test_repo_" + partitionID) + require.NoError(t, txn.Set(assignmentKey, marshaled)) + + return nil + })) +} + +// setupNewFormatDBKeys creates DB keys in the new hash format for testing +func setupNewFormatDBKeys(t *testing.T, db keyvalue.Store, storageName, partitionID string) { + t.Helper() + + require.NoError(t, db.Update(func(txn keyvalue.ReadWriter) error { + // Create new format partition key (hash) + newPartitionKey := createHashedPartitionKey(storageName, partitionID, []byte("test_key")) + require.NoError(t, txn.Set(newPartitionKey, []byte("test_value"))) + + // Set a partition assignment key with new format value (use unique key per partition) + hashedPartitionID := getHashedPartitionID(storageName, partitionID) + assignmentKey := []byte("partition_assignment/test_repo_" + partitionID) + require.NoError(t, txn.Set(assignmentKey, []byte(hashedPartitionID))) + + // Store the hash mapping for undo operation + hashMappingKey := append(prefixIDMigrationMapping, []byte(hashedPartitionID)...) 
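+		// This mirrors the prefixIDMigrationMapping+hash key layout that
+		// storeHashMapping writes during the forward migration.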
+ require.NoError(t, txn.Set(hashMappingKey, []byte(partitionID))) + + return nil + })) +} + +// verifyNewFormatDBKeys verifies that DB keys have been migrated to the new hash format +func verifyNewFormatDBKeys(t *testing.T, db keyvalue.Store, storageName, partitionID string) { + t.Helper() + + require.NoError(t, db.View(func(txn keyvalue.ReadWriter) error { + // Verify new format partition key exists (hash-based) + newPartitionKey := createHashedPartitionKey(storageName, partitionID, []byte("test_key")) + + item, err := txn.Get(newPartitionKey) + require.NoError(t, err, "new format partition key should exist") + + value, err := item.ValueCopy(nil) + require.NoError(t, err) + require.Equal(t, "test_value", string(value)) + + // Verify legacy format partition key does not exist + legacyPartitionKey, err := createLegacyPartitionKey(partitionID, []byte("test_key")) + require.NoError(t, err) + + _, err = txn.Get(legacyPartitionKey) + require.ErrorIs(t, err, badger.ErrKeyNotFound, "legacy format partition key should not exist") + + // Verify partition assignment value is in new hash format (use unique key per partition) + assignmentKey := []byte("partition_assignment/test_repo_" + partitionID) + item, err = txn.Get(assignmentKey) + require.NoError(t, err) + + hashedPartitionID := getHashedPartitionID(storageName, partitionID) + assignmentValue, err := item.ValueCopy(nil) + require.NoError(t, err) + require.Equal(t, hashedPartitionID, string(assignmentValue), "assignment value should be in new hash format") + + // Verify hash mapping exists + hashMappingKey := append(prefixIDMigrationMapping, []byte(hashedPartitionID)...) + item, err = txn.Get(hashMappingKey) + require.NoError(t, err, "hash mapping should exist") + + mappingValue, err := item.ValueCopy(nil) + require.NoError(t, err) + require.Equal(t, partitionID, string(mappingValue), "hash mapping should map to original partition ID") + + return nil + })) +} + +// verifyLegacyFormatDBKeys verifies that DB keys have been reverted to the legacy uint64 format +func verifyLegacyFormatDBKeys(t *testing.T, db keyvalue.Store, storageName, partitionID string) { + t.Helper() + + require.NoError(t, db.View(func(txn keyvalue.ReadWriter) error { + // Verify legacy format partition key exists + legacyPartitionKey, err := createLegacyPartitionKey(partitionID, []byte("test_key")) + require.NoError(t, err) + + item, err := txn.Get(legacyPartitionKey) + require.NoError(t, err, "legacy format partition key should exist") + + value, err := item.ValueCopy(nil) + require.NoError(t, err) + require.Equal(t, "test_value", string(value)) + + // Verify new format partition key does not exist (hash-based) + newPartitionKey := createHashedPartitionKey(storageName, partitionID, []byte("test_key")) + + _, err = txn.Get(newPartitionKey) + require.ErrorIs(t, err, badger.ErrKeyNotFound, "new format partition key should not exist") + + // Verify partition assignment value is in legacy format (use unique key per partition) + assignmentKey := []byte("partition_assignment/test_repo_" + partitionID) + item, err = txn.Get(assignmentKey) + require.NoError(t, err) + + val, err := strconv.ParseUint(partitionID, 10, 64) + require.NoError(t, err) + marshaled := make([]byte, 8) + binary.BigEndian.PutUint64(marshaled, val) + + assignmentValue, err := item.ValueCopy(nil) + require.NoError(t, err) + require.Equal(t, marshaled, assignmentValue, "assignment value should be in legacy uint64 format") + + // Verify hash mapping does not exist + hashMappingKey := append(prefixIDMigrationMapping, 
[]byte(getHashedPartitionID(storageName, partitionID))...) + _, err = txn.Get(hashMappingKey) + require.ErrorIs(t, err, badger.ErrKeyNotFound, "hash mapping should not exist after backward migration") + + return nil + })) +} + +// mockDBWithFailure wraps a real DB and injects failures at specific points +type mockDBWithFailure struct { + keyvalue.Store + failOnBatchFlush bool + batchFlushCount int + targetFlushCount int // Fail on the Nth flush + hasFailed bool // Track if we've already failed once +} + +func (m *mockDBWithFailure) NewWriteBatch() keyvalue.WriteBatch { + realBatch := m.Store.NewWriteBatch() + return &mockBatchWithFailure{ + WriteBatch: realBatch, + failOnFlush: m.failOnBatchFlush, + parent: m, + } +} + +type mockBatchWithFailure struct { + keyvalue.WriteBatch + failOnFlush bool + parent *mockDBWithFailure +} + +func (m *mockBatchWithFailure) Flush() error { + if m.failOnFlush && !m.parent.hasFailed { + m.parent.batchFlushCount++ + // Fail on the target flush (e.g., the second flush in migrateBadgerDBKeys) + if m.parent.batchFlushCount == m.parent.targetFlushCount { + m.parent.hasFailed = true + return fmt.Errorf("injected batch flush failure") + } + } + return m.WriteBatch.Flush() +}
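+
+// Illustrative usage sketch (not part of this change; names mirror the
+// production and test code above). The round-trip contract the tests exercise,
+// given a keyvalue.Store db and a partitions directory:
+//
+//	migrator, err := NewIDMigrator(logger, partitionsDir, storageName, db)
+//	// handle err ...
+//	err = migrator.Forward()                       // legacy -> hashed layout
+//	migrated, _ := migrator.CheckMigrationStatus() // true after Forward
+//	err = migrator.Backward()                      // hashed -> legacy layout
+//	migrated, _ = migrator.CheckMigrationStatus()  // false after Backward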