double_barrier.go (etcd-3.5.5) | : | double_barrier.go (etcd-3.5.6) | ||
---|---|---|---|---|
skipping to change at line 48 | skipping to change at line 48 | |||
s: s, | s: s, | |||
ctx: context.TODO(), | ctx: context.TODO(), | |||
key: key, | key: key, | |||
count: count, | count: count, | |||
} | } | |||
} | } | |||
// Enter waits for "count" processes to enter the barrier then returns | // Enter waits for "count" processes to enter the barrier then returns | |||
func (b *DoubleBarrier) Enter() error { | func (b *DoubleBarrier) Enter() error { | |||
client := b.s.Client() | client := b.s.Client() | |||
// Check the entered clients before creating the UniqueEphemeralKey, | ||||
// fail the request if there are already too many clients. | ||||
if resp1, err := b.enteredClients(client); err != nil { | ||||
return err | ||||
} else if len(resp1.Kvs) >= b.count { | ||||
return ErrTooManyClients | ||||
} | ||||
ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters") | ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters") | |||
if err != nil { | if err != nil { | |||
return err | return err | |||
} | } | |||
b.myKey = ek | b.myKey = ek | |||
resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) | // Check the entered clients after creating the UniqueEphemeralKey | |||
resp2, err := b.enteredClients(client) | ||||
if err != nil { | if err != nil { | |||
return err | return err | |||
} | } | |||
if len(resp2.Kvs) >= b.count { | ||||
lastWaiter := resp2.Kvs[b.count-1] | ||||
if ek.rev > lastWaiter.CreateRevision { | ||||
// delete itself now, otherwise other processes may need | ||||
to wait | ||||
// until these keys are automatically deleted when the re | ||||
lated | ||||
// lease expires. | ||||
if err = b.myKey.Delete(); err != nil { | ||||
// Nothing to do here. We have to wait for the ke | ||||
y to be | ||||
// deleted when the lease expires. | ||||
} | ||||
return ErrTooManyClients | ||||
} | ||||
if len(resp.Kvs) > b.count { | if ek.rev == lastWaiter.CreateRevision { | |||
return ErrTooManyClients | // TODO(ahrtr): we might need to compare ek.key and | |||
} | // string(lastWaiter.Key), they should be equal. | |||
// unblock all other waiters | ||||
if len(resp.Kvs) == b.count { | _, err = client.Put(b.ctx, b.key+"/ready", "") | |||
// unblock waiters | return err | |||
_, err = client.Put(b.ctx, b.key+"/ready", "") | } | |||
return err | ||||
} | } | |||
_, err = WaitEvents( | _, err = WaitEvents( | |||
client, | client, | |||
b.key+"/ready", | b.key+"/ready", | |||
ek.Revision(), | ek.Revision(), | |||
[]mvccpb.Event_EventType{mvccpb.PUT}) | []mvccpb.Event_EventType{mvccpb.PUT}) | |||
return err | return err | |||
} | } | |||
// enteredClients gets all the entered clients, which are ordered by the | ||||
// createRevision in ascending order. | ||||
func (b *DoubleBarrier) enteredClients(cli *clientv3.Client) (*clientv3.GetRespo | ||||
nse, error) { | ||||
resp, err := cli.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix(), | ||||
clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAsc | ||||
end)) | ||||
if err != nil { | ||||
return nil, err | ||||
} | ||||
return resp, nil | ||||
} | ||||
// Leave waits for "count" processes to leave the barrier then returns | // Leave waits for "count" processes to leave the barrier then returns | |||
func (b *DoubleBarrier) Leave() error { | func (b *DoubleBarrier) Leave() error { | |||
client := b.s.Client() | client := b.s.Client() | |||
resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) | resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) | |||
if err != nil { | if err != nil { | |||
return err | return err | |||
} | } | |||
if len(resp.Kvs) == 0 { | if len(resp.Kvs) == 0 { | |||
return nil | return nil | |||
} | } | |||
skipping to change at line 99 | skipping to change at line 132 | |||
for _, k := range resp.Kvs { | for _, k := range resp.Kvs { | |||
if k.ModRevision < lowest.ModRevision { | if k.ModRevision < lowest.ModRevision { | |||
lowest = k | lowest = k | |||
} | } | |||
if k.ModRevision > highest.ModRevision { | if k.ModRevision > highest.ModRevision { | |||
highest = k | highest = k | |||
} | } | |||
} | } | |||
isLowest := string(lowest.Key) == b.myKey.Key() | isLowest := string(lowest.Key) == b.myKey.Key() | |||
if len(resp.Kvs) == 1 { | if len(resp.Kvs) == 1 && isLowest { | |||
// this is the only node in the barrier; finish up | // this is the only node in the barrier; finish up | |||
if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil { | if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil { | |||
return err | return err | |||
} | } | |||
return b.myKey.Delete() | return b.myKey.Delete() | |||
} | } | |||
// this ensures that if a process fails, the ephemeral lease will be | // this ensures that if a process fails, the ephemeral lease will be | |||
// revoked, its barrier key is removed, and the barrier can resume | // revoked, its barrier key is removed, and the barrier can resume | |||
End of changes. 6 change blocks. | ||||
10 lines changed or deleted | 48 lines changed or added |