watch.go (etcd-3.5.5) | : | watch.go (etcd-3.5.6) | ||
---|---|---|---|---|
skipping to change at line 241 | skipping to change at line 241 | |||
if err != nil { | if err != nil { | |||
return err | return err | |||
} | } | |||
switch uv := req.RequestUnion.(type) { | switch uv := req.RequestUnion.(type) { | |||
case *pb.WatchRequest_CreateRequest: | case *pb.WatchRequest_CreateRequest: | |||
cr := uv.CreateRequest | cr := uv.CreateRequest | |||
if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd ); err != nil { | if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd ); err != nil { | |||
wps.watchCh <- &pb.WatchResponse{ | wps.watchCh <- &pb.WatchResponse{ | |||
Header: &pb.ResponseHeader{}, | Header: &pb.ResponseHeader{}, | |||
WatchId: -1, | WatchId: clientv3.InvalidWatchID, | |||
Created: true, | Created: true, | |||
Canceled: true, | Canceled: true, | |||
CancelReason: err.Error(), | CancelReason: err.Error(), | |||
} | } | |||
continue | continue | |||
} | } | |||
wps.mu.Lock() | wps.mu.Lock() | |||
w := &watcher{ | w := &watcher{ | |||
wr: watchRange{string(cr.Key), string(cr.RangeEn d)}, | wr: watchRange{string(cr.Key), string(cr.RangeEn d)}, | |||
id: wps.nextWatcherID, | id: wps.nextWatcherID, | |||
wps: wps, | wps: wps, | |||
nextrev: cr.StartRevision, | nextrev: cr.StartRevision, | |||
progress: cr.ProgressNotify, | progress: cr.ProgressNotify, | |||
prevKV: cr.PrevKv, | prevKV: cr.PrevKv, | |||
filters: v3rpc.FiltersFromRequest(cr), | filters: v3rpc.FiltersFromRequest(cr), | |||
} | } | |||
if !w.wr.valid() { | if !w.wr.valid() { | |||
w.post(&pb.WatchResponse{WatchId: -1, Created: tr ue, Canceled: true}) | w.post(&pb.WatchResponse{WatchId: clientv3.Invali dWatchID, Created: true, Canceled: true}) | |||
wps.mu.Unlock() | wps.mu.Unlock() | |||
continue | continue | |||
} | } | |||
wps.nextWatcherID++ | wps.nextWatcherID++ | |||
w.nextrev = cr.StartRevision | w.nextrev = cr.StartRevision | |||
wps.watchers[w.id] = w | wps.watchers[w.id] = w | |||
wps.ranges.add(w) | wps.ranges.add(w) | |||
wps.mu.Unlock() | wps.mu.Unlock() | |||
wps.lg.Debug("create watcher", zap.String("key", w.wr.key ), zap.String("end", w.wr.end), zap.Int64("watcherId", wps.nextWatcherID)) | wps.lg.Debug("create watcher", zap.String("key", w.wr.key ), zap.String("end", w.wr.end), zap.Int64("watcherId", wps.nextWatcherID)) | |||
case *pb.WatchRequest_CancelRequest: | case *pb.WatchRequest_CancelRequest: | |||
End of changes. 2 change blocks. | ||||
2 lines changed or deleted | 2 lines changed or added |