consul_catalog.go (traefik-v2.3.2.src) | : | consul_catalog.go (traefik-v2.3.3.src) | ||
---|---|---|---|---|
skipping to change at line 58 | skipping to change at line 58 | |||
Cache bool `description:"Use local agent caching f or catalog reads." json:"cache,omitempty" toml:"cache,omitempty" yaml:"cache,omi tempty" export:"true"` | Cache bool `description:"Use local agent caching f or catalog reads." json:"cache,omitempty" toml:"cache,omitempty" yaml:"cache,omi tempty" export:"true"` | |||
ExposedByDefault bool `description:"Expose containers by defa ult." json:"exposedByDefault,omitempty" toml:"exposedByDefault,omitempty" yaml:" exposedByDefault,omitempty" export:"true"` | ExposedByDefault bool `description:"Expose containers by defa ult." json:"exposedByDefault,omitempty" toml:"exposedByDefault,omitempty" yaml:" exposedByDefault,omitempty" export:"true"` | |||
DefaultRule string `description:"Default rule." json:"defa ultRule,omitempty" toml:"defaultRule,omitempty" yaml:"defaultRule,omitempty"` | DefaultRule string `description:"Default rule." json:"defa ultRule,omitempty" toml:"defaultRule,omitempty" yaml:"defaultRule,omitempty"` | |||
client *api.Client | client *api.Client | |||
defaultRuleTpl *template.Template | defaultRuleTpl *template.Template | |||
} | } | |||
// EndpointConfig holds configurations of the endpoint. | // EndpointConfig holds configurations of the endpoint. | |||
type EndpointConfig struct { | type EndpointConfig struct { | |||
Address string `description:"The address of the | Address string `description:"The address of the | |||
Consul server" json:"address,omitempty" toml:"address,omitempty" yaml:"address, | Consul server" json:"address,omitempty" toml:"address,omitempty" yaml:"address, | |||
omitempty" export:"true"` | omitempty"` | |||
Scheme string `description:"The URI scheme for | Scheme string `description:"The URI scheme for | |||
the Consul server" json:"scheme,omitempty" toml:"scheme,omitempty" yaml:"scheme | the Consul server" json:"scheme,omitempty" toml:"scheme,omitempty" yaml:"scheme | |||
,omitempty" export:"true"` | ,omitempty"` | |||
DataCenter string `description:"Data center to use | DataCenter string `description:"Data center to use | |||
. If not provided, the default agent data center is used" json:"datacenter,omite | . If not provided, the default agent data center is used" json:"datacenter,omite | |||
mpty" toml:"datacenter,omitempty" yaml:"datacenter,omitempty" export:"true"` | mpty" toml:"datacenter,omitempty" yaml:"datacenter,omitempty"` | |||
Token string `description:"Token is used to p | Token string `description:"Token is used to p | |||
rovide a per-request ACL token which overrides the agent's default token" json:" | rovide a per-request ACL token which overrides the agent's default token" json:" | |||
token,omitempty" toml:"token,omitempty" yaml:"token,omitempty" export:"true"` | token,omitempty" toml:"token,omitempty" yaml:"token,omitempty"` | |||
TLS *types.ClientTLS `description:"Enable TLS support ." json:"tls,omitempty" toml:"tls,omitempty" yaml:"tls,omitempty" export:"true"` | TLS *types.ClientTLS `description:"Enable TLS support ." json:"tls,omitempty" toml:"tls,omitempty" yaml:"tls,omitempty" export:"true"` | |||
HTTPAuth *EndpointHTTPAuthConfig `description:"Auth info to use f or http access" json:"httpAuth,omitempty" toml:"httpAuth,omitempty" yaml:"httpAu th,omitempty" export:"true"` | HTTPAuth *EndpointHTTPAuthConfig `description:"Auth info to use f or http access" json:"httpAuth,omitempty" toml:"httpAuth,omitempty" yaml:"httpAu th,omitempty" export:"true"` | |||
EndpointWaitTime ptypes.Duration `description:"WaitTime limits ho w long a Watch will block. If not provided, the agent default values will be use d" json:"endpointWaitTime,omitempty" toml:"endpointWaitTime,omitempty" yaml:"end pointWaitTime,omitempty" export:"true"` | EndpointWaitTime ptypes.Duration `description:"WaitTime limits ho w long a Watch will block. If not provided, the agent default values will be use d" json:"endpointWaitTime,omitempty" toml:"endpointWaitTime,omitempty" yaml:"end pointWaitTime,omitempty" export:"true"` | |||
} | } | |||
// SetDefaults sets the default values. | // SetDefaults sets the default values. | |||
func (c *EndpointConfig) SetDefaults() { | func (c *EndpointConfig) SetDefaults() { | |||
c.Address = "127.0.0.1:8500" | c.Address = "127.0.0.1:8500" | |||
} | } | |||
// EndpointHTTPAuthConfig holds configurations of the authentication. | // EndpointHTTPAuthConfig holds configurations of the authentication. | |||
type EndpointHTTPAuthConfig struct { | type EndpointHTTPAuthConfig struct { | |||
Username string `description:"Basic Auth username" json:"username,omitemp | Username string `description:"Basic Auth username" json:"username,omitemp | |||
ty" toml:"username,omitempty" yaml:"username,omitempty" export:"true"` | ty" toml:"username,omitempty" yaml:"username,omitempty"` | |||
Password string `description:"Basic Auth password" json:"password,omitemp | Password string `description:"Basic Auth password" json:"password,omitemp | |||
ty" toml:"password,omitempty" yaml:"password,omitempty" export:"true"` | ty" toml:"password,omitempty" yaml:"password,omitempty"` | |||
} | } | |||
// SetDefaults sets the default values. | // SetDefaults sets the default values. | |||
func (p *Provider) SetDefaults() { | func (p *Provider) SetDefaults() { | |||
endpoint := &EndpointConfig{} | endpoint := &EndpointConfig{} | |||
endpoint.SetDefaults() | endpoint.SetDefaults() | |||
p.Endpoint = endpoint | p.Endpoint = endpoint | |||
p.RefreshInterval = ptypes.Duration(15 * time.Second) | p.RefreshInterval = ptypes.Duration(15 * time.Second) | |||
p.Prefix = "traefik" | p.Prefix = "traefik" | |||
p.ExposedByDefault = true | p.ExposedByDefault = true | |||
skipping to change at line 111 | skipping to change at line 111 | |||
func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe. Pool) error { | func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe. Pool) error { | |||
pool.GoCtx(func(routineCtx context.Context) { | pool.GoCtx(func(routineCtx context.Context) { | |||
ctxLog := log.With(routineCtx, log.Str(log.ProviderName, "consulc atalog")) | ctxLog := log.With(routineCtx, log.Str(log.ProviderName, "consulc atalog")) | |||
logger := log.FromContext(ctxLog) | logger := log.FromContext(ctxLog) | |||
operation := func() error { | operation := func() error { | |||
var err error | var err error | |||
p.client, err = createClient(p.Endpoint) | p.client, err = createClient(p.Endpoint) | |||
if err != nil { | if err != nil { | |||
return fmt.Errorf("error create consul client, %w ", err) | return fmt.Errorf("unable to create consul client : %w", err) | |||
} | } | |||
// get configuration at the provider's startup. | ||||
err = p.loadConfiguration(routineCtx, configurationChan) | ||||
if err != nil { | ||||
return fmt.Errorf("failed to get consul catalog d | ||||
ata: %w", err) | ||||
} | ||||
// Periodic refreshes. | ||||
ticker := time.NewTicker(time.Duration(p.RefreshInterval) ) | ticker := time.NewTicker(time.Duration(p.RefreshInterval) ) | |||
defer ticker.Stop() | ||||
for { | for { | |||
select { | select { | |||
case <-ticker.C: | case <-ticker.C: | |||
data, err := p.getConsulServicesData(rout ineCtx) | err = p.loadConfiguration(routineCtx, con figurationChan) | |||
if err != nil { | if err != nil { | |||
logger.Errorf("error get consul c | return fmt.Errorf("failed to refr | |||
atalog data, %v", err) | esh consul catalog data: %w", err) | |||
return err | ||||
} | } | |||
configuration := p.buildConfiguration(rou | ||||
tineCtx, data) | ||||
configurationChan <- dynamic.Message{ | ||||
ProviderName: "consulcatalog", | ||||
Configuration: configuration, | ||||
} | ||||
case <-routineCtx.Done(): | case <-routineCtx.Done(): | |||
ticker.Stop() | ||||
return nil | return nil | |||
} | } | |||
} | } | |||
} | } | |||
notify := func(err error, time time.Duration) { | notify := func(err error, time time.Duration) { | |||
logger.Errorf("Provider connection error %+v, retrying in %s", err, time) | logger.Errorf("Provider connection error %+v, retrying in %s", err, time) | |||
} | } | |||
err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctxLog), no tify) | err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctxLog), no tify) | |||
if err != nil { | if err != nil { | |||
logger.Errorf("Cannot connect to consul catalog server %+ v", err) | logger.Errorf("Cannot connect to consul catalog server %+ v", err) | |||
} | } | |||
}) | }) | |||
return nil | return nil | |||
} | } | |||
func (p *Provider) loadConfiguration(ctx context.Context, configurationChan chan | ||||
<- dynamic.Message) error { | ||||
data, err := p.getConsulServicesData(ctx) | ||||
if err != nil { | ||||
return err | ||||
} | ||||
configurationChan <- dynamic.Message{ | ||||
ProviderName: "consulcatalog", | ||||
Configuration: p.buildConfiguration(ctx, data), | ||||
} | ||||
return nil | ||||
} | ||||
func (p *Provider) getConsulServicesData(ctx context.Context) ([]itemData, error ) { | func (p *Provider) getConsulServicesData(ctx context.Context) ([]itemData, error ) { | |||
consulServiceNames, err := p.fetchServices(ctx) | consulServiceNames, err := p.fetchServices(ctx) | |||
if err != nil { | if err != nil { | |||
return nil, err | return nil, err | |||
} | } | |||
var data []itemData | var data []itemData | |||
for _, name := range consulServiceNames { | for _, name := range consulServiceNames { | |||
consulServices, healthServices, err := p.fetchService(ctx, name) | consulServices, statuses, err := p.fetchService(ctx, name) | |||
if err != nil { | if err != nil { | |||
return nil, err | return nil, err | |||
} | } | |||
for i, consulService := range consulServices { | for _, consulService := range consulServices { | |||
address := consulService.ServiceAddress | address := consulService.ServiceAddress | |||
if address == "" { | if address == "" { | |||
address = consulService.Address | address = consulService.Address | |||
} | } | |||
status, exists := statuses[consulService.ID+consulService | ||||
.ServiceID] | ||||
if !exists { | ||||
status = api.HealthAny | ||||
} | ||||
item := itemData{ | item := itemData{ | |||
ID: consulService.ServiceID, | ID: consulService.ServiceID, | |||
Node: consulService.Node, | Node: consulService.Node, | |||
Name: consulService.ServiceName, | Name: consulService.ServiceName, | |||
Address: address, | Address: address, | |||
Port: strconv.Itoa(consulService.ServicePort), | Port: strconv.Itoa(consulService.ServicePort), | |||
Labels: tagsToNeutralLabels(consulService.Servic eTags, p.Prefix), | Labels: tagsToNeutralLabels(consulService.Servic eTags, p.Prefix), | |||
Tags: consulService.ServiceTags, | Tags: consulService.ServiceTags, | |||
Status: healthServices[i].Checks.AggregatedStatu s(), | Status: status, | |||
} | } | |||
extraConf, err := p.getConfiguration(item) | extraConf, err := p.getConfiguration(item) | |||
if err != nil { | if err != nil { | |||
log.FromContext(ctx).Errorf("Skip item %s: %v", i tem.Name, err) | log.FromContext(ctx).Errorf("Skip item %s: %v", i tem.Name, err) | |||
continue | continue | |||
} | } | |||
item.ExtraConf = extraConf | item.ExtraConf = extraConf | |||
data = append(data, item) | data = append(data, item) | |||
} | } | |||
} | } | |||
return data, nil | return data, nil | |||
} | } | |||
func (p *Provider) fetchService(ctx context.Context, name string) ([]*api.Catalo gService, []*api.ServiceEntry, error) { | func (p *Provider) fetchService(ctx context.Context, name string) ([]*api.Catalo gService, map[string]string, error) { | |||
var tagFilter string | var tagFilter string | |||
if !p.ExposedByDefault { | if !p.ExposedByDefault { | |||
tagFilter = p.Prefix + ".enable=true" | tagFilter = p.Prefix + ".enable=true" | |||
} | } | |||
opts := &api.QueryOptions{AllowStale: p.Stale, RequireConsistent: p.Requi reConsistent, UseCache: p.Cache} | opts := &api.QueryOptions{AllowStale: p.Stale, RequireConsistent: p.Requi reConsistent, UseCache: p.Cache} | |||
opts = opts.WithContext(ctx) | ||||
consulServices, _, err := p.client.Catalog().Service(name, tagFilter, opt s) | consulServices, _, err := p.client.Catalog().Service(name, tagFilter, opt s) | |||
if err != nil { | if err != nil { | |||
return nil, nil, err | return nil, nil, err | |||
} | } | |||
healthServices, _, err := p.client.Health().Service(name, tagFilter, fals e, opts) | healthServices, _, err := p.client.Health().Service(name, tagFilter, fals e, opts) | |||
return consulServices, healthServices, err | if err != nil { | |||
return nil, nil, err | ||||
} | ||||
// Index status by service and node so it can be retrieved from a Catalog | ||||
Service even if the health and services | ||||
// are not in sync. | ||||
statuses := make(map[string]string) | ||||
for _, health := range healthServices { | ||||
if health.Service == nil || health.Node == nil { | ||||
continue | ||||
} | ||||
statuses[health.Node.ID+health.Service.ID] = health.Checks.Aggreg | ||||
atedStatus() | ||||
} | ||||
return consulServices, statuses, err | ||||
} | } | |||
func (p *Provider) fetchServices(ctx context.Context) ([]string, error) { | func (p *Provider) fetchServices(ctx context.Context) ([]string, error) { | |||
// The query option "Filter" is not supported by /catalog/services. | // The query option "Filter" is not supported by /catalog/services. | |||
// https://www.consul.io/api/catalog.html#list-services | // https://www.consul.io/api/catalog.html#list-services | |||
opts := &api.QueryOptions{AllowStale: p.Stale, RequireConsistent: p.Requi reConsistent, UseCache: p.Cache} | opts := &api.QueryOptions{AllowStale: p.Stale, RequireConsistent: p.Requi reConsistent, UseCache: p.Cache} | |||
serviceNames, _, err := p.client.Catalog().Services(opts) | serviceNames, _, err := p.client.Catalog().Services(opts) | |||
if err != nil { | if err != nil { | |||
return nil, err | return nil, err | |||
} | } | |||
End of changes. 17 change blocks. | ||||
33 lines changed or deleted | 73 lines changed or added |