"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "pkg/provider/consulcatalog/consul_catalog.go" between
traefik-v2.3.2.src.tar.gz and traefik-v2.3.3.src.tar.gz

About: Traefik is a cloud native edge router, a reverse proxy and load balancer for HTTP and TCP-based applications.

consul_catalog.go  (traefik-v2.3.2.src):consul_catalog.go  (traefik-v2.3.3.src)
skipping to change at line 58 skipping to change at line 58
Cache bool `description:"Use local agent caching f or catalog reads." json:"cache,omitempty" toml:"cache,omitempty" yaml:"cache,omi tempty" export:"true"` Cache bool `description:"Use local agent caching f or catalog reads." json:"cache,omitempty" toml:"cache,omitempty" yaml:"cache,omi tempty" export:"true"`
ExposedByDefault bool `description:"Expose containers by defa ult." json:"exposedByDefault,omitempty" toml:"exposedByDefault,omitempty" yaml:" exposedByDefault,omitempty" export:"true"` ExposedByDefault bool `description:"Expose containers by defa ult." json:"exposedByDefault,omitempty" toml:"exposedByDefault,omitempty" yaml:" exposedByDefault,omitempty" export:"true"`
DefaultRule string `description:"Default rule." json:"defa ultRule,omitempty" toml:"defaultRule,omitempty" yaml:"defaultRule,omitempty"` DefaultRule string `description:"Default rule." json:"defa ultRule,omitempty" toml:"defaultRule,omitempty" yaml:"defaultRule,omitempty"`
client *api.Client client *api.Client
defaultRuleTpl *template.Template defaultRuleTpl *template.Template
} }
// EndpointConfig holds configurations of the endpoint. // EndpointConfig holds configurations of the endpoint.
type EndpointConfig struct { type EndpointConfig struct {
Address string `description:"The address of the Address string `description:"The address of the
Consul server" json:"address,omitempty" toml:"address,omitempty" yaml:"address, Consul server" json:"address,omitempty" toml:"address,omitempty" yaml:"address,
omitempty" export:"true"` omitempty"`
Scheme string `description:"The URI scheme for Scheme string `description:"The URI scheme for
the Consul server" json:"scheme,omitempty" toml:"scheme,omitempty" yaml:"scheme the Consul server" json:"scheme,omitempty" toml:"scheme,omitempty" yaml:"scheme
,omitempty" export:"true"` ,omitempty"`
DataCenter string `description:"Data center to use DataCenter string `description:"Data center to use
. If not provided, the default agent data center is used" json:"datacenter,omite . If not provided, the default agent data center is used" json:"datacenter,omite
mpty" toml:"datacenter,omitempty" yaml:"datacenter,omitempty" export:"true"` mpty" toml:"datacenter,omitempty" yaml:"datacenter,omitempty"`
Token string `description:"Token is used to p Token string `description:"Token is used to p
rovide a per-request ACL token which overrides the agent's default token" json:" rovide a per-request ACL token which overrides the agent's default token" json:"
token,omitempty" toml:"token,omitempty" yaml:"token,omitempty" export:"true"` token,omitempty" toml:"token,omitempty" yaml:"token,omitempty"`
TLS *types.ClientTLS `description:"Enable TLS support ." json:"tls,omitempty" toml:"tls,omitempty" yaml:"tls,omitempty" export:"true"` TLS *types.ClientTLS `description:"Enable TLS support ." json:"tls,omitempty" toml:"tls,omitempty" yaml:"tls,omitempty" export:"true"`
HTTPAuth *EndpointHTTPAuthConfig `description:"Auth info to use f or http access" json:"httpAuth,omitempty" toml:"httpAuth,omitempty" yaml:"httpAu th,omitempty" export:"true"` HTTPAuth *EndpointHTTPAuthConfig `description:"Auth info to use f or http access" json:"httpAuth,omitempty" toml:"httpAuth,omitempty" yaml:"httpAu th,omitempty" export:"true"`
EndpointWaitTime ptypes.Duration `description:"WaitTime limits ho w long a Watch will block. If not provided, the agent default values will be use d" json:"endpointWaitTime,omitempty" toml:"endpointWaitTime,omitempty" yaml:"end pointWaitTime,omitempty" export:"true"` EndpointWaitTime ptypes.Duration `description:"WaitTime limits ho w long a Watch will block. If not provided, the agent default values will be use d" json:"endpointWaitTime,omitempty" toml:"endpointWaitTime,omitempty" yaml:"end pointWaitTime,omitempty" export:"true"`
} }
// SetDefaults sets the default values. // SetDefaults sets the default values.
func (c *EndpointConfig) SetDefaults() { func (c *EndpointConfig) SetDefaults() {
c.Address = "127.0.0.1:8500" c.Address = "127.0.0.1:8500"
} }
// EndpointHTTPAuthConfig holds configurations of the authentication. // EndpointHTTPAuthConfig holds configurations of the authentication.
type EndpointHTTPAuthConfig struct { type EndpointHTTPAuthConfig struct {
Username string `description:"Basic Auth username" json:"username,omitemp Username string `description:"Basic Auth username" json:"username,omitemp
ty" toml:"username,omitempty" yaml:"username,omitempty" export:"true"` ty" toml:"username,omitempty" yaml:"username,omitempty"`
Password string `description:"Basic Auth password" json:"password,omitemp Password string `description:"Basic Auth password" json:"password,omitemp
ty" toml:"password,omitempty" yaml:"password,omitempty" export:"true"` ty" toml:"password,omitempty" yaml:"password,omitempty"`
} }
// SetDefaults sets the default values. // SetDefaults sets the default values.
func (p *Provider) SetDefaults() { func (p *Provider) SetDefaults() {
endpoint := &EndpointConfig{} endpoint := &EndpointConfig{}
endpoint.SetDefaults() endpoint.SetDefaults()
p.Endpoint = endpoint p.Endpoint = endpoint
p.RefreshInterval = ptypes.Duration(15 * time.Second) p.RefreshInterval = ptypes.Duration(15 * time.Second)
p.Prefix = "traefik" p.Prefix = "traefik"
p.ExposedByDefault = true p.ExposedByDefault = true
skipping to change at line 111 skipping to change at line 111
func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe. Pool) error { func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe. Pool) error {
pool.GoCtx(func(routineCtx context.Context) { pool.GoCtx(func(routineCtx context.Context) {
ctxLog := log.With(routineCtx, log.Str(log.ProviderName, "consulc atalog")) ctxLog := log.With(routineCtx, log.Str(log.ProviderName, "consulc atalog"))
logger := log.FromContext(ctxLog) logger := log.FromContext(ctxLog)
operation := func() error { operation := func() error {
var err error var err error
p.client, err = createClient(p.Endpoint) p.client, err = createClient(p.Endpoint)
if err != nil { if err != nil {
return fmt.Errorf("error create consul client, %w ", err) return fmt.Errorf("unable to create consul client : %w", err)
} }
// get configuration at the provider's startup.
err = p.loadConfiguration(routineCtx, configurationChan)
if err != nil {
return fmt.Errorf("failed to get consul catalog d
ata: %w", err)
}
// Periodic refreshes.
ticker := time.NewTicker(time.Duration(p.RefreshInterval) ) ticker := time.NewTicker(time.Duration(p.RefreshInterval) )
defer ticker.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
data, err := p.getConsulServicesData(rout ineCtx) err = p.loadConfiguration(routineCtx, con figurationChan)
if err != nil { if err != nil {
logger.Errorf("error get consul c return fmt.Errorf("failed to refr
atalog data, %v", err) esh consul catalog data: %w", err)
return err
} }
configuration := p.buildConfiguration(rou
tineCtx, data)
configurationChan <- dynamic.Message{
ProviderName: "consulcatalog",
Configuration: configuration,
}
case <-routineCtx.Done(): case <-routineCtx.Done():
ticker.Stop()
return nil return nil
} }
} }
} }
notify := func(err error, time time.Duration) { notify := func(err error, time time.Duration) {
logger.Errorf("Provider connection error %+v, retrying in %s", err, time) logger.Errorf("Provider connection error %+v, retrying in %s", err, time)
} }
err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctxLog), no tify) err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctxLog), no tify)
if err != nil { if err != nil {
logger.Errorf("Cannot connect to consul catalog server %+ v", err) logger.Errorf("Cannot connect to consul catalog server %+ v", err)
} }
}) })
return nil return nil
} }
func (p *Provider) loadConfiguration(ctx context.Context, configurationChan chan
<- dynamic.Message) error {
data, err := p.getConsulServicesData(ctx)
if err != nil {
return err
}
configurationChan <- dynamic.Message{
ProviderName: "consulcatalog",
Configuration: p.buildConfiguration(ctx, data),
}
return nil
}
func (p *Provider) getConsulServicesData(ctx context.Context) ([]itemData, error ) { func (p *Provider) getConsulServicesData(ctx context.Context) ([]itemData, error ) {
consulServiceNames, err := p.fetchServices(ctx) consulServiceNames, err := p.fetchServices(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var data []itemData var data []itemData
for _, name := range consulServiceNames { for _, name := range consulServiceNames {
consulServices, healthServices, err := p.fetchService(ctx, name) consulServices, statuses, err := p.fetchService(ctx, name)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for i, consulService := range consulServices { for _, consulService := range consulServices {
address := consulService.ServiceAddress address := consulService.ServiceAddress
if address == "" { if address == "" {
address = consulService.Address address = consulService.Address
} }
status, exists := statuses[consulService.ID+consulService
.ServiceID]
if !exists {
status = api.HealthAny
}
item := itemData{ item := itemData{
ID: consulService.ServiceID, ID: consulService.ServiceID,
Node: consulService.Node, Node: consulService.Node,
Name: consulService.ServiceName, Name: consulService.ServiceName,
Address: address, Address: address,
Port: strconv.Itoa(consulService.ServicePort), Port: strconv.Itoa(consulService.ServicePort),
Labels: tagsToNeutralLabels(consulService.Servic eTags, p.Prefix), Labels: tagsToNeutralLabels(consulService.Servic eTags, p.Prefix),
Tags: consulService.ServiceTags, Tags: consulService.ServiceTags,
Status: healthServices[i].Checks.AggregatedStatu s(), Status: status,
} }
extraConf, err := p.getConfiguration(item) extraConf, err := p.getConfiguration(item)
if err != nil { if err != nil {
log.FromContext(ctx).Errorf("Skip item %s: %v", i tem.Name, err) log.FromContext(ctx).Errorf("Skip item %s: %v", i tem.Name, err)
continue continue
} }
item.ExtraConf = extraConf item.ExtraConf = extraConf
data = append(data, item) data = append(data, item)
} }
} }
return data, nil return data, nil
} }
func (p *Provider) fetchService(ctx context.Context, name string) ([]*api.Catalo gService, []*api.ServiceEntry, error) { func (p *Provider) fetchService(ctx context.Context, name string) ([]*api.Catalo gService, map[string]string, error) {
var tagFilter string var tagFilter string
if !p.ExposedByDefault { if !p.ExposedByDefault {
tagFilter = p.Prefix + ".enable=true" tagFilter = p.Prefix + ".enable=true"
} }
opts := &api.QueryOptions{AllowStale: p.Stale, RequireConsistent: p.Requi reConsistent, UseCache: p.Cache} opts := &api.QueryOptions{AllowStale: p.Stale, RequireConsistent: p.Requi reConsistent, UseCache: p.Cache}
opts = opts.WithContext(ctx)
consulServices, _, err := p.client.Catalog().Service(name, tagFilter, opt s) consulServices, _, err := p.client.Catalog().Service(name, tagFilter, opt s)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
healthServices, _, err := p.client.Health().Service(name, tagFilter, fals e, opts) healthServices, _, err := p.client.Health().Service(name, tagFilter, fals e, opts)
return consulServices, healthServices, err if err != nil {
return nil, nil, err
}
// Index status by service and node so it can be retrieved from a Catalog
Service even if the health and services
// are not in sync.
statuses := make(map[string]string)
for _, health := range healthServices {
if health.Service == nil || health.Node == nil {
continue
}
statuses[health.Node.ID+health.Service.ID] = health.Checks.Aggreg
atedStatus()
}
return consulServices, statuses, err
} }
func (p *Provider) fetchServices(ctx context.Context) ([]string, error) { func (p *Provider) fetchServices(ctx context.Context) ([]string, error) {
// The query option "Filter" is not supported by /catalog/services. // The query option "Filter" is not supported by /catalog/services.
// https://www.consul.io/api/catalog.html#list-services // https://www.consul.io/api/catalog.html#list-services
opts := &api.QueryOptions{AllowStale: p.Stale, RequireConsistent: p.Requi reConsistent, UseCache: p.Cache} opts := &api.QueryOptions{AllowStale: p.Stale, RequireConsistent: p.Requi reConsistent, UseCache: p.Cache}
serviceNames, _, err := p.client.Catalog().Services(opts) serviceNames, _, err := p.client.Catalog().Services(opts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
 End of changes. 17 change blocks. 
33 lines changed or deleted 73 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)