client.go (traefik-v2.3.2.src) | : | client.go (traefik-v2.3.3.src) | ||
---|---|---|---|---|
skipping to change at line 62 | skipping to change at line 62 | |||
GetIngresses() []*networkingv1beta1.Ingress | GetIngresses() []*networkingv1beta1.Ingress | |||
GetIngressClass() (*networkingv1beta1.IngressClass, error) | GetIngressClass() (*networkingv1beta1.IngressClass, error) | |||
GetService(namespace, name string) (*corev1.Service, bool, error) | GetService(namespace, name string) (*corev1.Service, bool, error) | |||
GetSecret(namespace, name string) (*corev1.Secret, bool, error) | GetSecret(namespace, name string) (*corev1.Secret, bool, error) | |||
GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) | GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) | |||
UpdateIngressStatus(ing *networkingv1beta1.Ingress, ip, hostname string) error | UpdateIngressStatus(ing *networkingv1beta1.Ingress, ip, hostname string) error | |||
GetServerVersion() (*version.Version, error) | GetServerVersion() (*version.Version, error) | |||
} | } | |||
type clientWrapper struct { | type clientWrapper struct { | |||
clientset *kubernetes.Clientset | clientset kubernetes.Interface | |||
factories map[string]informers.SharedInformerFactory | factoriesKube map[string]informers.SharedInformerFactory | |||
factoriesSecret map[string]informers.SharedInformerFactory | ||||
clusterFactory informers.SharedInformerFactory | clusterFactory informers.SharedInformerFactory | |||
ingressLabelSelector labels.Selector | ingressLabelSelector labels.Selector | |||
isNamespaceAll bool | isNamespaceAll bool | |||
watchedNamespaces []string | watchedNamespaces []string | |||
} | } | |||
// newInClusterClient returns a new Provider client that is expected to run | // newInClusterClient returns a new Provider client that is expected to run | |||
// inside the cluster. | // inside the cluster. | |||
func newInClusterClient(endpoint string) (*clientWrapper, error) { | func newInClusterClient(endpoint string) (*clientWrapper, error) { | |||
config, err := rest.InClusterConfig() | config, err := rest.InClusterConfig() | |||
skipping to change at line 126 | skipping to change at line 127 | |||
func createClientFromConfig(c *rest.Config) (*clientWrapper, error) { | func createClientFromConfig(c *rest.Config) (*clientWrapper, error) { | |||
clientset, err := kubernetes.NewForConfig(c) | clientset, err := kubernetes.NewForConfig(c) | |||
if err != nil { | if err != nil { | |||
return nil, err | return nil, err | |||
} | } | |||
return newClientImpl(clientset), nil | return newClientImpl(clientset), nil | |||
} | } | |||
func newClientImpl(clientset *kubernetes.Clientset) *clientWrapper { | func newClientImpl(clientset kubernetes.Interface) *clientWrapper { | |||
return &clientWrapper{ | return &clientWrapper{ | |||
clientset: clientset, | clientset: clientset, | |||
factories: make(map[string]informers.SharedInformerFactory), | factoriesKube: make(map[string]informers.SharedInformerFactory) | |||
, | ||||
factoriesSecret: make(map[string]informers.SharedInformerFactory) | ||||
, | ||||
} | } | |||
} | } | |||
// WatchAll starts namespace-specific controllers for all relevant kinds. | // WatchAll starts namespace-specific controllers for all relevant kinds. | |||
func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (< -chan interface{}, error) { | func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (< -chan interface{}, error) { | |||
eventCh := make(chan interface{}, 1) | eventCh := make(chan interface{}, 1) | |||
eventHandler := c.newResourceEventHandler(eventCh) | eventHandler := c.newResourceEventHandler(eventCh) | |||
if len(namespaces) == 0 { | if len(namespaces) == 0 { | |||
namespaces = []string{metav1.NamespaceAll} | namespaces = []string{metav1.NamespaceAll} | |||
c.isNamespaceAll = true | c.isNamespaceAll = true | |||
} | } | |||
c.watchedNamespaces = namespaces | c.watchedNamespaces = namespaces | |||
notOwnedByHelm := func(opts *metav1.ListOptions) { | ||||
opts.LabelSelector = "owner!=helm" | ||||
} | ||||
for _, ns := range namespaces { | for _, ns := range namespaces { | |||
factory := informers.NewSharedInformerFactoryWithOptions(c.client | factoryKube := informers.NewSharedInformerFactoryWithOptions(c.cl | |||
set, resyncPeriod, informers.WithNamespace(ns)) | ientset, resyncPeriod, informers.WithNamespace(ns)) | |||
factory.Extensions().V1beta1().Ingresses().Informer().AddEventHan | factoryKube.Extensions().V1beta1().Ingresses().Informer().AddEven | |||
dler(eventHandler) | tHandler(eventHandler) | |||
factory.Core().V1().Services().Informer().AddEventHandler(eventHa | factoryKube.Core().V1().Services().Informer().AddEventHandler(eve | |||
ndler) | ntHandler) | |||
factory.Core().V1().Endpoints().Informer().AddEventHandler(eventH | factoryKube.Core().V1().Endpoints().Informer().AddEventHandler(ev | |||
andler) | entHandler) | |||
factory.Core().V1().Secrets().Informer().AddEventHandler(eventHan | ||||
dler) | factorySecret := informers.NewSharedInformerFactoryWithOptions(c. | |||
c.factories[ns] = factory | clientset, resyncPeriod, informers.WithNamespace(ns), informers.WithTweakListOpt | |||
ions(notOwnedByHelm)) | ||||
factorySecret.Core().V1().Secrets().Informer().AddEventHandler(ev | ||||
entHandler) | ||||
c.factoriesKube[ns] = factoryKube | ||||
c.factoriesSecret[ns] = factorySecret | ||||
} | } | |||
for _, ns := range namespaces { | for _, ns := range namespaces { | |||
c.factories[ns].Start(stopCh) | c.factoriesKube[ns].Start(stopCh) | |||
c.factoriesSecret[ns].Start(stopCh) | ||||
} | } | |||
for _, ns := range namespaces { | for _, ns := range namespaces { | |||
for typ, ok := range c.factories[ns].WaitForCacheSync(stopCh) { | for typ, ok := range c.factoriesKube[ns].WaitForCacheSync(stopCh) | |||
{ | ||||
if !ok { | ||||
return nil, fmt.Errorf("timed out waiting for con | ||||
troller caches to sync %s in namespace %q", typ, ns) | ||||
} | ||||
} | ||||
for typ, ok := range c.factoriesSecret[ns].WaitForCacheSync(stopC | ||||
h) { | ||||
if !ok { | if !ok { | |||
return nil, fmt.Errorf("timed out waiting for con troller caches to sync %s in namespace %q", typ, ns) | return nil, fmt.Errorf("timed out waiting for con troller caches to sync %s in namespace %q", typ, ns) | |||
} | } | |||
} | } | |||
} | } | |||
serverVersion, err := c.GetServerVersion() | serverVersion, err := c.GetServerVersion() | |||
if err != nil { | if err != nil { | |||
log.WithoutContext().Errorf("Failed to get server version: %v", e rr) | log.WithoutContext().Errorf("Failed to get server version: %v", e rr) | |||
return eventCh, nil | return eventCh, nil | |||
skipping to change at line 191 | skipping to change at line 208 | |||
} | } | |||
} | } | |||
return eventCh, nil | return eventCh, nil | |||
} | } | |||
// GetIngresses returns all Ingresses for observed namespaces in the cluster. | // GetIngresses returns all Ingresses for observed namespaces in the cluster. | |||
func (c *clientWrapper) GetIngresses() []*networkingv1beta1.Ingress { | func (c *clientWrapper) GetIngresses() []*networkingv1beta1.Ingress { | |||
var results []*networkingv1beta1.Ingress | var results []*networkingv1beta1.Ingress | |||
for ns, factory := range c.factories { | for ns, factory := range c.factoriesKube { | |||
// extensions | // extensions | |||
ings, err := factory.Extensions().V1beta1().Ingresses().Lister(). List(c.ingressLabelSelector) | ings, err := factory.Extensions().V1beta1().Ingresses().Lister(). List(c.ingressLabelSelector) | |||
if err != nil { | if err != nil { | |||
log.Errorf("Failed to list ingresses in namespace %s: %v" , ns, err) | log.Errorf("Failed to list ingresses in namespace %s: %v" , ns, err) | |||
} | } | |||
for _, ing := range ings { | for _, ing := range ings { | |||
n, err := extensionsToNetworking(ing) | n, err := extensionsToNetworking(ing) | |||
if err != nil { | if err != nil { | |||
log.Errorf("Failed to convert ingress %s from ext ensions/v1beta1 to networking/v1beta1: %v", ns, err) | log.Errorf("Failed to convert ingress %s from ext ensions/v1beta1 to networking/v1beta1: %v", ns, err) | |||
skipping to change at line 242 | skipping to change at line 259 | |||
// UpdateIngressStatus updates an Ingress with a provided status. | // UpdateIngressStatus updates an Ingress with a provided status. | |||
func (c *clientWrapper) UpdateIngressStatus(src *networkingv1beta1.Ingress, ip, hostname string) error { | func (c *clientWrapper) UpdateIngressStatus(src *networkingv1beta1.Ingress, ip, hostname string) error { | |||
if !c.isWatchedNamespace(src.Namespace) { | if !c.isWatchedNamespace(src.Namespace) { | |||
return fmt.Errorf("failed to get ingress %s/%s: namespace is not within watched namespaces", src.Namespace, src.Name) | return fmt.Errorf("failed to get ingress %s/%s: namespace is not within watched namespaces", src.Namespace, src.Name) | |||
} | } | |||
if src.GetObjectKind().GroupVersionKind().Group != "networking.k8s.io" { | if src.GetObjectKind().GroupVersionKind().Group != "networking.k8s.io" { | |||
return c.updateIngressStatusOld(src, ip, hostname) | return c.updateIngressStatusOld(src, ip, hostname) | |||
} | } | |||
ing, err := c.factories[c.lookupNamespace(src.Namespace)].Networking().V1 beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) | ing, err := c.factoriesKube[c.lookupNamespace(src.Namespace)].Networking( ).V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) | |||
if err != nil { | if err != nil { | |||
return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespac e, src.Name, err) | return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespac e, src.Name, err) | |||
} | } | |||
if len(ing.Status.LoadBalancer.Ingress) > 0 { | if len(ing.Status.LoadBalancer.Ingress) > 0 { | |||
if ing.Status.LoadBalancer.Ingress[0].Hostname == hostname && ing .Status.LoadBalancer.Ingress[0].IP == ip { | if ing.Status.LoadBalancer.Ingress[0].Hostname == hostname && ing .Status.LoadBalancer.Ingress[0].IP == ip { | |||
// If status is already set, skip update | // If status is already set, skip update | |||
log.Debugf("Skipping status update on ingress %s/%s", ing .Namespace, ing.Name) | log.Debugf("Skipping status update on ingress %s/%s", ing .Namespace, ing.Name) | |||
return nil | return nil | |||
} | } | |||
skipping to change at line 271 | skipping to change at line 288 | |||
_, err = c.clientset.NetworkingV1beta1().Ingresses(ingCopy.Namespace).Upd ateStatus(ctx, ingCopy, metav1.UpdateOptions{}) | _, err = c.clientset.NetworkingV1beta1().Ingresses(ingCopy.Namespace).Upd ateStatus(ctx, ingCopy, metav1.UpdateOptions{}) | |||
if err != nil { | if err != nil { | |||
return fmt.Errorf("failed to update ingress status %s/%s: %w", sr c.Namespace, src.Name, err) | return fmt.Errorf("failed to update ingress status %s/%s: %w", sr c.Namespace, src.Name, err) | |||
} | } | |||
log.Infof("Updated status on ingress %s/%s", src.Namespace, src.Name) | log.Infof("Updated status on ingress %s/%s", src.Namespace, src.Name) | |||
return nil | return nil | |||
} | } | |||
func (c *clientWrapper) updateIngressStatusOld(src *networkingv1beta1.Ingress, i p, hostname string) error { | func (c *clientWrapper) updateIngressStatusOld(src *networkingv1beta1.Ingress, i p, hostname string) error { | |||
ing, err := c.factories[c.lookupNamespace(src.Namespace)].Extensions().V1 beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) | ing, err := c.factoriesKube[c.lookupNamespace(src.Namespace)].Extensions( ).V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) | |||
if err != nil { | if err != nil { | |||
return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespac e, src.Name, err) | return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespac e, src.Name, err) | |||
} | } | |||
if len(ing.Status.LoadBalancer.Ingress) > 0 { | if len(ing.Status.LoadBalancer.Ingress) > 0 { | |||
if ing.Status.LoadBalancer.Ingress[0].Hostname == hostname && ing .Status.LoadBalancer.Ingress[0].IP == ip { | if ing.Status.LoadBalancer.Ingress[0].Hostname == hostname && ing .Status.LoadBalancer.Ingress[0].IP == ip { | |||
// If status is already set, skip update | // If status is already set, skip update | |||
log.Debugf("Skipping status update on ingress %s/%s", ing .Namespace, ing.Name) | log.Debugf("Skipping status update on ingress %s/%s", ing .Namespace, ing.Name) | |||
return nil | return nil | |||
} | } | |||
skipping to change at line 305 | skipping to change at line 322 | |||
log.Infof("Updated status on ingress %s/%s", src.Namespace, src.Name) | log.Infof("Updated status on ingress %s/%s", src.Namespace, src.Name) | |||
return nil | return nil | |||
} | } | |||
// GetService returns the named service from the given namespace. | // GetService returns the named service from the given namespace. | |||
func (c *clientWrapper) GetService(namespace, name string) (*corev1.Service, boo l, error) { | func (c *clientWrapper) GetService(namespace, name string) (*corev1.Service, boo l, error) { | |||
if !c.isWatchedNamespace(namespace) { | if !c.isWatchedNamespace(namespace) { | |||
return nil, false, fmt.Errorf("failed to get service %s/%s: names pace is not within watched namespaces", namespace, name) | return nil, false, fmt.Errorf("failed to get service %s/%s: names pace is not within watched namespaces", namespace, name) | |||
} | } | |||
service, err := c.factories[c.lookupNamespace(namespace)].Core().V1().Ser vices().Lister().Services(namespace).Get(name) | service, err := c.factoriesKube[c.lookupNamespace(namespace)].Core().V1() .Services().Lister().Services(namespace).Get(name) | |||
exist, err := translateNotFoundError(err) | exist, err := translateNotFoundError(err) | |||
return service, exist, err | return service, exist, err | |||
} | } | |||
// GetEndpoints returns the named endpoints from the given namespace. | // GetEndpoints returns the named endpoints from the given namespace. | |||
func (c *clientWrapper) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) { | func (c *clientWrapper) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) { | |||
if !c.isWatchedNamespace(namespace) { | if !c.isWatchedNamespace(namespace) { | |||
return nil, false, fmt.Errorf("failed to get endpoints %s/%s: nam espace is not within watched namespaces", namespace, name) | return nil, false, fmt.Errorf("failed to get endpoints %s/%s: nam espace is not within watched namespaces", namespace, name) | |||
} | } | |||
endpoint, err := c.factories[c.lookupNamespace(namespace)].Core().V1().En dpoints().Lister().Endpoints(namespace).Get(name) | endpoint, err := c.factoriesKube[c.lookupNamespace(namespace)].Core().V1( ).Endpoints().Lister().Endpoints(namespace).Get(name) | |||
exist, err := translateNotFoundError(err) | exist, err := translateNotFoundError(err) | |||
return endpoint, exist, err | return endpoint, exist, err | |||
} | } | |||
// GetSecret returns the named secret from the given namespace. | // GetSecret returns the named secret from the given namespace. | |||
func (c *clientWrapper) GetSecret(namespace, name string) (*corev1.Secret, bool, error) { | func (c *clientWrapper) GetSecret(namespace, name string) (*corev1.Secret, bool, error) { | |||
if !c.isWatchedNamespace(namespace) { | if !c.isWatchedNamespace(namespace) { | |||
return nil, false, fmt.Errorf("failed to get secret %s/%s: namesp ace is not within watched namespaces", namespace, name) | return nil, false, fmt.Errorf("failed to get secret %s/%s: namesp ace is not within watched namespaces", namespace, name) | |||
} | } | |||
secret, err := c.factories[c.lookupNamespace(namespace)].Core().V1().Secr ets().Lister().Secrets(namespace).Get(name) | secret, err := c.factoriesSecret[c.lookupNamespace(namespace)].Core().V1( ).Secrets().Lister().Secrets(namespace).Get(name) | |||
exist, err := translateNotFoundError(err) | exist, err := translateNotFoundError(err) | |||
return secret, exist, err | return secret, exist, err | |||
} | } | |||
func (c *clientWrapper) GetIngressClass() (*networkingv1beta1.IngressClass, erro r) { | func (c *clientWrapper) GetIngressClass() (*networkingv1beta1.IngressClass, erro r) { | |||
if c.clusterFactory == nil { | if c.clusterFactory == nil { | |||
return nil, errors.New("failed to find ingressClass: factory not loaded") | return nil, errors.New("failed to find ingressClass: factory not loaded") | |||
} | } | |||
ingressClasses, err := c.clusterFactory.Networking().V1beta1().IngressCla sses().Lister().List(labels.Everything()) | ingressClasses, err := c.clusterFactory.Networking().V1beta1().IngressCla sses().Lister().List(labels.Everything()) | |||
End of changes. 13 change blocks. | ||||
24 lines changed or deleted | 48 lines changed or added |