"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "pkg/provider/kubernetes/ingress/client.go" between
traefik-v2.3.2.src.tar.gz and traefik-v2.3.3.src.tar.gz

About: Traefik is a cloud native edge router, a reverse proxy and load balancer for HTTP and TCP-based applications.

client.go  (traefik-v2.3.2.src):client.go  (traefik-v2.3.3.src)
skipping to change at line 62 skipping to change at line 62
GetIngresses() []*networkingv1beta1.Ingress GetIngresses() []*networkingv1beta1.Ingress
GetIngressClass() (*networkingv1beta1.IngressClass, error) GetIngressClass() (*networkingv1beta1.IngressClass, error)
GetService(namespace, name string) (*corev1.Service, bool, error) GetService(namespace, name string) (*corev1.Service, bool, error)
GetSecret(namespace, name string) (*corev1.Secret, bool, error) GetSecret(namespace, name string) (*corev1.Secret, bool, error)
GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error)
UpdateIngressStatus(ing *networkingv1beta1.Ingress, ip, hostname string) error UpdateIngressStatus(ing *networkingv1beta1.Ingress, ip, hostname string) error
GetServerVersion() (*version.Version, error) GetServerVersion() (*version.Version, error)
} }
type clientWrapper struct { type clientWrapper struct {
clientset *kubernetes.Clientset clientset kubernetes.Interface
factories map[string]informers.SharedInformerFactory factoriesKube map[string]informers.SharedInformerFactory
factoriesSecret map[string]informers.SharedInformerFactory
clusterFactory informers.SharedInformerFactory clusterFactory informers.SharedInformerFactory
ingressLabelSelector labels.Selector ingressLabelSelector labels.Selector
isNamespaceAll bool isNamespaceAll bool
watchedNamespaces []string watchedNamespaces []string
} }
// newInClusterClient returns a new Provider client that is expected to run // newInClusterClient returns a new Provider client that is expected to run
// inside the cluster. // inside the cluster.
func newInClusterClient(endpoint string) (*clientWrapper, error) { func newInClusterClient(endpoint string) (*clientWrapper, error) {
config, err := rest.InClusterConfig() config, err := rest.InClusterConfig()
skipping to change at line 126 skipping to change at line 127
func createClientFromConfig(c *rest.Config) (*clientWrapper, error) { func createClientFromConfig(c *rest.Config) (*clientWrapper, error) {
clientset, err := kubernetes.NewForConfig(c) clientset, err := kubernetes.NewForConfig(c)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newClientImpl(clientset), nil return newClientImpl(clientset), nil
} }
func newClientImpl(clientset *kubernetes.Clientset) *clientWrapper { func newClientImpl(clientset kubernetes.Interface) *clientWrapper {
return &clientWrapper{ return &clientWrapper{
clientset: clientset, clientset: clientset,
factories: make(map[string]informers.SharedInformerFactory), factoriesKube: make(map[string]informers.SharedInformerFactory)
,
factoriesSecret: make(map[string]informers.SharedInformerFactory)
,
} }
} }
// WatchAll starts namespace-specific controllers for all relevant kinds. // WatchAll starts namespace-specific controllers for all relevant kinds.
func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (< -chan interface{}, error) { func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (< -chan interface{}, error) {
eventCh := make(chan interface{}, 1) eventCh := make(chan interface{}, 1)
eventHandler := c.newResourceEventHandler(eventCh) eventHandler := c.newResourceEventHandler(eventCh)
if len(namespaces) == 0 { if len(namespaces) == 0 {
namespaces = []string{metav1.NamespaceAll} namespaces = []string{metav1.NamespaceAll}
c.isNamespaceAll = true c.isNamespaceAll = true
} }
c.watchedNamespaces = namespaces c.watchedNamespaces = namespaces
notOwnedByHelm := func(opts *metav1.ListOptions) {
opts.LabelSelector = "owner!=helm"
}
for _, ns := range namespaces { for _, ns := range namespaces {
factory := informers.NewSharedInformerFactoryWithOptions(c.client factoryKube := informers.NewSharedInformerFactoryWithOptions(c.cl
set, resyncPeriod, informers.WithNamespace(ns)) ientset, resyncPeriod, informers.WithNamespace(ns))
factory.Extensions().V1beta1().Ingresses().Informer().AddEventHan factoryKube.Extensions().V1beta1().Ingresses().Informer().AddEven
dler(eventHandler) tHandler(eventHandler)
factory.Core().V1().Services().Informer().AddEventHandler(eventHa factoryKube.Core().V1().Services().Informer().AddEventHandler(eve
ndler) ntHandler)
factory.Core().V1().Endpoints().Informer().AddEventHandler(eventH factoryKube.Core().V1().Endpoints().Informer().AddEventHandler(ev
andler) entHandler)
factory.Core().V1().Secrets().Informer().AddEventHandler(eventHan
dler) factorySecret := informers.NewSharedInformerFactoryWithOptions(c.
c.factories[ns] = factory clientset, resyncPeriod, informers.WithNamespace(ns), informers.WithTweakListOpt
ions(notOwnedByHelm))
factorySecret.Core().V1().Secrets().Informer().AddEventHandler(ev
entHandler)
c.factoriesKube[ns] = factoryKube
c.factoriesSecret[ns] = factorySecret
} }
for _, ns := range namespaces { for _, ns := range namespaces {
c.factories[ns].Start(stopCh) c.factoriesKube[ns].Start(stopCh)
c.factoriesSecret[ns].Start(stopCh)
} }
for _, ns := range namespaces { for _, ns := range namespaces {
for typ, ok := range c.factories[ns].WaitForCacheSync(stopCh) { for typ, ok := range c.factoriesKube[ns].WaitForCacheSync(stopCh)
{
if !ok {
return nil, fmt.Errorf("timed out waiting for con
troller caches to sync %s in namespace %q", typ, ns)
}
}
for typ, ok := range c.factoriesSecret[ns].WaitForCacheSync(stopC
h) {
if !ok { if !ok {
return nil, fmt.Errorf("timed out waiting for con troller caches to sync %s in namespace %q", typ, ns) return nil, fmt.Errorf("timed out waiting for con troller caches to sync %s in namespace %q", typ, ns)
} }
} }
} }
serverVersion, err := c.GetServerVersion() serverVersion, err := c.GetServerVersion()
if err != nil { if err != nil {
log.WithoutContext().Errorf("Failed to get server version: %v", e rr) log.WithoutContext().Errorf("Failed to get server version: %v", e rr)
return eventCh, nil return eventCh, nil
skipping to change at line 191 skipping to change at line 208
} }
} }
return eventCh, nil return eventCh, nil
} }
// GetIngresses returns all Ingresses for observed namespaces in the cluster. // GetIngresses returns all Ingresses for observed namespaces in the cluster.
func (c *clientWrapper) GetIngresses() []*networkingv1beta1.Ingress { func (c *clientWrapper) GetIngresses() []*networkingv1beta1.Ingress {
var results []*networkingv1beta1.Ingress var results []*networkingv1beta1.Ingress
for ns, factory := range c.factories { for ns, factory := range c.factoriesKube {
// extensions // extensions
ings, err := factory.Extensions().V1beta1().Ingresses().Lister(). List(c.ingressLabelSelector) ings, err := factory.Extensions().V1beta1().Ingresses().Lister(). List(c.ingressLabelSelector)
if err != nil { if err != nil {
log.Errorf("Failed to list ingresses in namespace %s: %v" , ns, err) log.Errorf("Failed to list ingresses in namespace %s: %v" , ns, err)
} }
for _, ing := range ings { for _, ing := range ings {
n, err := extensionsToNetworking(ing) n, err := extensionsToNetworking(ing)
if err != nil { if err != nil {
log.Errorf("Failed to convert ingress %s from ext ensions/v1beta1 to networking/v1beta1: %v", ns, err) log.Errorf("Failed to convert ingress %s from ext ensions/v1beta1 to networking/v1beta1: %v", ns, err)
skipping to change at line 242 skipping to change at line 259
// UpdateIngressStatus updates an Ingress with a provided status. // UpdateIngressStatus updates an Ingress with a provided status.
func (c *clientWrapper) UpdateIngressStatus(src *networkingv1beta1.Ingress, ip, hostname string) error { func (c *clientWrapper) UpdateIngressStatus(src *networkingv1beta1.Ingress, ip, hostname string) error {
if !c.isWatchedNamespace(src.Namespace) { if !c.isWatchedNamespace(src.Namespace) {
return fmt.Errorf("failed to get ingress %s/%s: namespace is not within watched namespaces", src.Namespace, src.Name) return fmt.Errorf("failed to get ingress %s/%s: namespace is not within watched namespaces", src.Namespace, src.Name)
} }
if src.GetObjectKind().GroupVersionKind().Group != "networking.k8s.io" { if src.GetObjectKind().GroupVersionKind().Group != "networking.k8s.io" {
return c.updateIngressStatusOld(src, ip, hostname) return c.updateIngressStatusOld(src, ip, hostname)
} }
ing, err := c.factories[c.lookupNamespace(src.Namespace)].Networking().V1 beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) ing, err := c.factoriesKube[c.lookupNamespace(src.Namespace)].Networking( ).V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name)
if err != nil { if err != nil {
return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespac e, src.Name, err) return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespac e, src.Name, err)
} }
if len(ing.Status.LoadBalancer.Ingress) > 0 { if len(ing.Status.LoadBalancer.Ingress) > 0 {
if ing.Status.LoadBalancer.Ingress[0].Hostname == hostname && ing .Status.LoadBalancer.Ingress[0].IP == ip { if ing.Status.LoadBalancer.Ingress[0].Hostname == hostname && ing .Status.LoadBalancer.Ingress[0].IP == ip {
// If status is already set, skip update // If status is already set, skip update
log.Debugf("Skipping status update on ingress %s/%s", ing .Namespace, ing.Name) log.Debugf("Skipping status update on ingress %s/%s", ing .Namespace, ing.Name)
return nil return nil
} }
skipping to change at line 271 skipping to change at line 288
_, err = c.clientset.NetworkingV1beta1().Ingresses(ingCopy.Namespace).Upd ateStatus(ctx, ingCopy, metav1.UpdateOptions{}) _, err = c.clientset.NetworkingV1beta1().Ingresses(ingCopy.Namespace).Upd ateStatus(ctx, ingCopy, metav1.UpdateOptions{})
if err != nil { if err != nil {
return fmt.Errorf("failed to update ingress status %s/%s: %w", sr c.Namespace, src.Name, err) return fmt.Errorf("failed to update ingress status %s/%s: %w", sr c.Namespace, src.Name, err)
} }
log.Infof("Updated status on ingress %s/%s", src.Namespace, src.Name) log.Infof("Updated status on ingress %s/%s", src.Namespace, src.Name)
return nil return nil
} }
func (c *clientWrapper) updateIngressStatusOld(src *networkingv1beta1.Ingress, i p, hostname string) error { func (c *clientWrapper) updateIngressStatusOld(src *networkingv1beta1.Ingress, i p, hostname string) error {
ing, err := c.factories[c.lookupNamespace(src.Namespace)].Extensions().V1 beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) ing, err := c.factoriesKube[c.lookupNamespace(src.Namespace)].Extensions( ).V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name)
if err != nil { if err != nil {
return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespac e, src.Name, err) return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespac e, src.Name, err)
} }
if len(ing.Status.LoadBalancer.Ingress) > 0 { if len(ing.Status.LoadBalancer.Ingress) > 0 {
if ing.Status.LoadBalancer.Ingress[0].Hostname == hostname && ing .Status.LoadBalancer.Ingress[0].IP == ip { if ing.Status.LoadBalancer.Ingress[0].Hostname == hostname && ing .Status.LoadBalancer.Ingress[0].IP == ip {
// If status is already set, skip update // If status is already set, skip update
log.Debugf("Skipping status update on ingress %s/%s", ing .Namespace, ing.Name) log.Debugf("Skipping status update on ingress %s/%s", ing .Namespace, ing.Name)
return nil return nil
} }
skipping to change at line 305 skipping to change at line 322
log.Infof("Updated status on ingress %s/%s", src.Namespace, src.Name) log.Infof("Updated status on ingress %s/%s", src.Namespace, src.Name)
return nil return nil
} }
// GetService returns the named service from the given namespace. // GetService returns the named service from the given namespace.
func (c *clientWrapper) GetService(namespace, name string) (*corev1.Service, boo l, error) { func (c *clientWrapper) GetService(namespace, name string) (*corev1.Service, boo l, error) {
if !c.isWatchedNamespace(namespace) { if !c.isWatchedNamespace(namespace) {
return nil, false, fmt.Errorf("failed to get service %s/%s: names pace is not within watched namespaces", namespace, name) return nil, false, fmt.Errorf("failed to get service %s/%s: names pace is not within watched namespaces", namespace, name)
} }
service, err := c.factories[c.lookupNamespace(namespace)].Core().V1().Ser vices().Lister().Services(namespace).Get(name) service, err := c.factoriesKube[c.lookupNamespace(namespace)].Core().V1() .Services().Lister().Services(namespace).Get(name)
exist, err := translateNotFoundError(err) exist, err := translateNotFoundError(err)
return service, exist, err return service, exist, err
} }
// GetEndpoints returns the named endpoints from the given namespace. // GetEndpoints returns the named endpoints from the given namespace.
func (c *clientWrapper) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) { func (c *clientWrapper) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) {
if !c.isWatchedNamespace(namespace) { if !c.isWatchedNamespace(namespace) {
return nil, false, fmt.Errorf("failed to get endpoints %s/%s: nam espace is not within watched namespaces", namespace, name) return nil, false, fmt.Errorf("failed to get endpoints %s/%s: nam espace is not within watched namespaces", namespace, name)
} }
endpoint, err := c.factories[c.lookupNamespace(namespace)].Core().V1().En dpoints().Lister().Endpoints(namespace).Get(name) endpoint, err := c.factoriesKube[c.lookupNamespace(namespace)].Core().V1( ).Endpoints().Lister().Endpoints(namespace).Get(name)
exist, err := translateNotFoundError(err) exist, err := translateNotFoundError(err)
return endpoint, exist, err return endpoint, exist, err
} }
// GetSecret returns the named secret from the given namespace. // GetSecret returns the named secret from the given namespace.
func (c *clientWrapper) GetSecret(namespace, name string) (*corev1.Secret, bool, error) { func (c *clientWrapper) GetSecret(namespace, name string) (*corev1.Secret, bool, error) {
if !c.isWatchedNamespace(namespace) { if !c.isWatchedNamespace(namespace) {
return nil, false, fmt.Errorf("failed to get secret %s/%s: namesp ace is not within watched namespaces", namespace, name) return nil, false, fmt.Errorf("failed to get secret %s/%s: namesp ace is not within watched namespaces", namespace, name)
} }
secret, err := c.factories[c.lookupNamespace(namespace)].Core().V1().Secr ets().Lister().Secrets(namespace).Get(name) secret, err := c.factoriesSecret[c.lookupNamespace(namespace)].Core().V1( ).Secrets().Lister().Secrets(namespace).Get(name)
exist, err := translateNotFoundError(err) exist, err := translateNotFoundError(err)
return secret, exist, err return secret, exist, err
} }
func (c *clientWrapper) GetIngressClass() (*networkingv1beta1.IngressClass, erro r) { func (c *clientWrapper) GetIngressClass() (*networkingv1beta1.IngressClass, erro r) {
if c.clusterFactory == nil { if c.clusterFactory == nil {
return nil, errors.New("failed to find ingressClass: factory not loaded") return nil, errors.New("failed to find ingressClass: factory not loaded")
} }
ingressClasses, err := c.clusterFactory.Networking().V1beta1().IngressCla sses().Lister().List(labels.Everything()) ingressClasses, err := c.clusterFactory.Networking().V1beta1().IngressCla sses().Lister().List(labels.Everything())
 End of changes. 13 change blocks. 
24 lines changed or deleted 48 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)