mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2024-12-27 09:36:31 +00:00
975 lines
29 KiB
Go
975 lines
29 KiB
Go
/*
|
|
* MinIO Client (C) 2020 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package replication
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/xml"
|
|
"fmt"
|
|
"math"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
"unicode/utf8"
|
|
|
|
"github.com/rs/xid"
|
|
)
|
|
|
|
var errInvalidFilter = fmt.Errorf("invalid filter")
|
|
|
|
// OptionType specifies operation to be performed on config
|
|
type OptionType string
|
|
|
|
const (
|
|
// AddOption specifies addition of rule to config
|
|
AddOption OptionType = "Add"
|
|
// SetOption specifies modification of existing rule to config
|
|
SetOption OptionType = "Set"
|
|
|
|
// RemoveOption specifies rule options are for removing a rule
|
|
RemoveOption OptionType = "Remove"
|
|
// ImportOption is for getting current config
|
|
ImportOption OptionType = "Import"
|
|
)
|
|
|
|
// Options represents options to set a replication configuration rule
|
|
type Options struct {
|
|
Op OptionType
|
|
RoleArn string
|
|
ID string
|
|
Prefix string
|
|
RuleStatus string
|
|
Priority string
|
|
TagString string
|
|
StorageClass string
|
|
DestBucket string
|
|
IsTagSet bool
|
|
IsSCSet bool
|
|
ReplicateDeletes string // replicate versioned deletes
|
|
ReplicateDeleteMarkers string // replicate soft deletes
|
|
ReplicaSync string // replicate replica metadata modifications
|
|
ExistingObjectReplicate string
|
|
}
|
|
|
|
// Tags returns a slice of tags for a rule
|
|
func (opts Options) Tags() ([]Tag, error) {
|
|
var tagList []Tag
|
|
tagTokens := strings.Split(opts.TagString, "&")
|
|
for _, tok := range tagTokens {
|
|
if tok == "" {
|
|
break
|
|
}
|
|
kv := strings.SplitN(tok, "=", 2)
|
|
if len(kv) != 2 {
|
|
return []Tag{}, fmt.Errorf("tags should be entered as comma separated k=v pairs")
|
|
}
|
|
tagList = append(tagList, Tag{
|
|
Key: kv[0],
|
|
Value: kv[1],
|
|
})
|
|
}
|
|
return tagList, nil
|
|
}
|
|
|
|
// Config - replication configuration specified in
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html
|
|
type Config struct {
|
|
XMLName xml.Name `xml:"ReplicationConfiguration" json:"-"`
|
|
Rules []Rule `xml:"Rule" json:"Rules"`
|
|
Role string `xml:"Role" json:"Role"`
|
|
}
|
|
|
|
// Empty returns true if config is not set
|
|
func (c *Config) Empty() bool {
|
|
return len(c.Rules) == 0
|
|
}
|
|
|
|
// AddRule adds a new rule to existing replication config. If a rule exists with the
|
|
// same ID, then the rule is replaced.
|
|
func (c *Config) AddRule(opts Options) error {
|
|
priority, err := strconv.Atoi(opts.Priority)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var compatSw bool // true if RoleArn is used with new mc client and older minio version prior to multisite
|
|
if opts.RoleArn != "" {
|
|
tokens := strings.Split(opts.RoleArn, ":")
|
|
if len(tokens) != 6 {
|
|
return fmt.Errorf("invalid format for replication Role Arn: %v", opts.RoleArn)
|
|
}
|
|
switch {
|
|
case strings.HasPrefix(opts.RoleArn, "arn:minio:replication") && len(c.Rules) == 0:
|
|
c.Role = opts.RoleArn
|
|
compatSw = true
|
|
case strings.HasPrefix(opts.RoleArn, "arn:aws:iam"):
|
|
c.Role = opts.RoleArn
|
|
default:
|
|
return fmt.Errorf("RoleArn invalid for AWS replication configuration: %v", opts.RoleArn)
|
|
}
|
|
}
|
|
|
|
var status Status
|
|
// toggle rule status for edit option
|
|
switch opts.RuleStatus {
|
|
case "enable":
|
|
status = Enabled
|
|
case "disable":
|
|
status = Disabled
|
|
default:
|
|
return fmt.Errorf("rule state should be either [enable|disable]")
|
|
}
|
|
|
|
tags, err := opts.Tags()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
andVal := And{
|
|
Tags: tags,
|
|
}
|
|
filter := Filter{Prefix: opts.Prefix}
|
|
// only a single tag is set.
|
|
if opts.Prefix == "" && len(tags) == 1 {
|
|
filter.Tag = tags[0]
|
|
}
|
|
// both prefix and tag are present
|
|
if len(andVal.Tags) > 1 || opts.Prefix != "" {
|
|
filter.And = andVal
|
|
filter.And.Prefix = opts.Prefix
|
|
filter.Prefix = ""
|
|
filter.Tag = Tag{}
|
|
}
|
|
if opts.ID == "" {
|
|
opts.ID = xid.New().String()
|
|
}
|
|
|
|
destBucket := opts.DestBucket
|
|
// ref https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-arn-format.html
|
|
if btokens := strings.Split(destBucket, ":"); len(btokens) != 6 {
|
|
if len(btokens) == 1 && compatSw {
|
|
destBucket = fmt.Sprintf("arn:aws:s3:::%s", destBucket)
|
|
} else {
|
|
return fmt.Errorf("destination bucket needs to be in Arn format")
|
|
}
|
|
}
|
|
dmStatus := Disabled
|
|
if opts.ReplicateDeleteMarkers != "" {
|
|
switch opts.ReplicateDeleteMarkers {
|
|
case "enable":
|
|
dmStatus = Enabled
|
|
case "disable":
|
|
dmStatus = Disabled
|
|
default:
|
|
return fmt.Errorf("ReplicateDeleteMarkers should be either enable|disable")
|
|
}
|
|
}
|
|
|
|
vDeleteStatus := Disabled
|
|
if opts.ReplicateDeletes != "" {
|
|
switch opts.ReplicateDeletes {
|
|
case "enable":
|
|
vDeleteStatus = Enabled
|
|
case "disable":
|
|
vDeleteStatus = Disabled
|
|
default:
|
|
return fmt.Errorf("ReplicateDeletes should be either enable|disable")
|
|
}
|
|
}
|
|
var replicaSync Status
|
|
// replica sync is by default Enabled, unless specified.
|
|
switch opts.ReplicaSync {
|
|
case "enable", "":
|
|
replicaSync = Enabled
|
|
case "disable":
|
|
replicaSync = Disabled
|
|
default:
|
|
return fmt.Errorf("replica metadata sync should be either [enable|disable]")
|
|
}
|
|
|
|
var existingStatus Status
|
|
if opts.ExistingObjectReplicate != "" {
|
|
switch opts.ExistingObjectReplicate {
|
|
case "enable":
|
|
existingStatus = Enabled
|
|
case "disable", "":
|
|
existingStatus = Disabled
|
|
default:
|
|
return fmt.Errorf("existingObjectReplicate should be either enable|disable")
|
|
}
|
|
}
|
|
newRule := Rule{
|
|
ID: opts.ID,
|
|
Priority: priority,
|
|
Status: status,
|
|
Filter: filter,
|
|
Destination: Destination{
|
|
Bucket: destBucket,
|
|
StorageClass: opts.StorageClass,
|
|
},
|
|
DeleteMarkerReplication: DeleteMarkerReplication{Status: dmStatus},
|
|
DeleteReplication: DeleteReplication{Status: vDeleteStatus},
|
|
// MinIO enables replica metadata syncing by default in the case of bi-directional replication to allow
|
|
// automatic failover as the expectation in this case is that replica and source should be identical.
|
|
// However AWS leaves this configurable https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-for-metadata-changes.html
|
|
SourceSelectionCriteria: SourceSelectionCriteria{
|
|
ReplicaModifications: ReplicaModifications{
|
|
Status: replicaSync,
|
|
},
|
|
},
|
|
// By default disable existing object replication unless selected
|
|
ExistingObjectReplication: ExistingObjectReplication{
|
|
Status: existingStatus,
|
|
},
|
|
}
|
|
|
|
// validate rule after overlaying priority for pre-existing rule being disabled.
|
|
if err := newRule.Validate(); err != nil {
|
|
return err
|
|
}
|
|
// if replication config uses RoleArn, migrate this to the destination element as target ARN for remote bucket for MinIO configuration
|
|
if c.Role != "" && !strings.HasPrefix(c.Role, "arn:aws:iam") && !compatSw {
|
|
for i := range c.Rules {
|
|
c.Rules[i].Destination.Bucket = c.Role
|
|
}
|
|
c.Role = ""
|
|
}
|
|
|
|
for _, rule := range c.Rules {
|
|
if rule.Priority == newRule.Priority {
|
|
return fmt.Errorf("priority must be unique. Replication configuration already has a rule with this priority")
|
|
}
|
|
if rule.ID == newRule.ID {
|
|
return fmt.Errorf("a rule exists with this ID")
|
|
}
|
|
}
|
|
|
|
c.Rules = append(c.Rules, newRule)
|
|
return nil
|
|
}
|
|
|
|
// EditRule modifies an existing rule in replication config
|
|
func (c *Config) EditRule(opts Options) error {
|
|
if opts.ID == "" {
|
|
return fmt.Errorf("rule ID missing")
|
|
}
|
|
// if replication config uses RoleArn, migrate this to the destination element as target ARN for remote bucket for non AWS.
|
|
if c.Role != "" && !strings.HasPrefix(c.Role, "arn:aws:iam") && len(c.Rules) > 1 {
|
|
for i := range c.Rules {
|
|
c.Rules[i].Destination.Bucket = c.Role
|
|
}
|
|
c.Role = ""
|
|
}
|
|
|
|
rIdx := -1
|
|
var newRule Rule
|
|
for i, rule := range c.Rules {
|
|
if rule.ID == opts.ID {
|
|
rIdx = i
|
|
newRule = rule
|
|
break
|
|
}
|
|
}
|
|
if rIdx < 0 {
|
|
return fmt.Errorf("rule with ID %s not found in replication configuration", opts.ID)
|
|
}
|
|
prefixChg := opts.Prefix != newRule.Prefix()
|
|
if opts.IsTagSet || prefixChg {
|
|
prefix := newRule.Prefix()
|
|
if prefix != opts.Prefix {
|
|
prefix = opts.Prefix
|
|
}
|
|
tags := []Tag{newRule.Filter.Tag}
|
|
if len(newRule.Filter.And.Tags) != 0 {
|
|
tags = newRule.Filter.And.Tags
|
|
}
|
|
var err error
|
|
if opts.IsTagSet {
|
|
tags, err = opts.Tags()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
andVal := And{
|
|
Tags: tags,
|
|
}
|
|
|
|
filter := Filter{Prefix: prefix}
|
|
// only a single tag is set.
|
|
if prefix == "" && len(tags) == 1 {
|
|
filter.Tag = tags[0]
|
|
}
|
|
// both prefix and tag are present
|
|
if len(andVal.Tags) > 1 || prefix != "" {
|
|
filter.And = andVal
|
|
filter.And.Prefix = prefix
|
|
filter.Prefix = ""
|
|
filter.Tag = Tag{}
|
|
}
|
|
newRule.Filter = filter
|
|
}
|
|
|
|
// toggle rule status for edit option
|
|
if opts.RuleStatus != "" {
|
|
switch opts.RuleStatus {
|
|
case "enable":
|
|
newRule.Status = Enabled
|
|
case "disable":
|
|
newRule.Status = Disabled
|
|
default:
|
|
return fmt.Errorf("rule state should be either [enable|disable]")
|
|
}
|
|
}
|
|
// set DeleteMarkerReplication rule status for edit option
|
|
if opts.ReplicateDeleteMarkers != "" {
|
|
switch opts.ReplicateDeleteMarkers {
|
|
case "enable":
|
|
newRule.DeleteMarkerReplication.Status = Enabled
|
|
case "disable":
|
|
newRule.DeleteMarkerReplication.Status = Disabled
|
|
default:
|
|
return fmt.Errorf("ReplicateDeleteMarkers state should be either [enable|disable]")
|
|
}
|
|
}
|
|
|
|
// set DeleteReplication rule status for edit option. This is a MinIO specific
|
|
// option to replicate versioned deletes
|
|
if opts.ReplicateDeletes != "" {
|
|
switch opts.ReplicateDeletes {
|
|
case "enable":
|
|
newRule.DeleteReplication.Status = Enabled
|
|
case "disable":
|
|
newRule.DeleteReplication.Status = Disabled
|
|
default:
|
|
return fmt.Errorf("ReplicateDeletes state should be either [enable|disable]")
|
|
}
|
|
}
|
|
|
|
if opts.ReplicaSync != "" {
|
|
switch opts.ReplicaSync {
|
|
case "enable", "":
|
|
newRule.SourceSelectionCriteria.ReplicaModifications.Status = Enabled
|
|
case "disable":
|
|
newRule.SourceSelectionCriteria.ReplicaModifications.Status = Disabled
|
|
default:
|
|
return fmt.Errorf("replica metadata sync should be either [enable|disable]")
|
|
}
|
|
}
|
|
|
|
if opts.ExistingObjectReplicate != "" {
|
|
switch opts.ExistingObjectReplicate {
|
|
case "enable":
|
|
newRule.ExistingObjectReplication.Status = Enabled
|
|
case "disable":
|
|
newRule.ExistingObjectReplication.Status = Disabled
|
|
default:
|
|
return fmt.Errorf("existingObjectsReplication state should be either [enable|disable]")
|
|
}
|
|
}
|
|
if opts.IsSCSet {
|
|
newRule.Destination.StorageClass = opts.StorageClass
|
|
}
|
|
if opts.Priority != "" {
|
|
priority, err := strconv.Atoi(opts.Priority)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
newRule.Priority = priority
|
|
}
|
|
if opts.DestBucket != "" {
|
|
destBucket := opts.DestBucket
|
|
// ref https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-arn-format.html
|
|
if btokens := strings.Split(opts.DestBucket, ":"); len(btokens) != 6 {
|
|
return fmt.Errorf("destination bucket needs to be in Arn format")
|
|
}
|
|
newRule.Destination.Bucket = destBucket
|
|
}
|
|
// validate rule
|
|
if err := newRule.Validate(); err != nil {
|
|
return err
|
|
}
|
|
// ensure priority and destination bucket restrictions are not violated
|
|
for idx, rule := range c.Rules {
|
|
if rule.Priority == newRule.Priority && rIdx != idx {
|
|
return fmt.Errorf("priority must be unique. Replication configuration already has a rule with this priority")
|
|
}
|
|
if rule.Destination.Bucket != newRule.Destination.Bucket && rule.ID == newRule.ID {
|
|
if c.Role == newRule.Destination.Bucket {
|
|
continue
|
|
}
|
|
return fmt.Errorf("invalid destination bucket for this rule")
|
|
}
|
|
}
|
|
|
|
c.Rules[rIdx] = newRule
|
|
return nil
|
|
}
|
|
|
|
// RemoveRule removes a rule from replication config.
|
|
func (c *Config) RemoveRule(opts Options) error {
|
|
var newRules []Rule
|
|
ruleFound := false
|
|
for _, rule := range c.Rules {
|
|
if rule.ID != opts.ID {
|
|
newRules = append(newRules, rule)
|
|
continue
|
|
}
|
|
ruleFound = true
|
|
}
|
|
if !ruleFound {
|
|
return fmt.Errorf("Rule with ID %s not found", opts.ID)
|
|
}
|
|
if len(newRules) == 0 {
|
|
return fmt.Errorf("replication configuration should have at least one rule")
|
|
}
|
|
c.Rules = newRules
|
|
return nil
|
|
}
|
|
|
|
// Rule - a rule for replication configuration.
|
|
type Rule struct {
|
|
XMLName xml.Name `xml:"Rule" json:"-"`
|
|
ID string `xml:"ID,omitempty"`
|
|
Status Status `xml:"Status"`
|
|
Priority int `xml:"Priority"`
|
|
DeleteMarkerReplication DeleteMarkerReplication `xml:"DeleteMarkerReplication"`
|
|
DeleteReplication DeleteReplication `xml:"DeleteReplication"`
|
|
Destination Destination `xml:"Destination"`
|
|
Filter Filter `xml:"Filter" json:"Filter"`
|
|
SourceSelectionCriteria SourceSelectionCriteria `xml:"SourceSelectionCriteria" json:"SourceSelectionCriteria"`
|
|
ExistingObjectReplication ExistingObjectReplication `xml:"ExistingObjectReplication,omitempty" json:"ExistingObjectReplication,omitempty"`
|
|
}
|
|
|
|
// Validate validates the rule for correctness
|
|
func (r Rule) Validate() error {
|
|
if err := r.validateID(); err != nil {
|
|
return err
|
|
}
|
|
if err := r.validateStatus(); err != nil {
|
|
return err
|
|
}
|
|
if err := r.validateFilter(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if r.Priority < 0 && r.Status == Enabled {
|
|
return fmt.Errorf("priority must be set for the rule")
|
|
}
|
|
|
|
if err := r.validateStatus(); err != nil {
|
|
return err
|
|
}
|
|
return r.ExistingObjectReplication.Validate()
|
|
}
|
|
|
|
// validateID - checks if ID is valid or not.
|
|
func (r Rule) validateID() error {
|
|
// cannot be longer than 255 characters
|
|
if len(r.ID) > 255 {
|
|
return fmt.Errorf("ID must be less than 255 characters")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// validateStatus - checks if status is valid or not.
|
|
func (r Rule) validateStatus() error {
|
|
// Status can't be empty
|
|
if len(r.Status) == 0 {
|
|
return fmt.Errorf("status cannot be empty")
|
|
}
|
|
|
|
// Status must be one of Enabled or Disabled
|
|
if r.Status != Enabled && r.Status != Disabled {
|
|
return fmt.Errorf("status must be set to either Enabled or Disabled")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r Rule) validateFilter() error {
|
|
return r.Filter.Validate()
|
|
}
|
|
|
|
// Prefix - a rule can either have prefix under <filter></filter> or under
|
|
// <filter><and></and></filter>. This method returns the prefix from the
|
|
// location where it is available
|
|
func (r Rule) Prefix() string {
|
|
if r.Filter.Prefix != "" {
|
|
return r.Filter.Prefix
|
|
}
|
|
return r.Filter.And.Prefix
|
|
}
|
|
|
|
// Tags - a rule can either have tag under <filter></filter> or under
|
|
// <filter><and></and></filter>. This method returns all the tags from the
|
|
// rule in the format tag1=value1&tag2=value2
|
|
func (r Rule) Tags() string {
|
|
ts := []Tag{r.Filter.Tag}
|
|
if len(r.Filter.And.Tags) != 0 {
|
|
ts = r.Filter.And.Tags
|
|
}
|
|
|
|
var buf bytes.Buffer
|
|
for _, t := range ts {
|
|
if buf.Len() > 0 {
|
|
buf.WriteString("&")
|
|
}
|
|
buf.WriteString(t.String())
|
|
}
|
|
return buf.String()
|
|
}
|
|
|
|
// Filter - a filter for a replication configuration Rule.
|
|
type Filter struct {
|
|
XMLName xml.Name `xml:"Filter" json:"-"`
|
|
Prefix string `json:"Prefix,omitempty"`
|
|
And And `xml:"And,omitempty" json:"And,omitempty"`
|
|
Tag Tag `xml:"Tag,omitempty" json:"Tag,omitempty"`
|
|
}
|
|
|
|
// Validate - validates the filter element
|
|
func (f Filter) Validate() error {
|
|
// A Filter must have exactly one of Prefix, Tag, or And specified.
|
|
if !f.And.isEmpty() {
|
|
if f.Prefix != "" {
|
|
return errInvalidFilter
|
|
}
|
|
if !f.Tag.IsEmpty() {
|
|
return errInvalidFilter
|
|
}
|
|
}
|
|
if f.Prefix != "" {
|
|
if !f.Tag.IsEmpty() {
|
|
return errInvalidFilter
|
|
}
|
|
}
|
|
if !f.Tag.IsEmpty() {
|
|
if err := f.Tag.Validate(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Tag - a tag for a replication configuration Rule filter.
|
|
type Tag struct {
|
|
XMLName xml.Name `json:"-"`
|
|
Key string `xml:"Key,omitempty" json:"Key,omitempty"`
|
|
Value string `xml:"Value,omitempty" json:"Value,omitempty"`
|
|
}
|
|
|
|
func (tag Tag) String() string {
|
|
if tag.IsEmpty() {
|
|
return ""
|
|
}
|
|
return tag.Key + "=" + tag.Value
|
|
}
|
|
|
|
// IsEmpty returns whether this tag is empty or not.
|
|
func (tag Tag) IsEmpty() bool {
|
|
return tag.Key == ""
|
|
}
|
|
|
|
// Validate checks this tag.
|
|
func (tag Tag) Validate() error {
|
|
if len(tag.Key) == 0 || utf8.RuneCountInString(tag.Key) > 128 {
|
|
return fmt.Errorf("invalid Tag Key")
|
|
}
|
|
|
|
if utf8.RuneCountInString(tag.Value) > 256 {
|
|
return fmt.Errorf("invalid Tag Value")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Destination - destination in ReplicationConfiguration.
|
|
type Destination struct {
|
|
XMLName xml.Name `xml:"Destination" json:"-"`
|
|
Bucket string `xml:"Bucket" json:"Bucket"`
|
|
StorageClass string `xml:"StorageClass,omitempty" json:"StorageClass,omitempty"`
|
|
}
|
|
|
|
// And - a tag to combine a prefix and multiple tags for replication configuration rule.
|
|
type And struct {
|
|
XMLName xml.Name `xml:"And,omitempty" json:"-"`
|
|
Prefix string `xml:"Prefix,omitempty" json:"Prefix,omitempty"`
|
|
Tags []Tag `xml:"Tag,omitempty" json:"Tag,omitempty"`
|
|
}
|
|
|
|
// isEmpty returns true if Tags field is null
|
|
func (a And) isEmpty() bool {
|
|
return len(a.Tags) == 0 && a.Prefix == ""
|
|
}
|
|
|
|
// Status represents Enabled/Disabled status
|
|
type Status string
|
|
|
|
// Supported status types
|
|
const (
|
|
Enabled Status = "Enabled"
|
|
Disabled Status = "Disabled"
|
|
)
|
|
|
|
// DeleteMarkerReplication - whether delete markers are replicated - https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html
|
|
type DeleteMarkerReplication struct {
|
|
Status Status `xml:"Status" json:"Status"` // should be set to "Disabled" by default
|
|
}
|
|
|
|
// IsEmpty returns true if DeleteMarkerReplication is not set
|
|
func (d DeleteMarkerReplication) IsEmpty() bool {
|
|
return len(d.Status) == 0
|
|
}
|
|
|
|
// DeleteReplication - whether versioned deletes are replicated - this
|
|
// is a MinIO specific extension
|
|
type DeleteReplication struct {
|
|
Status Status `xml:"Status" json:"Status"` // should be set to "Disabled" by default
|
|
}
|
|
|
|
// IsEmpty returns true if DeleteReplication is not set
|
|
func (d DeleteReplication) IsEmpty() bool {
|
|
return len(d.Status) == 0
|
|
}
|
|
|
|
// ReplicaModifications specifies if replica modification sync is enabled
|
|
type ReplicaModifications struct {
|
|
Status Status `xml:"Status" json:"Status"` // should be set to "Enabled" by default
|
|
}
|
|
|
|
// SourceSelectionCriteria - specifies additional source selection criteria in ReplicationConfiguration.
|
|
type SourceSelectionCriteria struct {
|
|
ReplicaModifications ReplicaModifications `xml:"ReplicaModifications" json:"ReplicaModifications"`
|
|
}
|
|
|
|
// IsValid - checks whether SourceSelectionCriteria is valid or not.
|
|
func (s SourceSelectionCriteria) IsValid() bool {
|
|
return s.ReplicaModifications.Status == Enabled || s.ReplicaModifications.Status == Disabled
|
|
}
|
|
|
|
// Validate source selection criteria
|
|
func (s SourceSelectionCriteria) Validate() error {
|
|
if (s == SourceSelectionCriteria{}) {
|
|
return nil
|
|
}
|
|
if !s.IsValid() {
|
|
return fmt.Errorf("invalid ReplicaModification status")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ExistingObjectReplication - whether existing object replication is enabled
|
|
type ExistingObjectReplication struct {
|
|
Status Status `xml:"Status"` // should be set to "Disabled" by default
|
|
}
|
|
|
|
// IsEmpty returns true if DeleteMarkerReplication is not set
|
|
func (e ExistingObjectReplication) IsEmpty() bool {
|
|
return len(e.Status) == 0
|
|
}
|
|
|
|
// Validate validates whether the status is disabled.
|
|
func (e ExistingObjectReplication) Validate() error {
|
|
if e.IsEmpty() {
|
|
return nil
|
|
}
|
|
if e.Status != Disabled && e.Status != Enabled {
|
|
return fmt.Errorf("invalid ExistingObjectReplication status")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TargetMetrics represents inline replication metrics
|
|
// such as pending, failed and completed bytes in total for a bucket remote target
|
|
type TargetMetrics struct {
|
|
// Completed count
|
|
ReplicatedCount uint64 `json:"replicationCount,omitempty"`
|
|
// Completed size in bytes
|
|
ReplicatedSize uint64 `json:"completedReplicationSize,omitempty"`
|
|
// Bandwidth limit in bytes/sec for this target
|
|
BandWidthLimitInBytesPerSecond int64 `json:"limitInBits,omitempty"`
|
|
// Current bandwidth used in bytes/sec for this target
|
|
CurrentBandwidthInBytesPerSecond float64 `json:"currentBandwidth,omitempty"`
|
|
// errors seen in replication in last minute, hour and total
|
|
Failed TimedErrStats `json:"failed,omitempty"`
|
|
// Deprecated fields
|
|
// Pending size in bytes
|
|
PendingSize uint64 `json:"pendingReplicationSize,omitempty"`
|
|
// Total Replica size in bytes
|
|
ReplicaSize uint64 `json:"replicaSize,omitempty"`
|
|
// Failed size in bytes
|
|
FailedSize uint64 `json:"failedReplicationSize,omitempty"`
|
|
// Total number of pending operations including metadata updates
|
|
PendingCount uint64 `json:"pendingReplicationCount,omitempty"`
|
|
// Total number of failed operations including metadata updates
|
|
FailedCount uint64 `json:"failedReplicationCount,omitempty"`
|
|
}
|
|
|
|
// Metrics represents inline replication metrics for a bucket.
|
|
type Metrics struct {
|
|
Stats map[string]TargetMetrics
|
|
// Completed size in bytes across targets
|
|
ReplicatedSize uint64 `json:"completedReplicationSize,omitempty"`
|
|
// Total Replica size in bytes across targets
|
|
ReplicaSize uint64 `json:"replicaSize,omitempty"`
|
|
// Total Replica counts
|
|
ReplicaCount int64 `json:"replicaCount,omitempty"`
|
|
// Total Replicated count
|
|
ReplicatedCount int64 `json:"replicationCount,omitempty"`
|
|
// errors seen in replication in last minute, hour and total
|
|
Errors TimedErrStats `json:"failed,omitempty"`
|
|
// Total number of entries that are queued for replication
|
|
QStats InQueueMetric `json:"queued"`
|
|
// Deprecated fields
|
|
// Total Pending size in bytes across targets
|
|
PendingSize uint64 `json:"pendingReplicationSize,omitempty"`
|
|
// Failed size in bytes across targets
|
|
FailedSize uint64 `json:"failedReplicationSize,omitempty"`
|
|
// Total number of pending operations including metadata updates across targets
|
|
PendingCount uint64 `json:"pendingReplicationCount,omitempty"`
|
|
// Total number of failed operations including metadata updates across targets
|
|
FailedCount uint64 `json:"failedReplicationCount,omitempty"`
|
|
}
|
|
|
|
// RStat - has count and bytes for replication metrics
|
|
type RStat struct {
|
|
Count float64 `json:"count"`
|
|
Bytes int64 `json:"bytes"`
|
|
}
|
|
|
|
// Add two RStat
|
|
func (r RStat) Add(r1 RStat) RStat {
|
|
return RStat{
|
|
Count: r.Count + r1.Count,
|
|
Bytes: r.Bytes + r1.Bytes,
|
|
}
|
|
}
|
|
|
|
// TimedErrStats holds error stats for a time period
|
|
type TimedErrStats struct {
|
|
LastMinute RStat `json:"lastMinute"`
|
|
LastHour RStat `json:"lastHour"`
|
|
Totals RStat `json:"totals"`
|
|
}
|
|
|
|
// Add two TimedErrStats
|
|
func (te TimedErrStats) Add(o TimedErrStats) TimedErrStats {
|
|
return TimedErrStats{
|
|
LastMinute: te.LastMinute.Add(o.LastMinute),
|
|
LastHour: te.LastHour.Add(o.LastHour),
|
|
Totals: te.Totals.Add(o.Totals),
|
|
}
|
|
}
|
|
|
|
// ResyncTargetsInfo provides replication target information to resync replicated data.
|
|
type ResyncTargetsInfo struct {
|
|
Targets []ResyncTarget `json:"target,omitempty"`
|
|
}
|
|
|
|
// ResyncTarget provides the replica resources and resetID to initiate resync replication.
|
|
type ResyncTarget struct {
|
|
Arn string `json:"arn"`
|
|
ResetID string `json:"resetid"`
|
|
StartTime time.Time `json:"startTime,omitempty"`
|
|
EndTime time.Time `json:"endTime,omitempty"`
|
|
// Status of resync operation
|
|
ResyncStatus string `json:"resyncStatus,omitempty"`
|
|
// Completed size in bytes
|
|
ReplicatedSize int64 `json:"completedReplicationSize,omitempty"`
|
|
// Failed size in bytes
|
|
FailedSize int64 `json:"failedReplicationSize,omitempty"`
|
|
// Total number of failed operations
|
|
FailedCount int64 `json:"failedReplicationCount,omitempty"`
|
|
// Total number of completed operations
|
|
ReplicatedCount int64 `json:"replicationCount,omitempty"`
|
|
// Last bucket/object replicated.
|
|
Bucket string `json:"bucket,omitempty"`
|
|
Object string `json:"object,omitempty"`
|
|
}
|
|
|
|
// XferStats holds transfer rate info for uploads/sec
|
|
type XferStats struct {
|
|
AvgRate float64 `json:"avgRate"`
|
|
PeakRate float64 `json:"peakRate"`
|
|
CurrRate float64 `json:"currRate"`
|
|
}
|
|
|
|
// Merge two XferStats
|
|
func (x *XferStats) Merge(x1 XferStats) {
|
|
x.AvgRate += x1.AvgRate
|
|
x.PeakRate += x1.PeakRate
|
|
x.CurrRate += x1.CurrRate
|
|
}
|
|
|
|
// QStat holds count and bytes for objects in replication queue
|
|
type QStat struct {
|
|
Count float64 `json:"count"`
|
|
Bytes float64 `json:"bytes"`
|
|
}
|
|
|
|
// Add 2 QStat entries
|
|
func (q *QStat) Add(q1 QStat) {
|
|
q.Count += q1.Count
|
|
q.Bytes += q1.Bytes
|
|
}
|
|
|
|
// InQueueMetric holds stats for objects in replication queue
|
|
type InQueueMetric struct {
|
|
Curr QStat `json:"curr" msg:"cq"`
|
|
Avg QStat `json:"avg" msg:"aq"`
|
|
Max QStat `json:"peak" msg:"pq"`
|
|
}
|
|
|
|
// MetricName name of replication metric
|
|
type MetricName string
|
|
|
|
const (
|
|
// Large is a metric name for large objects >=128MiB
|
|
Large MetricName = "Large"
|
|
// Small is a metric name for objects <128MiB size
|
|
Small MetricName = "Small"
|
|
// Total is a metric name for total objects
|
|
Total MetricName = "Total"
|
|
)
|
|
|
|
// WorkerStat has stats on number of replication workers
|
|
type WorkerStat struct {
|
|
Curr int32 `json:"curr"`
|
|
Avg float32 `json:"avg"`
|
|
Max int32 `json:"max"`
|
|
}
|
|
|
|
// ReplMRFStats holds stats of MRF backlog saved to disk in the last 5 minutes
|
|
// and number of entries that failed replication after 3 retries
|
|
type ReplMRFStats struct {
|
|
LastFailedCount uint64 `json:"failedCount_last5min"`
|
|
// Count of unreplicated entries that were dropped after MRF retry limit reached since cluster start.
|
|
TotalDroppedCount uint64 `json:"droppedCount_since_uptime"`
|
|
// Bytes of unreplicated entries that were dropped after MRF retry limit reached since cluster start.
|
|
TotalDroppedBytes uint64 `json:"droppedBytes_since_uptime"`
|
|
}
|
|
|
|
// ReplQNodeStats holds stats for a node in replication queue
|
|
type ReplQNodeStats struct {
|
|
NodeName string `json:"nodeName"`
|
|
Uptime int64 `json:"uptime"`
|
|
Workers WorkerStat `json:"activeWorkers"`
|
|
|
|
XferStats map[MetricName]XferStats `json:"transferSummary"`
|
|
TgtXferStats map[string]map[MetricName]XferStats `json:"tgtTransferStats"`
|
|
|
|
QStats InQueueMetric `json:"queueStats"`
|
|
MRFStats ReplMRFStats `json:"mrfStats"`
|
|
}
|
|
|
|
// ReplQueueStats holds stats for replication queue across nodes
|
|
type ReplQueueStats struct {
|
|
Nodes []ReplQNodeStats `json:"nodes"`
|
|
}
|
|
|
|
// Workers returns number of workers across all nodes
|
|
func (q ReplQueueStats) Workers() (tot WorkerStat) {
|
|
for _, node := range q.Nodes {
|
|
tot.Avg += node.Workers.Avg
|
|
tot.Curr += node.Workers.Curr
|
|
if tot.Max < node.Workers.Max {
|
|
tot.Max = node.Workers.Max
|
|
}
|
|
}
|
|
if len(q.Nodes) > 0 {
|
|
tot.Avg /= float32(len(q.Nodes))
|
|
tot.Curr /= int32(len(q.Nodes))
|
|
}
|
|
return tot
|
|
}
|
|
|
|
// qStatSummary returns cluster level stats for objects in replication queue
|
|
func (q ReplQueueStats) qStatSummary() InQueueMetric {
|
|
m := InQueueMetric{}
|
|
for _, v := range q.Nodes {
|
|
m.Avg.Add(v.QStats.Avg)
|
|
m.Curr.Add(v.QStats.Curr)
|
|
if m.Max.Count < v.QStats.Max.Count {
|
|
m.Max.Add(v.QStats.Max)
|
|
}
|
|
}
|
|
return m
|
|
}
|
|
|
|
// ReplQStats holds stats for objects in replication queue
|
|
type ReplQStats struct {
|
|
Uptime int64 `json:"uptime"`
|
|
Workers WorkerStat `json:"workers"`
|
|
|
|
XferStats map[MetricName]XferStats `json:"xferStats"`
|
|
TgtXferStats map[string]map[MetricName]XferStats `json:"tgtXferStats"`
|
|
|
|
QStats InQueueMetric `json:"qStats"`
|
|
MRFStats ReplMRFStats `json:"mrfStats"`
|
|
}
|
|
|
|
// QStats returns cluster level stats for objects in replication queue
|
|
func (q ReplQueueStats) QStats() (r ReplQStats) {
|
|
r.QStats = q.qStatSummary()
|
|
r.XferStats = make(map[MetricName]XferStats)
|
|
r.TgtXferStats = make(map[string]map[MetricName]XferStats)
|
|
r.Workers = q.Workers()
|
|
|
|
for _, node := range q.Nodes {
|
|
for arn := range node.TgtXferStats {
|
|
xmap, ok := node.TgtXferStats[arn]
|
|
if !ok {
|
|
xmap = make(map[MetricName]XferStats)
|
|
}
|
|
for m, v := range xmap {
|
|
st, ok := r.XferStats[m]
|
|
if !ok {
|
|
st = XferStats{}
|
|
}
|
|
st.AvgRate += v.AvgRate
|
|
st.CurrRate += v.CurrRate
|
|
st.PeakRate = math.Max(st.PeakRate, v.PeakRate)
|
|
if _, ok := r.TgtXferStats[arn]; !ok {
|
|
r.TgtXferStats[arn] = make(map[MetricName]XferStats)
|
|
}
|
|
r.TgtXferStats[arn][m] = st
|
|
}
|
|
}
|
|
for k, v := range node.XferStats {
|
|
st, ok := r.XferStats[k]
|
|
if !ok {
|
|
st = XferStats{}
|
|
}
|
|
st.AvgRate += v.AvgRate
|
|
st.CurrRate += v.CurrRate
|
|
st.PeakRate = math.Max(st.PeakRate, v.PeakRate)
|
|
r.XferStats[k] = st
|
|
}
|
|
r.MRFStats.LastFailedCount += node.MRFStats.LastFailedCount
|
|
r.MRFStats.TotalDroppedCount += node.MRFStats.TotalDroppedCount
|
|
r.MRFStats.TotalDroppedBytes += node.MRFStats.TotalDroppedBytes
|
|
r.Uptime += node.Uptime
|
|
}
|
|
if len(q.Nodes) > 0 {
|
|
r.Uptime /= int64(len(q.Nodes)) // average uptime
|
|
}
|
|
return
|
|
}
|
|
|
|
// MetricsV2 represents replication metrics for a bucket.
|
|
type MetricsV2 struct {
|
|
Uptime int64 `json:"uptime"`
|
|
CurrentStats Metrics `json:"currStats"`
|
|
QueueStats ReplQueueStats `json:"queueStats"`
|
|
}
|