watch.go (etcd-3.5.5) | : | watch.go (etcd-3.5.6) | ||
---|---|---|---|---|
skipping to change at line 21 | skipping to change at line 21 | |||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
// See the License for the specific language governing permissions and | // See the License for the specific language governing permissions and | |||
// limitations under the License. | // limitations under the License. | |||
package clientv3 | package clientv3 | |||
import ( | import ( | |||
"context" | "context" | |||
"errors" | "errors" | |||
"fmt" | "fmt" | |||
"strings" | ||||
"sync" | "sync" | |||
"time" | "time" | |||
pb "go.etcd.io/etcd/api/v3/etcdserverpb" | pb "go.etcd.io/etcd/api/v3/etcdserverpb" | |||
"go.etcd.io/etcd/api/v3/mvccpb" | "go.etcd.io/etcd/api/v3/mvccpb" | |||
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" | v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" | |||
"go.uber.org/zap" | "go.uber.org/zap" | |||
"google.golang.org/grpc" | "google.golang.org/grpc" | |||
"google.golang.org/grpc/codes" | "google.golang.org/grpc/codes" | |||
"google.golang.org/grpc/metadata" | "google.golang.org/grpc/metadata" | |||
"google.golang.org/grpc/status" | "google.golang.org/grpc/status" | |||
) | ) | |||
const ( | const ( | |||
EventTypeDelete = mvccpb.DELETE | EventTypeDelete = mvccpb.DELETE | |||
EventTypePut = mvccpb.PUT | EventTypePut = mvccpb.PUT | |||
closeSendErrTimeout = 250 * time.Millisecond | closeSendErrTimeout = 250 * time.Millisecond | |||
// AutoWatchID is the watcher ID passed in WatchStream.Watch when no | ||||
// user-provided ID is available. If pass, an ID will automatically be assigned. | ||||
AutoWatchID = 0 | ||||
// InvalidWatchID represents an invalid watch ID and prevents duplication with an existing watch. | ||||
InvalidWatchID = -1 | ||||
) | ) | |||
type Event mvccpb.Event | type Event mvccpb.Event | |||
type WatchChan <-chan WatchResponse | type WatchChan <-chan WatchResponse | |||
type Watcher interface { | type Watcher interface { | |||
// Watch watches on a key or prefix. The watched events will be returned | // Watch watches on a key or prefix. The watched events will be returned | |||
// through the returned channel. If revisions waiting to be sent over the | // through the returned channel. If revisions waiting to be sent over the | |||
// watch are compacted, then the watch will be canceled by the server, the | // watch are compacted, then the watch will be canceled by the server, the | |||
skipping to change at line 453 | skipping to change at line 461 | |||
close(wgs.donec) | close(wgs.donec) | |||
wgs.cancel() | wgs.cancel() | |||
if w.streams != nil { | if w.streams != nil { | |||
delete(w.streams, wgs.ctxKey) | delete(w.streams, wgs.ctxKey) | |||
} | } | |||
w.mu.Unlock() | w.mu.Unlock() | |||
} | } | |||
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) { | func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) { | |||
// check watch ID for backward compatibility (<= v3.3) | // check watch ID for backward compatibility (<= v3.3) | |||
if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") { | if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") { | |||
w.closeErr = v3rpc.Error(errors.New(resp.CancelReason)) | w.closeErr = v3rpc.Error(errors.New(resp.CancelReason)) | |||
// failed; no channel | // failed; no channel | |||
close(ws.recvc) | close(ws.recvc) | |||
return | return | |||
} | } | |||
ws.id = resp.WatchId | ws.id = resp.WatchId | |||
w.substreams[ws.id] = ws | w.substreams[ws.id] = ws | |||
} | } | |||
func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) { | func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) { | |||
skipping to change at line 484 | skipping to change at line 492 | |||
select { | select { | |||
case ws.initReq.retc <- ws.outc: | case ws.initReq.retc <- ws.outc: | |||
default: | default: | |||
} | } | |||
// close subscriber's channel | // close subscriber's channel | |||
if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil { | if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil { | |||
go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr}) | go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr}) | |||
} else if ws.outc != nil { | } else if ws.outc != nil { | |||
close(ws.outc) | close(ws.outc) | |||
} | } | |||
if ws.id != -1 { | if ws.id != InvalidWatchID { | |||
delete(w.substreams, ws.id) | delete(w.substreams, ws.id) | |||
return | return | |||
} | } | |||
for i := range w.resuming { | for i := range w.resuming { | |||
if w.resuming[i] == ws { | if w.resuming[i] == ws { | |||
w.resuming[i] = nil | w.resuming[i] = nil | |||
return | return | |||
} | } | |||
} | } | |||
} | } | |||
skipping to change at line 536 | skipping to change at line 544 | |||
}() | }() | |||
// start a stream with the etcd grpc server | // start a stream with the etcd grpc server | |||
if wc, closeErr = w.newWatchClient(); closeErr != nil { | if wc, closeErr = w.newWatchClient(); closeErr != nil { | |||
return | return | |||
} | } | |||
cancelSet := make(map[int64]struct{}) | cancelSet := make(map[int64]struct{}) | |||
var cur *pb.WatchResponse | var cur *pb.WatchResponse | |||
backoff := time.Millisecond | ||||
for { | for { | |||
select { | select { | |||
// Watch() requested | // Watch() requested | |||
case req := <-w.reqc: | case req := <-w.reqc: | |||
switch wreq := req.(type) { | switch wreq := req.(type) { | |||
case *watchRequest: | case *watchRequest: | |||
outc := make(chan WatchResponse, 1) | outc := make(chan WatchResponse, 1) | |||
// TODO: pass custom watch ID? | // TODO: pass custom watch ID? | |||
ws := &watcherStream{ | ws := &watcherStream{ | |||
initReq: *wreq, | initReq: *wreq, | |||
id: -1, | id: InvalidWatchID, | |||
outc: outc, | outc: outc, | |||
// unbuffered so resumes won't cause repeat events | // unbuffered so resumes won't cause repeat events | |||
recvc: make(chan *WatchResponse), | recvc: make(chan *WatchResponse), | |||
} | } | |||
ws.donec = make(chan struct{}) | ws.donec = make(chan struct{}) | |||
w.wg.Add(1) | w.wg.Add(1) | |||
go w.serveSubstream(ws, w.resumec) | go w.serveSubstream(ws, w.resumec) | |||
// queue up for watcher creation/resume | // queue up for watcher creation/resume | |||
skipping to change at line 583 | skipping to change at line 592 | |||
cur = pbresp | cur = pbresp | |||
} else if cur != nil && cur.WatchId == pbresp.WatchId { | } else if cur != nil && cur.WatchId == pbresp.WatchId { | |||
// merge new events | // merge new events | |||
cur.Events = append(cur.Events, pbresp.Events...) | cur.Events = append(cur.Events, pbresp.Events...) | |||
// update "Fragment" field; last response with "Fragment" == false | // update "Fragment" field; last response with "Fragment" == false | |||
cur.Fragment = pbresp.Fragment | cur.Fragment = pbresp.Fragment | |||
} | } | |||
switch { | switch { | |||
case pbresp.Created: | case pbresp.Created: | |||
cancelReasonError := v3rpc.Error(errors.New(pbresp.CancelReason)) | ||||
if shouldRetryWatch(cancelReasonError) { | ||||
var newErr error | ||||
if wc, newErr = w.newWatchClient(); newErr != nil { | ||||
w.lg.Error("failed to create a new watch client", zap.Error(newErr)) | ||||
return | ||||
} | ||||
if len(w.resuming) != 0 { | ||||
if ws := w.resuming[0]; ws != nil { | ||||
if err := wc.Send(ws.initReq.toPB()); err != nil { | ||||
w.lg.Debug("error when sending request", zap.Error(err)) | ||||
} | ||||
} | ||||
} | ||||
cur = nil | ||||
continue | ||||
} | ||||
// response to head of queue creation | // response to head of queue creation | |||
if len(w.resuming) != 0 { | if len(w.resuming) != 0 { | |||
if ws := w.resuming[0]; ws != nil { | if ws := w.resuming[0]; ws != nil { | |||
w.addSubstream(pbresp, ws) | w.addSubstream(pbresp, ws) | |||
w.dispatchEvent(pbresp) | w.dispatchEvent(pbresp) | |||
w.resuming[0] = nil | w.resuming[0] = nil | |||
} | } | |||
} | } | |||
if ws := w.nextResume(); ws != nil { | if ws := w.nextResume(); ws != nil { | |||
skipping to change at line 652 | skipping to change at line 681 | |||
w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err)) | w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err)) | |||
} | } | |||
} | } | |||
// watch client failed on Recv; spawn another if possible | // watch client failed on Recv; spawn another if possible | |||
case err := <-w.errc: | case err := <-w.errc: | |||
if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader { | if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader { | |||
closeErr = err | closeErr = err | |||
return | return | |||
} | } | |||
backoff = w.backoffIfUnavailable(backoff, err) | ||||
if wc, closeErr = w.newWatchClient(); closeErr != nil { | if wc, closeErr = w.newWatchClient(); closeErr != nil { | |||
return | return | |||
} | } | |||
if ws := w.nextResume(); ws != nil { | if ws := w.nextResume(); ws != nil { | |||
if err := wc.Send(ws.initReq.toPB()); err != nil { | if err := wc.Send(ws.initReq.toPB()); err != nil { | |||
w.lg.Debug("error when sending request", zap.Error(err)) | w.lg.Debug("error when sending request", zap.Error(err)) | |||
} | } | |||
} | } | |||
cancelSet = make(map[int64]struct{}) | cancelSet = make(map[int64]struct{}) | |||
case <-w.ctx.Done(): | case <-w.ctx.Done(): | |||
return | return | |||
case ws := <-w.closingc: | case ws := <-w.closingc: | |||
w.closeSubstream(ws) | w.closeSubstream(ws) | |||
delete(closing, ws) | delete(closing, ws) | |||
// no more watchers on this stream, shutdown, skip cancellation | // no more watchers on this stream, shutdown, skip cancellation | |||
if len(w.substreams)+len(w.resuming) == 0 { | if len(w.substreams)+len(w.resuming) == 0 { | |||
return | return | |||
} | } | |||
if ws.id != -1 { | if ws.id != InvalidWatchID { | |||
// client is closing an established watch; close it on the server proactively instead of waiting | // client is closing an established watch; close it on the server proactively instead of waiting | |||
// to close when the next message arrives | // to close when the next message arrives | |||
cancelSet[ws.id] = struct{}{} | cancelSet[ws.id] = struct{}{} | |||
cr := &pb.WatchRequest_CancelRequest{ | cr := &pb.WatchRequest_CancelRequest{ | |||
CancelRequest: &pb.WatchCancelRequest{ | CancelRequest: &pb.WatchCancelRequest{ | |||
WatchId: ws.id, | WatchId: ws.id, | |||
}, | }, | |||
} | } | |||
req := &pb.WatchRequest{RequestUnion: cr} | req := &pb.WatchRequest{RequestUnion: cr} | |||
w.lg.Debug("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id)) | w.lg.Debug("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id)) | |||
if err := wc.Send(req); err != nil { | if err := wc.Send(req); err != nil { | |||
w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err)) | w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err)) | |||
} | } | |||
} | } | |||
} | } | |||
} | } | |||
} | } | |||
func shouldRetryWatch(cancelReasonError error) bool { | ||||
return (strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCInvalidAuthToken.Error()) == 0) || | ||||
(strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCAuthOldRevision.Error()) == 0) | ||||
} | ||||
// nextResume chooses the next resuming to register with the grpc stream. Abandoned | // nextResume chooses the next resuming to register with the grpc stream. Abandoned | |||
// streams are marked as nil in the queue since the head must wait for its inflight registration. | // streams are marked as nil in the queue since the head must wait for its inflight registration. | |||
func (w *watchGrpcStream) nextResume() *watcherStream { | func (w *watchGrpcStream) nextResume() *watcherStream { | |||
for len(w.resuming) != 0 { | for len(w.resuming) != 0 { | |||
if w.resuming[0] != nil { | if w.resuming[0] != nil { | |||
return w.resuming[0] | return w.resuming[0] | |||
} | } | |||
w.resuming = w.resuming[1:len(w.resuming)] | w.resuming = w.resuming[1:len(w.resuming)] | |||
} | } | |||
return nil | return nil | |||
skipping to change at line 719 | skipping to change at line 754 | |||
// TODO: return watch ID? | // TODO: return watch ID? | |||
wr := &WatchResponse{ | wr := &WatchResponse{ | |||
Header: *pbresp.Header, | Header: *pbresp.Header, | |||
Events: events, | Events: events, | |||
CompactRevision: pbresp.CompactRevision, | CompactRevision: pbresp.CompactRevision, | |||
Created: pbresp.Created, | Created: pbresp.Created, | |||
Canceled: pbresp.Canceled, | Canceled: pbresp.Canceled, | |||
cancelReason: pbresp.CancelReason, | cancelReason: pbresp.CancelReason, | |||
} | } | |||
// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to | // watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of InvalidWatchID to | |||
// indicate they should be broadcast. | // indicate they should be broadcast. | |||
if wr.IsProgressNotify() && pbresp.WatchId == -1 { | if wr.IsProgressNotify() && pbresp.WatchId == InvalidWatchID { | |||
return w.broadcastResponse(wr) | return w.broadcastResponse(wr) | |||
} | } | |||
return w.unicastResponse(wr, pbresp.WatchId) | return w.unicastResponse(wr, pbresp.WatchId) | |||
} | } | |||
// broadcastResponse send a watch response to all watch substreams. | // broadcastResponse send a watch response to all watch substreams. | |||
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool { | func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool { | |||
for _, ws := range w.substreams { | for _, ws := range w.substreams { | |||
skipping to change at line 876 | skipping to change at line 911 | |||
} | } | |||
// lazily send cancel message if events on missing id | // lazily send cancel message if events on missing id | |||
} | } | |||
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) { | func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) { | |||
// mark all substreams as resuming | // mark all substreams as resuming | |||
close(w.resumec) | close(w.resumec) | |||
w.resumec = make(chan struct{}) | w.resumec = make(chan struct{}) | |||
w.joinSubstreams() | w.joinSubstreams() | |||
for _, ws := range w.substreams { | for _, ws := range w.substreams { | |||
ws.id = -1 | ws.id = InvalidWatchID | |||
w.resuming = append(w.resuming, ws) | w.resuming = append(w.resuming, ws) | |||
} | } | |||
// strip out nils, if any | // strip out nils, if any | |||
var resuming []*watcherStream | var resuming []*watcherStream | |||
for _, ws := range w.resuming { | for _, ws := range w.resuming { | |||
if ws != nil { | if ws != nil { | |||
resuming = append(resuming, ws) | resuming = append(resuming, ws) | |||
} | } | |||
} | } | |||
w.resuming = resuming | w.resuming = resuming | |||
skipping to change at line 966 | skipping to change at line 1001 | |||
} | } | |||
for _, ws := range w.resuming { | for _, ws := range w.resuming { | |||
if ws != nil { | if ws != nil { | |||
<-ws.donec | <-ws.donec | |||
} | } | |||
} | } | |||
} | } | |||
var maxBackoff = 100 * time.Millisecond | var maxBackoff = 100 * time.Millisecond | |||
func (w *watchGrpcStream) backoffIfUnavailable(backoff time.Duration, err error) time.Duration { | ||||
if isUnavailableErr(w.ctx, err) { | ||||
// retry, but backoff | ||||
if backoff < maxBackoff { | ||||
// 25% backoff factor | ||||
backoff = backoff + backoff/4 | ||||
if backoff > maxBackoff { | ||||
backoff = maxBackoff | ||||
} | ||||
} | ||||
time.Sleep(backoff) | ||||
} | ||||
return backoff | ||||
} | ||||
// openWatchClient retries opening a watch client until success or halt. | // openWatchClient retries opening a watch client until success or halt. | |||
// manually retry in case "ws==nil && err==nil" | // manually retry in case "ws==nil && err==nil" | |||
// TODO: remove FailFast=false | // TODO: remove FailFast=false | |||
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { | func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { | |||
backoff := time.Millisecond | backoff := time.Millisecond | |||
for { | for { | |||
select { | select { | |||
case <-w.ctx.Done(): | case <-w.ctx.Done(): | |||
if err == nil { | if err == nil { | |||
return nil, w.ctx.Err() | return nil, w.ctx.Err() | |||
} | } | |||
return nil, err | return nil, err | |||
default: | default: | |||
} | } | |||
if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil { | if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil { | |||
break | break | |||
} | } | |||
if isHaltErr(w.ctx, err) { | if isHaltErr(w.ctx, err) { | |||
return nil, v3rpc.Error(err) | return nil, v3rpc.Error(err) | |||
} | } | |||
if isUnavailableErr(w.ctx, err) { | backoff = w.backoffIfUnavailable(backoff, err) | |||
// retry, but backoff | ||||
if backoff < maxBackoff { | ||||
// 25% backoff factor | ||||
backoff = backoff + backoff/4 | ||||
if backoff > maxBackoff { | ||||
backoff = maxBackoff | ||||
} | ||||
} | ||||
time.Sleep(backoff) | ||||
} | ||||
} | } | |||
return ws, nil | return ws, nil | |||
} | } | |||
// toPB converts an internal watch request structure to its protobuf WatchRequest structure. | // toPB converts an internal watch request structure to its protobuf WatchRequest structure. | |||
func (wr *watchRequest) toPB() *pb.WatchRequest { | func (wr *watchRequest) toPB() *pb.WatchRequest { | |||
req := &pb.WatchCreateRequest{ | req := &pb.WatchCreateRequest{ | |||
StartRevision: wr.rev, | StartRevision: wr.rev, | |||
Key: []byte(wr.key), | Key: []byte(wr.key), | |||
RangeEnd: []byte(wr.end), | RangeEnd: []byte(wr.end), | |||
End of changes. 15 change blocks. | ||||
18 lines changed or deleted | 69 lines changed or added |