// Copyright 2016 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package log holds the code that is specific to Trillian logs core operation,
// particularly the code for sequencing.
package log

import (
	"context"
	"fmt"
	"reflect"
	"sort"
	"strconv"
	"sync"
	"time"

	"github.com/google/trillian/extension"
	"github.com/google/trillian/monitoring"
	"github.com/google/trillian/storage"
	"github.com/google/trillian/util/clock"
	"github.com/google/trillian/util/election"
	"golang.org/x/sync/semaphore"
	"k8s.io/klog/v2"
)

var (
	// DefaultTimeout is the default timeout on a single log operation run.
	DefaultTimeout = 60 * time.Second

	once              sync.Once
	knownLogs         monitoring.Gauge
	resignations      monitoring.Counter
	isMaster          monitoring.Gauge
	signingRuns       monitoring.Counter
	failedSigningRuns monitoring.Counter
	entriesAdded      monitoring.Counter
	batchesAdded      monitoring.Counter
)

func createMetrics(mf monitoring.MetricFactory) {
	if mf == nil {
		mf = monitoring.InertMetricFactory{}
	}
	knownLogs = mf.NewGauge("known_logs", "Set to 1 for known logs (whether this instance is master or not)", logIDLabel)
	resignations = mf.NewCounter("master_resignations", "Number of mastership resignations", logIDLabel)
	isMaster = mf.NewGauge("is_master", "Whether this instance is master (0/1)", logIDLabel)
	signingRuns = mf.NewCounter("signing_runs", "Number of times a signing run has succeeded", logIDLabel)
	failedSigningRuns = mf.NewCounter("failed_signing_runs", "Number of times a signing run has failed", logIDLabel)
	// entriesAdded is the total number of entries that have been added to the
	// log during the lifetime of a signer. This allows an operator to determine
	// that the queue is empty for a particular log; if signing runs are succeeding
	// but nothing is being processed then this counter will stop increasing.
	entriesAdded = mf.NewCounter("entries_added", "Number of entries added to the log", logIDLabel)
	// batchesAdded is the number of times a signing run caused entries to be
	// integrated into the log. The value batchesAdded / signingRuns is an
	// indication of how often the signer runs but does no work. The value of
	// entriesAdded / batchesAdded is the average batch size. These can be used
	// for tuning sequencing or evaluating performance.
	batchesAdded = mf.NewCounter("batches_added", "Number of times a non-zero number of entries was added", logIDLabel)
}

// Operation defines a task that operates on a log. Examples are scheduling, signing,
// consistency checking or cleanup.
type Operation interface {
	// ExecutePass performs a single pass of processing on a single log. It returns
	// a count of items processed (for logging) and an error.
	ExecutePass(ctx context.Context, logID int64, info *OperationInfo) (int, error)
}

// OperationInfo bundles up information needed for running a set of Operations.
type OperationInfo struct {
	// Registry provides access to Trillian storage.
	Registry extension.Registry

	// The following parameters are passed to individual Operations.

	// BatchSize is the batch size to be passed to tasks run by this manager.
	BatchSize int
	// TimeSource should be used by the Operation to allow mocking for tests.
	TimeSource clock.TimeSource

	// The following parameters govern the overall scheduling of Operations
	// by an OperationManager.

	// ElectionConfig holds the election-related configuration. It is copied
	// for each log.
	ElectionConfig election.RunnerConfig
	// RunInterval is the time between starting batches of processing. If a
	// batch takes longer than this interval to complete, the next batch
	// will start immediately.
	RunInterval time.Duration
	// NumWorkers is the number of worker goroutines to run in parallel.
	NumWorkers int
	// Timeout sets an optional timeout on each operation run.
	// If unset, it defaults to DefaultTimeout.
	Timeout time.Duration
}

// OperationManager controls scheduling activities for logs.
type OperationManager struct {
	info OperationInfo

	// logOperation is the task that gets run for active logs.
	logOperation Operation

	// runnerWG groups all goroutines with election Runners.
	runnerWG sync.WaitGroup
	// runnerCancels contains a cancel function for each logID's election Runner.
	runnerCancels map[string]context.CancelFunc
	// pendingResignations delivers resignation requests from election Runners.
	pendingResignations chan election.Resignation

	tracker *election.MasterTracker

	// logNames is a cache of logID => name. Names are assumed not to change during runtime.
	logNames map[int64]string
	// lastHeld is a recent list of active logs that this instance is master for.
	lastHeld []int64
	// idsMutex guards the logNames and lastHeld fields.
	idsMutex sync.Mutex
}

// NewOperationManager creates a new OperationManager instance.
func NewOperationManager(info OperationInfo, logOperation Operation) *OperationManager {
	once.Do(func() {
		createMetrics(info.Registry.MetricFactory)
	})
	if info.Timeout == 0 {
		info.Timeout = DefaultTimeout
	}
	tracker := election.NewMasterTracker(nil, func(id string, v bool) {
		val := 0.0
		if v {
			val = 1.0
		}
		isMaster.Set(val, id)
	})
	return &OperationManager{
		info:                info,
		logOperation:        logOperation,
		runnerCancels:       make(map[string]context.CancelFunc),
		pendingResignations: make(chan election.Resignation, 100),
		tracker:             tracker,
		logNames:            make(map[int64]string),
	}
}
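
// Illustrative sketch, not part of the original file: typical wiring of the
// pieces above. The registry and the per-log Operation are assumed to be
// constructed elsewhere (e.g. from a storage provider and a sequencing task).
// With no ElectionFactory configured in the registry, masterFor treats this
// instance as master for every active log.
func runLogOperation(ctx context.Context, registry extension.Registry, op Operation) {
	info := OperationInfo{
		Registry:    registry,
		BatchSize:   1000,             // max items per ExecutePass call
		NumWorkers:  4,                // logs processed in parallel
		RunInterval: 10 * time.Second, // pause between passes
		TimeSource:  clock.System,     // real clock; swap for a fake in tests
	}
	om := NewOperationManager(info, op)
	// Blocks until ctx is canceled, then tears down election runners.
	om.OperationLoop(ctx)
}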

// logName maps a logID to a human-readable name, caching results along the way.
// The human-readable name may be non-unique, so it should only be used for
// diagnostics.
func (o *OperationManager) logName(ctx context.Context, logID int64) string {
	o.idsMutex.Lock()
	defer o.idsMutex.Unlock()
	if name, ok := o.logNames[logID]; ok {
		return name
	}

	tree, err := storage.GetTree(ctx, o.info.Registry.AdminStorage, logID)
	if err != nil {
		klog.Errorf("%v: failed to get log info: %v", logID, err)
		return "<err>"
	}
	name := tree.DisplayName
	if name == "" {
		name = fmt.Sprintf("<log-%d>", logID)
	}
	o.logNames[logID] = name
	return o.logNames[logID]
}

// heldInfo returns a string listing, by display name, the logs that this
// instance currently holds mastership for.
func (o *OperationManager) heldInfo(ctx context.Context, logIDs []int64) string {
	names := make([]string, 0, len(logIDs))
	for _, logID := range logIDs {
		names = append(names, o.logName(ctx, logID))
	}
	sort.Strings(names)

	result := "master for:"
	for _, name := range names {
		result += " " + name
	}
	return result
}

// masterFor returns the list of log IDs among allIDs that this instance is
// master for. Note that the instance may hold mastership for logs that are not
// listed in allIDs, but such logs are skipped.
func (o *OperationManager) masterFor(ctx context.Context, allIDs []int64) ([]int64, error) {
	if o.info.Registry.ElectionFactory == nil {
		return allIDs, nil
	}
	allStringIDs := make([]string, 0, len(allIDs))
	for _, id := range allIDs {
		s := strconv.FormatInt(id, 10)
		allStringIDs = append(allStringIDs, s)
	}

	// Synchronize the set of log IDs with those we are tracking mastership for.
	for _, logID := range allStringIDs {
		knownLogs.Set(1, logID)
		if o.runnerCancels[logID] == nil {
			o.tracker.Set(logID, false) // Initialise tracking for this ID.
			o.runnerCancels[logID] = o.runElectionWithRestarts(ctx, logID)
		}
	}

	held := o.tracker.Held()
	heldIDs := make([]int64, 0, len(allIDs))
	sort.Strings(allStringIDs)
	for _, s := range held {
		// Skip the log if it is not present in allIDs.
		if i := sort.SearchStrings(allStringIDs, s); i >= len(allStringIDs) || allStringIDs[i] != s {
			continue
		}
		id, err := strconv.ParseInt(s, 10, 64)
		if err != nil {
			return nil, fmt.Errorf("failed to parse logID %v as int64", s)
		}
		heldIDs = append(heldIDs, id)
	}
	return heldIDs, nil
}

// runElectionWithRestarts runs the election/resignation loop for the given log
// indefinitely, until the returned CancelFunc is invoked. Any failure during
// the loop leads to a restart of the loop with a few seconds delay.
//
// TODO(pavelkalinnikov): Restart the whole log operation rather than just the
// election, and have a metric for restarts.
func (o *OperationManager) runElectionWithRestarts(ctx context.Context, logID string) context.CancelFunc {
	klog.Infof("create master election goroutine for %v", logID)
	cctx, cancel := context.WithCancel(ctx)
	run := func(ctx context.Context) {
		e, err := o.info.Registry.ElectionFactory.NewElection(ctx, logID)
		if err != nil {
			klog.Errorf("failed to create election for %v: %v", logID, err)
			return
		}
		// Warning: NewRunner can attempt to modify the config. Make a separate
		// copy of the config for each log, to avoid data races.
		config := o.info.ElectionConfig
		// TODO(pavelkalinnikov): Passing the cancel function is not needed here.
		r := election.NewRunner(logID, &config, o.tracker, cancel, e)
		r.Run(ctx, o.pendingResignations)
	}

	o.runnerWG.Add(1)
	go func(ctx context.Context) {
		defer o.runnerWG.Done()
		// Continue only while the context is active.
		for ctx.Err() == nil {
			run(ctx)
			// Sleep before restarts, to not spam the log with errors.
			// TODO(pavelkalinnikov): Make the interval configurable.
			const pause = 5 * time.Second
			if err := clock.SleepSource(ctx, pause, o.info.TimeSource); err != nil {
				break // The context has been canceled during the sleep.
			}
		}
	}(cctx)
	return cancel
}

// updateHeldIDs updates the process status with the number/list of logs that
// the instance holds mastership for.
func (o *OperationManager) updateHeldIDs(ctx context.Context, logIDs, activeIDs []int64) {
	heldInfo := o.heldInfo(ctx, logIDs)
	msg := fmt.Sprintf("Acting as master for %d / %d active logs: %s", len(logIDs), len(activeIDs), heldInfo)
	o.idsMutex.Lock()
	defer o.idsMutex.Unlock()
	if !reflect.DeepEqual(logIDs, o.lastHeld) {
		o.lastHeld = make([]int64, len(logIDs))
		copy(o.lastHeld, logIDs)
		klog.Info(msg)
		if o.info.Registry.SetProcessStatus != nil {
			o.info.Registry.SetProcessStatus(heldInfo)
		}
	} else {
		klog.V(1).Info(msg)
	}
}

// getLogsAndExecutePass lists the active logs, determines which of them this
// instance currently holds mastership for, and runs a single pass of the log
// operation over each of those, subject to the configured timeout.
func (o *OperationManager) getLogsAndExecutePass(ctx context.Context) error {
	runCtx, cancel := context.WithTimeout(ctx, o.info.Timeout)
	defer cancel()

	activeIDs, err := o.info.Registry.LogStorage.GetActiveLogIDs(ctx)
	if err != nil {
		return fmt.Errorf("failed to list active log IDs: %v", err)
	}
	// Find the logs we are master for, skipping those logs that are not active,
	// e.g. deleted or FROZEN ones.
	// TODO(pavelkalinnikov): Resign mastership for the inactive logs.
	logIDs, err := o.masterFor(ctx, activeIDs)
	if err != nil {
		return fmt.Errorf("failed to determine log IDs we're master for: %v", err)
	}
	o.updateHeldIDs(ctx, logIDs, activeIDs)

	executePassForAll(runCtx, &o.info, o.logOperation, logIDs)
	return nil
}

// OperationSingle performs a single pass of the manager.
//
// TODO(pavelkalinnikov): Deprecate this because it doesn't clean up any state,
// and is used only for testing.
func (o *OperationManager) OperationSingle(ctx context.Context) {
	if err := o.getLogsAndExecutePass(ctx); err != nil {
		klog.Errorf("failed to perform operation: %v", err)
	}
}

// OperationLoop starts the manager working. It continues until told to exit.
// TODO(Martin2112): No mechanism for error reporting etc., this is OK for v1 but needs work.
func (o *OperationManager) OperationLoop(ctx context.Context) {
	klog.Infof("Log operation manager starting")

	// Outer loop, runs until terminated.
	for {
		if err := o.operateOnce(ctx); err != nil {
			klog.Infof("Log operation manager shutting down")
			break
		}
	}

	// Terminate all the election Runners.
	for logID, cancel := range o.runnerCancels {
		if cancel != nil {
			klog.V(1).Infof("cancel election runner for %s", logID)
			cancel()
		}
	}
	// Drain any remaining resignations which might have triggered.
	close(o.pendingResignations)
	for r := range o.pendingResignations {
		resignations.Inc(r.ID)
		r.Execute(ctx)
	}

	klog.Infof("wait for termination of election runners...")
	o.runnerWG.Wait()
	klog.Infof("wait for termination of election runners...done")
}
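
// Illustrative sketch, not part of the original file: OperationLoop blocks
// until its context is canceled, so a typical caller runs it on its own
// goroutine and cancels the context to trigger the clean shutdown above
// (election runners terminated, pending resignations drained).
func runWithCancel(om *OperationManager) context.CancelFunc {
	ctx, cancel := context.WithCancel(context.Background())
	go om.OperationLoop(ctx)
	// Invoke the returned cancel function to stop the manager.
	return cancel
}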

// operateOnce runs a single round of operation for each of the active logs
// that this instance is master for. Returns an error only if the context is
// canceled, i.e. the operation is being shut down.
func (o *OperationManager) operateOnce(ctx context.Context) error {
	// TODO(alcutter): want a child context with deadline here?
	start := o.info.TimeSource.Now()

	if err := o.getLogsAndExecutePass(ctx); err != nil {
		// Suppress the error if ctx is done (ctx.Err != nil) as we're exiting.
		if ctx.Err() == nil {
			klog.Errorf("failed to execute operation on logs: %v", err)
		}
	}
	klog.V(1).Infof("Log operation manager pass complete")

	// Process any pending resignations while there's no activity.
	doneResigning := false
	for !doneResigning {
		select {
		case r := <-o.pendingResignations:
			resignations.Inc(r.ID)
			r.Execute(ctx)
		default:
			doneResigning = true
		}
	}

	// See if it's time to quit.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	// Wait for the configured time before going for another pass.
	duration := o.info.TimeSource.Now().Sub(start)
	wait := o.info.RunInterval - duration
	if wait > 0 {
		klog.V(1).Infof("Processing started at %v for %v; wait %v before next run", start, duration, wait)
		if err := clock.SleepContext(ctx, wait); err != nil {
			return err
		}
	} else {
		klog.V(1).Infof("Processing started at %v for %v; start next run immediately", start, duration)
	}
	return nil
}

// executePassForAll runs ExecutePass of the given operation for each of the
// passed-in logs, allowing up to a configurable number of parallel operations.
func executePassForAll(ctx context.Context, info *OperationInfo, op Operation, logIDs []int64) {
	startBatch := info.TimeSource.Now()

	numWorkers := info.NumWorkers
	if numWorkers <= 0 {
		klog.Warning("Running executor with NumWorkers <= 0, assuming 1")
		numWorkers = 1
	}
	klog.V(1).Infof("Running executor with %d worker(s)", numWorkers)

	// Bound the number of logs processed in parallel with a weighted semaphore.
	sem := semaphore.NewWeighted(int64(numWorkers))
	var wg sync.WaitGroup
	for _, logID := range logIDs {
		if err := sem.Acquire(ctx, 1); err != nil {
			break // Terminate because the context is canceled.
		}
		wg.Add(1)
		go func(logID int64) {
			defer wg.Done()
			defer sem.Release(1)
			if err := executePass(ctx, info, op, logID); err != nil {
				klog.Errorf("ExecutePass(%v) failed: %v", logID, err)
			}
		}(logID)
	}

	// Wait for the workers to consume all of the logIDs.
	wg.Wait()
	d := clock.SecondsSince(info.TimeSource, startBatch)
	klog.V(1).Infof("Group run completed in %.2f seconds", d)
}

// executePass runs ExecutePass of the given operation for the passed-in log.
func executePass(ctx context.Context, info *OperationInfo, op Operation, logID int64) error {
	label := strconv.FormatInt(logID, 10)
	start := info.TimeSource.Now()
	count, err := op.ExecutePass(ctx, logID, info)
	if err != nil {
		failedSigningRuns.Inc(label)
		return err
	}

	// This indicates signing activity is proceeding on the logID.
	signingRuns.Inc(label)
	if count > 0 {
		d := clock.SecondsSince(info.TimeSource, start)
		klog.Infof("%v: processed %d items in %.2f seconds (%.2f qps)", logID, count, d, float64(count)/d)
		entriesAdded.Add(float64(count), label)
		batchesAdded.Inc(label)
	} else {
		klog.V(1).Infof("%v: no items to process", logID)
	}
	return nil
}