watcher.go (etcd-3.5.5) | : | watcher.go (etcd-3.5.6) | ||
---|---|---|---|---|
skipping to change at line 23 | skipping to change at line 23 | |||
// limitations under the License. | // limitations under the License. | |||
package mvcc | package mvcc | |||
import ( | import ( | |||
"bytes" | "bytes" | |||
"errors" | "errors" | |||
"sync" | "sync" | |||
"go.etcd.io/etcd/api/v3/mvccpb" | "go.etcd.io/etcd/api/v3/mvccpb" | |||
clientv3 "go.etcd.io/etcd/client/v3" | ||||
) | ) | |||
// AutoWatchID is the watcher ID passed in WatchStream.Watch when no | ||||
// user-provided ID is available. If pass, an ID will automatically be assigned. | ||||
const AutoWatchID WatchID = 0 | ||||
var ( | var ( | |||
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist") | ErrWatcherNotExist = errors.New("mvcc: watcher does not exist") | |||
ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty") | ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty") | |||
ErrWatcherDuplicateID = errors.New("mvcc: duplicate watch ID provided on the WatchStream") | ErrWatcherDuplicateID = errors.New("mvcc: duplicate watch ID provided on the WatchStream") | |||
) | ) | |||
type WatchID int64 | type WatchID int64 | |||
// FilterFunc returns true if the given event should be filtered out. | // FilterFunc returns true if the given event should be filtered out. | |||
type FilterFunc func(e mvccpb.Event) bool | type FilterFunc func(e mvccpb.Event) bool | |||
skipping to change at line 121 | skipping to change at line 118 | |||
if len(end) != 0 && bytes.Compare(key, end) != -1 { | if len(end) != 0 && bytes.Compare(key, end) != -1 { | |||
return -1, ErrEmptyWatcherRange | return -1, ErrEmptyWatcherRange | |||
} | } | |||
ws.mu.Lock() | ws.mu.Lock() | |||
defer ws.mu.Unlock() | defer ws.mu.Unlock() | |||
if ws.closed { | if ws.closed { | |||
return -1, ErrEmptyWatcherRange | return -1, ErrEmptyWatcherRange | |||
} | } | |||
if id == AutoWatchID { | if id == clientv3.AutoWatchID { | |||
for ws.watchers[ws.nextID] != nil { | for ws.watchers[ws.nextID] != nil { | |||
ws.nextID++ | ws.nextID++ | |||
} | } | |||
id = ws.nextID | id = ws.nextID | |||
ws.nextID++ | ws.nextID++ | |||
} else if _, ok := ws.watchers[id]; ok { | } else if _, ok := ws.watchers[id]; ok { | |||
return -1, ErrWatcherDuplicateID | return -1, ErrWatcherDuplicateID | |||
} | } | |||
w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...) | w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...) | |||
End of changes. 3 change blocks. | ||||
5 lines changed or deleted | 2 lines changed or added |