/* * MinIO Client (C) 2020 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package replication import ( "bytes" "encoding/xml" "fmt" "math" "strconv" "strings" "time" "unicode/utf8" "github.com/rs/xid" ) var errInvalidFilter = fmt.Errorf("invalid filter") // OptionType specifies operation to be performed on config type OptionType string const ( // AddOption specifies addition of rule to config AddOption OptionType = "Add" // SetOption specifies modification of existing rule to config SetOption OptionType = "Set" // RemoveOption specifies rule options are for removing a rule RemoveOption OptionType = "Remove" // ImportOption is for getting current config ImportOption OptionType = "Import" ) // Options represents options to set a replication configuration rule type Options struct { Op OptionType RoleArn string ID string Prefix string RuleStatus string Priority string TagString string StorageClass string DestBucket string IsTagSet bool IsSCSet bool ReplicateDeletes string // replicate versioned deletes ReplicateDeleteMarkers string // replicate soft deletes ReplicaSync string // replicate replica metadata modifications ExistingObjectReplicate string } // Tags returns a slice of tags for a rule func (opts Options) Tags() ([]Tag, error) { var tagList []Tag tagTokens := strings.Split(opts.TagString, "&") for _, tok := range tagTokens { if tok == "" { break } kv := strings.SplitN(tok, "=", 2) if len(kv) != 2 { return []Tag{}, fmt.Errorf("tags should be entered as comma separated k=v pairs") } tagList = append(tagList, Tag{ Key: kv[0], Value: kv[1], }) } return tagList, nil } // Config - replication configuration specified in // https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html type Config struct { XMLName xml.Name `xml:"ReplicationConfiguration" json:"-"` Rules []Rule `xml:"Rule" json:"Rules"` Role string `xml:"Role" json:"Role"` } // Empty returns true if config is not set func (c *Config) Empty() bool { return len(c.Rules) == 0 } // AddRule adds a new rule to existing replication config. If a rule exists with the // same ID, then the rule is replaced. func (c *Config) AddRule(opts Options) error { priority, err := strconv.Atoi(opts.Priority) if err != nil { return err } var compatSw bool // true if RoleArn is used with new mc client and older minio version prior to multisite if opts.RoleArn != "" { tokens := strings.Split(opts.RoleArn, ":") if len(tokens) != 6 { return fmt.Errorf("invalid format for replication Role Arn: %v", opts.RoleArn) } switch { case strings.HasPrefix(opts.RoleArn, "arn:minio:replication") && len(c.Rules) == 0: c.Role = opts.RoleArn compatSw = true case strings.HasPrefix(opts.RoleArn, "arn:aws:iam"): c.Role = opts.RoleArn default: return fmt.Errorf("RoleArn invalid for AWS replication configuration: %v", opts.RoleArn) } } var status Status // toggle rule status for edit option switch opts.RuleStatus { case "enable": status = Enabled case "disable": status = Disabled default: return fmt.Errorf("rule state should be either [enable|disable]") } tags, err := opts.Tags() if err != nil { return err } andVal := And{ Tags: tags, } filter := Filter{Prefix: opts.Prefix} // only a single tag is set. if opts.Prefix == "" && len(tags) == 1 { filter.Tag = tags[0] } // both prefix and tag are present if len(andVal.Tags) > 1 || opts.Prefix != "" { filter.And = andVal filter.And.Prefix = opts.Prefix filter.Prefix = "" filter.Tag = Tag{} } if opts.ID == "" { opts.ID = xid.New().String() } destBucket := opts.DestBucket // ref https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-arn-format.html if btokens := strings.Split(destBucket, ":"); len(btokens) != 6 { if len(btokens) == 1 && compatSw { destBucket = fmt.Sprintf("arn:aws:s3:::%s", destBucket) } else { return fmt.Errorf("destination bucket needs to be in Arn format") } } dmStatus := Disabled if opts.ReplicateDeleteMarkers != "" { switch opts.ReplicateDeleteMarkers { case "enable": dmStatus = Enabled case "disable": dmStatus = Disabled default: return fmt.Errorf("ReplicateDeleteMarkers should be either enable|disable") } } vDeleteStatus := Disabled if opts.ReplicateDeletes != "" { switch opts.ReplicateDeletes { case "enable": vDeleteStatus = Enabled case "disable": vDeleteStatus = Disabled default: return fmt.Errorf("ReplicateDeletes should be either enable|disable") } } var replicaSync Status // replica sync is by default Enabled, unless specified. switch opts.ReplicaSync { case "enable", "": replicaSync = Enabled case "disable": replicaSync = Disabled default: return fmt.Errorf("replica metadata sync should be either [enable|disable]") } var existingStatus Status if opts.ExistingObjectReplicate != "" { switch opts.ExistingObjectReplicate { case "enable": existingStatus = Enabled case "disable", "": existingStatus = Disabled default: return fmt.Errorf("existingObjectReplicate should be either enable|disable") } } newRule := Rule{ ID: opts.ID, Priority: priority, Status: status, Filter: filter, Destination: Destination{ Bucket: destBucket, StorageClass: opts.StorageClass, }, DeleteMarkerReplication: DeleteMarkerReplication{Status: dmStatus}, DeleteReplication: DeleteReplication{Status: vDeleteStatus}, // MinIO enables replica metadata syncing by default in the case of bi-directional replication to allow // automatic failover as the expectation in this case is that replica and source should be identical. // However AWS leaves this configurable https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-for-metadata-changes.html SourceSelectionCriteria: SourceSelectionCriteria{ ReplicaModifications: ReplicaModifications{ Status: replicaSync, }, }, // By default disable existing object replication unless selected ExistingObjectReplication: ExistingObjectReplication{ Status: existingStatus, }, } // validate rule after overlaying priority for pre-existing rule being disabled. if err := newRule.Validate(); err != nil { return err } // if replication config uses RoleArn, migrate this to the destination element as target ARN for remote bucket for MinIO configuration if c.Role != "" && !strings.HasPrefix(c.Role, "arn:aws:iam") && !compatSw { for i := range c.Rules { c.Rules[i].Destination.Bucket = c.Role } c.Role = "" } for _, rule := range c.Rules { if rule.Priority == newRule.Priority { return fmt.Errorf("priority must be unique. Replication configuration already has a rule with this priority") } if rule.ID == newRule.ID { return fmt.Errorf("a rule exists with this ID") } } c.Rules = append(c.Rules, newRule) return nil } // EditRule modifies an existing rule in replication config func (c *Config) EditRule(opts Options) error { if opts.ID == "" { return fmt.Errorf("rule ID missing") } // if replication config uses RoleArn, migrate this to the destination element as target ARN for remote bucket for non AWS. if c.Role != "" && !strings.HasPrefix(c.Role, "arn:aws:iam") && len(c.Rules) > 1 { for i := range c.Rules { c.Rules[i].Destination.Bucket = c.Role } c.Role = "" } rIdx := -1 var newRule Rule for i, rule := range c.Rules { if rule.ID == opts.ID { rIdx = i newRule = rule break } } if rIdx < 0 { return fmt.Errorf("rule with ID %s not found in replication configuration", opts.ID) } prefixChg := opts.Prefix != newRule.Prefix() if opts.IsTagSet || prefixChg { prefix := newRule.Prefix() if prefix != opts.Prefix { prefix = opts.Prefix } tags := []Tag{newRule.Filter.Tag} if len(newRule.Filter.And.Tags) != 0 { tags = newRule.Filter.And.Tags } var err error if opts.IsTagSet { tags, err = opts.Tags() if err != nil { return err } } andVal := And{ Tags: tags, } filter := Filter{Prefix: prefix} // only a single tag is set. if prefix == "" && len(tags) == 1 { filter.Tag = tags[0] } // both prefix and tag are present if len(andVal.Tags) > 1 || prefix != "" { filter.And = andVal filter.And.Prefix = prefix filter.Prefix = "" filter.Tag = Tag{} } newRule.Filter = filter } // toggle rule status for edit option if opts.RuleStatus != "" { switch opts.RuleStatus { case "enable": newRule.Status = Enabled case "disable": newRule.Status = Disabled default: return fmt.Errorf("rule state should be either [enable|disable]") } } // set DeleteMarkerReplication rule status for edit option if opts.ReplicateDeleteMarkers != "" { switch opts.ReplicateDeleteMarkers { case "enable": newRule.DeleteMarkerReplication.Status = Enabled case "disable": newRule.DeleteMarkerReplication.Status = Disabled default: return fmt.Errorf("ReplicateDeleteMarkers state should be either [enable|disable]") } } // set DeleteReplication rule status for edit option. This is a MinIO specific // option to replicate versioned deletes if opts.ReplicateDeletes != "" { switch opts.ReplicateDeletes { case "enable": newRule.DeleteReplication.Status = Enabled case "disable": newRule.DeleteReplication.Status = Disabled default: return fmt.Errorf("ReplicateDeletes state should be either [enable|disable]") } } if opts.ReplicaSync != "" { switch opts.ReplicaSync { case "enable", "": newRule.SourceSelectionCriteria.ReplicaModifications.Status = Enabled case "disable": newRule.SourceSelectionCriteria.ReplicaModifications.Status = Disabled default: return fmt.Errorf("replica metadata sync should be either [enable|disable]") } } if opts.ExistingObjectReplicate != "" { switch opts.ExistingObjectReplicate { case "enable": newRule.ExistingObjectReplication.Status = Enabled case "disable": newRule.ExistingObjectReplication.Status = Disabled default: return fmt.Errorf("existingObjectsReplication state should be either [enable|disable]") } } if opts.IsSCSet { newRule.Destination.StorageClass = opts.StorageClass } if opts.Priority != "" { priority, err := strconv.Atoi(opts.Priority) if err != nil { return err } newRule.Priority = priority } if opts.DestBucket != "" { destBucket := opts.DestBucket // ref https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-arn-format.html if btokens := strings.Split(opts.DestBucket, ":"); len(btokens) != 6 { return fmt.Errorf("destination bucket needs to be in Arn format") } newRule.Destination.Bucket = destBucket } // validate rule if err := newRule.Validate(); err != nil { return err } // ensure priority and destination bucket restrictions are not violated for idx, rule := range c.Rules { if rule.Priority == newRule.Priority && rIdx != idx { return fmt.Errorf("priority must be unique. Replication configuration already has a rule with this priority") } if rule.Destination.Bucket != newRule.Destination.Bucket && rule.ID == newRule.ID { if c.Role == newRule.Destination.Bucket { continue } return fmt.Errorf("invalid destination bucket for this rule") } } c.Rules[rIdx] = newRule return nil } // RemoveRule removes a rule from replication config. func (c *Config) RemoveRule(opts Options) error { var newRules []Rule ruleFound := false for _, rule := range c.Rules { if rule.ID != opts.ID { newRules = append(newRules, rule) continue } ruleFound = true } if !ruleFound { return fmt.Errorf("Rule with ID %s not found", opts.ID) } if len(newRules) == 0 { return fmt.Errorf("replication configuration should have at least one rule") } c.Rules = newRules return nil } // Rule - a rule for replication configuration. type Rule struct { XMLName xml.Name `xml:"Rule" json:"-"` ID string `xml:"ID,omitempty"` Status Status `xml:"Status"` Priority int `xml:"Priority"` DeleteMarkerReplication DeleteMarkerReplication `xml:"DeleteMarkerReplication"` DeleteReplication DeleteReplication `xml:"DeleteReplication"` Destination Destination `xml:"Destination"` Filter Filter `xml:"Filter" json:"Filter"` SourceSelectionCriteria SourceSelectionCriteria `xml:"SourceSelectionCriteria" json:"SourceSelectionCriteria"` ExistingObjectReplication ExistingObjectReplication `xml:"ExistingObjectReplication,omitempty" json:"ExistingObjectReplication,omitempty"` } // Validate validates the rule for correctness func (r Rule) Validate() error { if err := r.validateID(); err != nil { return err } if err := r.validateStatus(); err != nil { return err } if err := r.validateFilter(); err != nil { return err } if r.Priority < 0 && r.Status == Enabled { return fmt.Errorf("priority must be set for the rule") } if err := r.validateStatus(); err != nil { return err } return r.ExistingObjectReplication.Validate() } // validateID - checks if ID is valid or not. func (r Rule) validateID() error { // cannot be longer than 255 characters if len(r.ID) > 255 { return fmt.Errorf("ID must be less than 255 characters") } return nil } // validateStatus - checks if status is valid or not. func (r Rule) validateStatus() error { // Status can't be empty if len(r.Status) == 0 { return fmt.Errorf("status cannot be empty") } // Status must be one of Enabled or Disabled if r.Status != Enabled && r.Status != Disabled { return fmt.Errorf("status must be set to either Enabled or Disabled") } return nil } func (r Rule) validateFilter() error { return r.Filter.Validate() } // Prefix - a rule can either have prefix under or under // . This method returns the prefix from the // location where it is available func (r Rule) Prefix() string { if r.Filter.Prefix != "" { return r.Filter.Prefix } return r.Filter.And.Prefix } // Tags - a rule can either have tag under or under // . This method returns all the tags from the // rule in the format tag1=value1&tag2=value2 func (r Rule) Tags() string { ts := []Tag{r.Filter.Tag} if len(r.Filter.And.Tags) != 0 { ts = r.Filter.And.Tags } var buf bytes.Buffer for _, t := range ts { if buf.Len() > 0 { buf.WriteString("&") } buf.WriteString(t.String()) } return buf.String() } // Filter - a filter for a replication configuration Rule. type Filter struct { XMLName xml.Name `xml:"Filter" json:"-"` Prefix string `json:"Prefix,omitempty"` And And `xml:"And,omitempty" json:"And,omitempty"` Tag Tag `xml:"Tag,omitempty" json:"Tag,omitempty"` } // Validate - validates the filter element func (f Filter) Validate() error { // A Filter must have exactly one of Prefix, Tag, or And specified. if !f.And.isEmpty() { if f.Prefix != "" { return errInvalidFilter } if !f.Tag.IsEmpty() { return errInvalidFilter } } if f.Prefix != "" { if !f.Tag.IsEmpty() { return errInvalidFilter } } if !f.Tag.IsEmpty() { if err := f.Tag.Validate(); err != nil { return err } } return nil } // Tag - a tag for a replication configuration Rule filter. type Tag struct { XMLName xml.Name `json:"-"` Key string `xml:"Key,omitempty" json:"Key,omitempty"` Value string `xml:"Value,omitempty" json:"Value,omitempty"` } func (tag Tag) String() string { if tag.IsEmpty() { return "" } return tag.Key + "=" + tag.Value } // IsEmpty returns whether this tag is empty or not. func (tag Tag) IsEmpty() bool { return tag.Key == "" } // Validate checks this tag. func (tag Tag) Validate() error { if len(tag.Key) == 0 || utf8.RuneCountInString(tag.Key) > 128 { return fmt.Errorf("invalid Tag Key") } if utf8.RuneCountInString(tag.Value) > 256 { return fmt.Errorf("invalid Tag Value") } return nil } // Destination - destination in ReplicationConfiguration. type Destination struct { XMLName xml.Name `xml:"Destination" json:"-"` Bucket string `xml:"Bucket" json:"Bucket"` StorageClass string `xml:"StorageClass,omitempty" json:"StorageClass,omitempty"` } // And - a tag to combine a prefix and multiple tags for replication configuration rule. type And struct { XMLName xml.Name `xml:"And,omitempty" json:"-"` Prefix string `xml:"Prefix,omitempty" json:"Prefix,omitempty"` Tags []Tag `xml:"Tag,omitempty" json:"Tag,omitempty"` } // isEmpty returns true if Tags field is null func (a And) isEmpty() bool { return len(a.Tags) == 0 && a.Prefix == "" } // Status represents Enabled/Disabled status type Status string // Supported status types const ( Enabled Status = "Enabled" Disabled Status = "Disabled" ) // DeleteMarkerReplication - whether delete markers are replicated - https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html type DeleteMarkerReplication struct { Status Status `xml:"Status" json:"Status"` // should be set to "Disabled" by default } // IsEmpty returns true if DeleteMarkerReplication is not set func (d DeleteMarkerReplication) IsEmpty() bool { return len(d.Status) == 0 } // DeleteReplication - whether versioned deletes are replicated - this // is a MinIO specific extension type DeleteReplication struct { Status Status `xml:"Status" json:"Status"` // should be set to "Disabled" by default } // IsEmpty returns true if DeleteReplication is not set func (d DeleteReplication) IsEmpty() bool { return len(d.Status) == 0 } // ReplicaModifications specifies if replica modification sync is enabled type ReplicaModifications struct { Status Status `xml:"Status" json:"Status"` // should be set to "Enabled" by default } // SourceSelectionCriteria - specifies additional source selection criteria in ReplicationConfiguration. type SourceSelectionCriteria struct { ReplicaModifications ReplicaModifications `xml:"ReplicaModifications" json:"ReplicaModifications"` } // IsValid - checks whether SourceSelectionCriteria is valid or not. func (s SourceSelectionCriteria) IsValid() bool { return s.ReplicaModifications.Status == Enabled || s.ReplicaModifications.Status == Disabled } // Validate source selection criteria func (s SourceSelectionCriteria) Validate() error { if (s == SourceSelectionCriteria{}) { return nil } if !s.IsValid() { return fmt.Errorf("invalid ReplicaModification status") } return nil } // ExistingObjectReplication - whether existing object replication is enabled type ExistingObjectReplication struct { Status Status `xml:"Status"` // should be set to "Disabled" by default } // IsEmpty returns true if DeleteMarkerReplication is not set func (e ExistingObjectReplication) IsEmpty() bool { return len(e.Status) == 0 } // Validate validates whether the status is disabled. func (e ExistingObjectReplication) Validate() error { if e.IsEmpty() { return nil } if e.Status != Disabled && e.Status != Enabled { return fmt.Errorf("invalid ExistingObjectReplication status") } return nil } // TargetMetrics represents inline replication metrics // such as pending, failed and completed bytes in total for a bucket remote target type TargetMetrics struct { // Completed count ReplicatedCount uint64 `json:"replicationCount,omitempty"` // Completed size in bytes ReplicatedSize uint64 `json:"completedReplicationSize,omitempty"` // Bandwidth limit in bytes/sec for this target BandWidthLimitInBytesPerSecond int64 `json:"limitInBits,omitempty"` // Current bandwidth used in bytes/sec for this target CurrentBandwidthInBytesPerSecond float64 `json:"currentBandwidth,omitempty"` // errors seen in replication in last minute, hour and total Failed TimedErrStats `json:"failed,omitempty"` // Deprecated fields // Pending size in bytes PendingSize uint64 `json:"pendingReplicationSize,omitempty"` // Total Replica size in bytes ReplicaSize uint64 `json:"replicaSize,omitempty"` // Failed size in bytes FailedSize uint64 `json:"failedReplicationSize,omitempty"` // Total number of pending operations including metadata updates PendingCount uint64 `json:"pendingReplicationCount,omitempty"` // Total number of failed operations including metadata updates FailedCount uint64 `json:"failedReplicationCount,omitempty"` } // Metrics represents inline replication metrics for a bucket. type Metrics struct { Stats map[string]TargetMetrics // Completed size in bytes across targets ReplicatedSize uint64 `json:"completedReplicationSize,omitempty"` // Total Replica size in bytes across targets ReplicaSize uint64 `json:"replicaSize,omitempty"` // Total Replica counts ReplicaCount int64 `json:"replicaCount,omitempty"` // Total Replicated count ReplicatedCount int64 `json:"replicationCount,omitempty"` // errors seen in replication in last minute, hour and total Errors TimedErrStats `json:"failed,omitempty"` // Total number of entries that are queued for replication QStats InQueueMetric `json:"queued"` // Deprecated fields // Total Pending size in bytes across targets PendingSize uint64 `json:"pendingReplicationSize,omitempty"` // Failed size in bytes across targets FailedSize uint64 `json:"failedReplicationSize,omitempty"` // Total number of pending operations including metadata updates across targets PendingCount uint64 `json:"pendingReplicationCount,omitempty"` // Total number of failed operations including metadata updates across targets FailedCount uint64 `json:"failedReplicationCount,omitempty"` } // RStat - has count and bytes for replication metrics type RStat struct { Count float64 `json:"count"` Bytes int64 `json:"bytes"` } // Add two RStat func (r RStat) Add(r1 RStat) RStat { return RStat{ Count: r.Count + r1.Count, Bytes: r.Bytes + r1.Bytes, } } // TimedErrStats holds error stats for a time period type TimedErrStats struct { LastMinute RStat `json:"lastMinute"` LastHour RStat `json:"lastHour"` Totals RStat `json:"totals"` } // Add two TimedErrStats func (te TimedErrStats) Add(o TimedErrStats) TimedErrStats { return TimedErrStats{ LastMinute: te.LastMinute.Add(o.LastMinute), LastHour: te.LastHour.Add(o.LastHour), Totals: te.Totals.Add(o.Totals), } } // ResyncTargetsInfo provides replication target information to resync replicated data. type ResyncTargetsInfo struct { Targets []ResyncTarget `json:"target,omitempty"` } // ResyncTarget provides the replica resources and resetID to initiate resync replication. type ResyncTarget struct { Arn string `json:"arn"` ResetID string `json:"resetid"` StartTime time.Time `json:"startTime,omitempty"` EndTime time.Time `json:"endTime,omitempty"` // Status of resync operation ResyncStatus string `json:"resyncStatus,omitempty"` // Completed size in bytes ReplicatedSize int64 `json:"completedReplicationSize,omitempty"` // Failed size in bytes FailedSize int64 `json:"failedReplicationSize,omitempty"` // Total number of failed operations FailedCount int64 `json:"failedReplicationCount,omitempty"` // Total number of completed operations ReplicatedCount int64 `json:"replicationCount,omitempty"` // Last bucket/object replicated. Bucket string `json:"bucket,omitempty"` Object string `json:"object,omitempty"` } // XferStats holds transfer rate info for uploads/sec type XferStats struct { AvgRate float64 `json:"avgRate"` PeakRate float64 `json:"peakRate"` CurrRate float64 `json:"currRate"` } // Merge two XferStats func (x *XferStats) Merge(x1 XferStats) { x.AvgRate += x1.AvgRate x.PeakRate += x1.PeakRate x.CurrRate += x1.CurrRate } // QStat holds count and bytes for objects in replication queue type QStat struct { Count float64 `json:"count"` Bytes float64 `json:"bytes"` } // Add 2 QStat entries func (q *QStat) Add(q1 QStat) { q.Count += q1.Count q.Bytes += q1.Bytes } // InQueueMetric holds stats for objects in replication queue type InQueueMetric struct { Curr QStat `json:"curr" msg:"cq"` Avg QStat `json:"avg" msg:"aq"` Max QStat `json:"peak" msg:"pq"` } // MetricName name of replication metric type MetricName string const ( // Large is a metric name for large objects >=128MiB Large MetricName = "Large" // Small is a metric name for objects <128MiB size Small MetricName = "Small" // Total is a metric name for total objects Total MetricName = "Total" ) // WorkerStat has stats on number of replication workers type WorkerStat struct { Curr int32 `json:"curr"` Avg float32 `json:"avg"` Max int32 `json:"max"` } // ReplMRFStats holds stats of MRF backlog saved to disk in the last 5 minutes // and number of entries that failed replication after 3 retries type ReplMRFStats struct { LastFailedCount uint64 `json:"failedCount_last5min"` // Count of unreplicated entries that were dropped after MRF retry limit reached since cluster start. TotalDroppedCount uint64 `json:"droppedCount_since_uptime"` // Bytes of unreplicated entries that were dropped after MRF retry limit reached since cluster start. TotalDroppedBytes uint64 `json:"droppedBytes_since_uptime"` } // ReplQNodeStats holds stats for a node in replication queue type ReplQNodeStats struct { NodeName string `json:"nodeName"` Uptime int64 `json:"uptime"` Workers WorkerStat `json:"activeWorkers"` XferStats map[MetricName]XferStats `json:"transferSummary"` TgtXferStats map[string]map[MetricName]XferStats `json:"tgtTransferStats"` QStats InQueueMetric `json:"queueStats"` MRFStats ReplMRFStats `json:"mrfStats"` } // ReplQueueStats holds stats for replication queue across nodes type ReplQueueStats struct { Nodes []ReplQNodeStats `json:"nodes"` } // Workers returns number of workers across all nodes func (q ReplQueueStats) Workers() (tot WorkerStat) { for _, node := range q.Nodes { tot.Avg += node.Workers.Avg tot.Curr += node.Workers.Curr if tot.Max < node.Workers.Max { tot.Max = node.Workers.Max } } if len(q.Nodes) > 0 { tot.Avg /= float32(len(q.Nodes)) tot.Curr /= int32(len(q.Nodes)) } return tot } // qStatSummary returns cluster level stats for objects in replication queue func (q ReplQueueStats) qStatSummary() InQueueMetric { m := InQueueMetric{} for _, v := range q.Nodes { m.Avg.Add(v.QStats.Avg) m.Curr.Add(v.QStats.Curr) if m.Max.Count < v.QStats.Max.Count { m.Max.Add(v.QStats.Max) } } return m } // ReplQStats holds stats for objects in replication queue type ReplQStats struct { Uptime int64 `json:"uptime"` Workers WorkerStat `json:"workers"` XferStats map[MetricName]XferStats `json:"xferStats"` TgtXferStats map[string]map[MetricName]XferStats `json:"tgtXferStats"` QStats InQueueMetric `json:"qStats"` MRFStats ReplMRFStats `json:"mrfStats"` } // QStats returns cluster level stats for objects in replication queue func (q ReplQueueStats) QStats() (r ReplQStats) { r.QStats = q.qStatSummary() r.XferStats = make(map[MetricName]XferStats) r.TgtXferStats = make(map[string]map[MetricName]XferStats) r.Workers = q.Workers() for _, node := range q.Nodes { for arn := range node.TgtXferStats { xmap, ok := node.TgtXferStats[arn] if !ok { xmap = make(map[MetricName]XferStats) } for m, v := range xmap { st, ok := r.XferStats[m] if !ok { st = XferStats{} } st.AvgRate += v.AvgRate st.CurrRate += v.CurrRate st.PeakRate = math.Max(st.PeakRate, v.PeakRate) if _, ok := r.TgtXferStats[arn]; !ok { r.TgtXferStats[arn] = make(map[MetricName]XferStats) } r.TgtXferStats[arn][m] = st } } for k, v := range node.XferStats { st, ok := r.XferStats[k] if !ok { st = XferStats{} } st.AvgRate += v.AvgRate st.CurrRate += v.CurrRate st.PeakRate = math.Max(st.PeakRate, v.PeakRate) r.XferStats[k] = st } r.MRFStats.LastFailedCount += node.MRFStats.LastFailedCount r.MRFStats.TotalDroppedCount += node.MRFStats.TotalDroppedCount r.MRFStats.TotalDroppedBytes += node.MRFStats.TotalDroppedBytes r.Uptime += node.Uptime } if len(q.Nodes) > 0 { r.Uptime /= int64(len(q.Nodes)) // average uptime } return } // MetricsV2 represents replication metrics for a bucket. type MetricsV2 struct { Uptime int64 `json:"uptime"` CurrentStats Metrics `json:"currStats"` QueueStats ReplQueueStats `json:"queueStats"` }