"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "client/v3/experimental/recipes/double_barrier.go" between
etcd-3.5.5.tar.gz and etcd-3.5.6.tar.gz

About: etcd is a distributed reliable key-value store for the most critical data of a distributed system (written in "Go").

double_barrier.go  (etcd-3.5.5):double_barrier.go  (etcd-3.5.6)
skipping to change at line 48 skipping to change at line 48
s: s, s: s,
ctx: context.TODO(), ctx: context.TODO(),
key: key, key: key,
count: count, count: count,
} }
} }
// Enter waits for "count" processes to enter the barrier then returns // Enter waits for "count" processes to enter the barrier then returns
func (b *DoubleBarrier) Enter() error { func (b *DoubleBarrier) Enter() error {
client := b.s.Client() client := b.s.Client()
// Check the entered clients before creating the UniqueEphemeralKey,
// fail the request if there are already too many clients.
if resp1, err := b.enteredClients(client); err != nil {
return err
} else if len(resp1.Kvs) >= b.count {
return ErrTooManyClients
}
ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters") ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters")
if err != nil { if err != nil {
return err return err
} }
b.myKey = ek b.myKey = ek
resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) // Check the entered clients after creating the UniqueEphemeralKey
resp2, err := b.enteredClients(client)
if err != nil { if err != nil {
return err return err
} }
if len(resp2.Kvs) >= b.count {
lastWaiter := resp2.Kvs[b.count-1]
if ek.rev > lastWaiter.CreateRevision {
// delete itself now, otherwise other processes may need
to wait
// until these keys are automatically deleted when the re
lated
// lease expires.
if err = b.myKey.Delete(); err != nil {
// Nothing to do here. We have to wait for the ke
y to be
// deleted when the lease expires.
}
return ErrTooManyClients
}
if len(resp.Kvs) > b.count { if ek.rev == lastWaiter.CreateRevision {
return ErrTooManyClients // TODO(ahrtr): we might need to compare ek.key and
} // string(lastWaiter.Key), they should be equal.
// unblock all other waiters
if len(resp.Kvs) == b.count { _, err = client.Put(b.ctx, b.key+"/ready", "")
// unblock waiters return err
_, err = client.Put(b.ctx, b.key+"/ready", "") }
return err
} }
_, err = WaitEvents( _, err = WaitEvents(
client, client,
b.key+"/ready", b.key+"/ready",
ek.Revision(), ek.Revision(),
[]mvccpb.Event_EventType{mvccpb.PUT}) []mvccpb.Event_EventType{mvccpb.PUT})
return err return err
} }
// enteredClients gets all the entered clients, which are ordered by the
// createRevision in ascending order.
func (b *DoubleBarrier) enteredClients(cli *clientv3.Client) (*clientv3.GetRespo
nse, error) {
resp, err := cli.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAsc
end))
if err != nil {
return nil, err
}
return resp, nil
}
// Leave waits for "count" processes to leave the barrier then returns // Leave waits for "count" processes to leave the barrier then returns
func (b *DoubleBarrier) Leave() error { func (b *DoubleBarrier) Leave() error {
client := b.s.Client() client := b.s.Client()
resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
if err != nil { if err != nil {
return err return err
} }
if len(resp.Kvs) == 0 { if len(resp.Kvs) == 0 {
return nil return nil
} }
skipping to change at line 99 skipping to change at line 132
for _, k := range resp.Kvs { for _, k := range resp.Kvs {
if k.ModRevision < lowest.ModRevision { if k.ModRevision < lowest.ModRevision {
lowest = k lowest = k
} }
if k.ModRevision > highest.ModRevision { if k.ModRevision > highest.ModRevision {
highest = k highest = k
} }
} }
isLowest := string(lowest.Key) == b.myKey.Key() isLowest := string(lowest.Key) == b.myKey.Key()
if len(resp.Kvs) == 1 { if len(resp.Kvs) == 1 && isLowest {
// this is the only node in the barrier; finish up // this is the only node in the barrier; finish up
if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil { if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil {
return err return err
} }
return b.myKey.Delete() return b.myKey.Delete()
} }
// this ensures that if a process fails, the ephemeral lease will be // this ensures that if a process fails, the ephemeral lease will be
// revoked, its barrier key is removed, and the barrier can resume // revoked, its barrier key is removed, and the barrier can resume
 End of changes. 6 change blocks. 
10 lines changed or deleted 48 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)