ecs.go (traefik-v2.3.2.src) | : | ecs.go (traefik-v2.3.3.src) | ||
---|---|---|---|---|
skipping to change at line 154 | skipping to change at line 154 | |||
// Provide configuration to traefik from ECS. | // Provide configuration to traefik from ECS. | |||
func (p Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.P ool) error { | func (p Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.P ool) error { | |||
pool.GoCtx(func(routineCtx context.Context) { | pool.GoCtx(func(routineCtx context.Context) { | |||
ctxLog := log.With(routineCtx, log.Str(log.ProviderName, "ecs")) | ctxLog := log.With(routineCtx, log.Str(log.ProviderName, "ecs")) | |||
logger := log.FromContext(ctxLog) | logger := log.FromContext(ctxLog) | |||
operation := func() error { | operation := func() error { | |||
awsClient, err := p.createClient(logger) | awsClient, err := p.createClient(logger) | |||
if err != nil { | if err != nil { | |||
return err | return fmt.Errorf("unable to create AWS client: % w", err) | |||
} | } | |||
configuration, err := p.loadECSConfig(ctxLog, awsClient) | err = p.loadConfiguration(ctxLog, awsClient, configuratio nChan) | |||
if err != nil { | if err != nil { | |||
return err | return fmt.Errorf("failed to get ECS configuratio n: %w", err) | |||
} | } | |||
configurationChan <- dynamic.Message{ | ticker := time.NewTicker(time.Second * time.Duration(p.Re | |||
ProviderName: "ecs", | freshSeconds)) | |||
Configuration: configuration, | defer ticker.Stop() | |||
} | ||||
reload := time.NewTicker(time.Second * time.Duration(p.Re | ||||
freshSeconds)) | ||||
defer reload.Stop() | ||||
for { | for { | |||
select { | select { | |||
case <-reload.C: | case <-ticker.C: | |||
configuration, err := p.loadECSConfig(ctx | err = p.loadConfiguration(ctxLog, awsClie | |||
Log, awsClient) | nt, configurationChan) | |||
if err != nil { | if err != nil { | |||
logger.Errorf("Failed to load ECS | return fmt.Errorf("failed to refr | |||
configuration, error %s", err) | esh ECS configuration: %w", err) | |||
return err | ||||
} | } | |||
configurationChan <- dynamic.Message{ | ||||
ProviderName: "ecs", | ||||
Configuration: configuration, | ||||
} | ||||
case <-routineCtx.Done(): | case <-routineCtx.Done(): | |||
return nil | return nil | |||
} | } | |||
} | } | |||
} | } | |||
notify := func(err error, time time.Duration) { | notify := func(err error, time time.Duration) { | |||
logger.Errorf("Provider connection error %+v, retrying in %s", err, time) | logger.Errorf("Provider connection error %+v, retrying in %s", err, time) | |||
} | } | |||
err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), routineCtx) , notify) | err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), routineCtx) , notify) | |||
if err != nil { | if err != nil { | |||
logger.Errorf("Cannot connect to Provider api %+v", err) | logger.Errorf("Cannot connect to Provider api %+v", err) | |||
} | } | |||
}) | }) | |||
return nil | return nil | |||
} | } | |||
func (p *Provider) loadConfiguration(ctx context.Context, client *awsClient, con | ||||
figurationChan chan<- dynamic.Message) error { | ||||
instances, err := p.listInstances(ctx, client) | ||||
if err != nil { | ||||
return err | ||||
} | ||||
configurationChan <- dynamic.Message{ | ||||
ProviderName: "ecs", | ||||
Configuration: p.buildConfiguration(ctx, instances), | ||||
} | ||||
return nil | ||||
} | ||||
// Find all running Provider tasks in a cluster, also collect the task definitio ns (for docker labels) | // Find all running Provider tasks in a cluster, also collect the task definitio ns (for docker labels) | |||
// and the EC2 instance data. | // and the EC2 instance data. | |||
func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI nstance, error) { | func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI nstance, error) { | |||
logger := log.FromContext(ctx) | logger := log.FromContext(ctx) | |||
var clustersArn []*string | var clustersArn []*string | |||
var clusters []string | var clusters []string | |||
if p.AutoDiscoverClusters { | if p.AutoDiscoverClusters { | |||
input := &ecs.ListClustersInput{} | input := &ecs.ListClustersInput{} | |||
skipping to change at line 368 | skipping to change at line 372 | |||
instance.ExtraConf = extraConf | instance.ExtraConf = extraConf | |||
instances = append(instances, instance) | instances = append(instances, instance) | |||
} | } | |||
} | } | |||
} | } | |||
return instances, nil | return instances, nil | |||
} | } | |||
func (p *Provider) loadECSConfig(ctx context.Context, client *awsClient) (*dynam | ||||
ic.Configuration, error) { | ||||
instances, err := p.listInstances(ctx, client) | ||||
if err != nil { | ||||
return nil, err | ||||
} | ||||
return p.buildConfiguration(ctx, instances), nil | ||||
} | ||||
func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, cl usterName *string, ecsDatas map[string]*ecs.Task) (map[string]*ec2.Instance, err or) { | func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, cl usterName *string, ecsDatas map[string]*ecs.Task) (map[string]*ec2.Instance, err or) { | |||
logger := log.FromContext(ctx) | logger := log.FromContext(ctx) | |||
instanceIds := make(map[string]string) | instanceIds := make(map[string]string) | |||
ec2Instances := make(map[string]*ec2.Instance) | ec2Instances := make(map[string]*ec2.Instance) | |||
var containerInstancesArns []*string | var containerInstancesArns []*string | |||
var instanceArns []*string | var instanceArns []*string | |||
for _, task := range ecsDatas { | for _, task := range ecsDatas { | |||
if task.ContainerInstanceArn != nil { | if task.ContainerInstanceArn != nil { | |||
End of changes. 9 change blocks. | ||||
31 lines changed or deleted | 26 lines changed or added |