"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "vendor/github.com/couchbaselabs/go-couchbase/streaming.go" between
gitea-1.9.3.tar.gz and gitea-1.9.4.tar.gz

About: Gitea allows to set up a self-hosted code hosting Git service (a fork of Gogs).

streaming.go  (gitea-1.9.3):streaming.go  (gitea-1.9.4)
skipping to change at line 91 skipping to change at line 91
} }
} }
return return
} }
func (b *Bucket) UpdateBucket() error { func (b *Bucket) UpdateBucket() error {
var failures int var failures int
var returnErr error var returnErr error
var poolServices PoolServices
var err error
tlsConfig := b.pool.client.tlsConfig
if tlsConfig != nil {
poolServices, err = b.pool.client.GetPoolServices("default")
if err != nil {
return err
}
}
for { for {
if failures == MAX_RETRY_COUNT { if failures == MAX_RETRY_COUNT {
logging.Errorf(" Maximum failures reached. Exiting loop.. .") logging.Errorf(" Maximum failures reached. Exiting loop.. .")
return fmt.Errorf("Max failures reached. Last Error %v", returnErr) return fmt.Errorf("Max failures reached. Last Error %v", returnErr)
} }
nodes := b.Nodes() nodes := b.Nodes()
if len(nodes) < 1 { if len(nodes) < 1 {
return fmt.Errorf("No healthy nodes found") return fmt.Errorf("No healthy nodes found")
skipping to change at line 113 skipping to change at line 123
startNode := rand.Intn(len(nodes)) startNode := rand.Intn(len(nodes))
node := nodes[(startNode)%len(nodes)] node := nodes[(startNode)%len(nodes)]
streamUrl := fmt.Sprintf("http://%s/pools/default/bucketsStreamin g/%s", node.Hostname, b.GetName()) streamUrl := fmt.Sprintf("http://%s/pools/default/bucketsStreamin g/%s", node.Hostname, b.GetName())
logging.Infof(" Trying with %s", streamUrl) logging.Infof(" Trying with %s", streamUrl)
req, err := http.NewRequest("GET", streamUrl, nil) req, err := http.NewRequest("GET", streamUrl, nil)
if err != nil { if err != nil {
return err return err
} }
b.RLock()
pool := b.pool
bucketName := b.Name
b.RUnlock()
scopes, err := getScopesAndCollections(pool, bucketName)
if err != nil {
return err
}
// Lock here to avoid having pool closed under us. // Lock here to avoid having pool closed under us.
b.RLock() b.RLock()
err = maybeAddAuth(req, b.pool.client.ah) err = maybeAddAuth(req, b.pool.client.ah)
b.RUnlock() b.RUnlock()
if err != nil { if err != nil {
return err return err
} }
res, err := doHTTPRequestForUpdate(req) res, err := doHTTPRequestForUpdate(req)
if err != nil { if err != nil {
skipping to change at line 179 skipping to change at line 180
for i := range newcps { for i := range newcps {
// get the old connection pool and check if it is still valid // get the old connection pool and check if it is still valid
pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerL ist[i], true /* bucket already locked */) pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerL ist[i], true /* bucket already locked */)
if pool != nil && pool.inUse == false { if pool != nil && pool.inUse == false {
// if the hostname and index is unchanged then reuse this pool // if the hostname and index is unchanged then reuse this pool
newcps[i] = pool newcps[i] = pool
pool.inUse = true pool.inUse = true
continue continue
} }
// else create a new pool // else create a new pool
hostport := tmpb.VBSMJson.ServerList[i]
if tlsConfig != nil {
hostport, err = MapKVtoSSL(hostport, &poo
lServices)
if err != nil {
b.Unlock()
return err
}
}
if b.ah != nil { if b.ah != nil {
newcps[i] = newConnectionPool( newcps[i] = newConnectionPool(hostport,
tmpb.VBSMJson.ServerList[i], b.ah, false, PoolSize, PoolOverfl
b.ah, false, PoolSize, PoolOverfl ow, b.pool.client.tlsConfig, b.Name)
ow)
} else { } else {
newcps[i] = newConnectionPool( newcps[i] = newConnectionPool(hostport,
tmpb.VBSMJson.ServerList[i],
b.authHandler(true /* bucket alre ady locked */), b.authHandler(true /* bucket alre ady locked */),
false, PoolSize, PoolOverflow) false, PoolSize, PoolOverflow, b. pool.client.tlsConfig, b.Name)
} }
} }
b.replaceConnPools2(newcps, true /* bucket already locked */) b.replaceConnPools2(newcps, true /* bucket already locked */)
tmpb.ah = b.ah tmpb.ah = b.ah
b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson) b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson)
b.nodeList = unsafe.Pointer(&tmpb.NodesJSON) b.nodeList = unsafe.Pointer(&tmpb.NodesJSON)
b.Scopes = scopes
b.Unlock() b.Unlock()
logging.Infof("Got new configuration for bucket %s", b.Ge tName()) logging.Infof("Got new configuration for bucket %s", b.Ge tName())
} }
// we are here because of an error // we are here because of an error
failures++ failures++
continue continue
} }
 End of changes. 7 change blocks. 
17 lines changed or deleted 24 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)