"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "client/v3/watch.go" between
etcd-3.5.5.tar.gz and etcd-3.5.6.tar.gz

About: etcd is a distributed reliable key-value store for the most critical data of a distributed system (written in "Go").

watch.go  (etcd-3.5.5):watch.go  (etcd-3.5.6)
skipping to change at line 21 skipping to change at line 21
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package clientv3 package clientv3
import ( import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strings"
"sync" "sync"
"time" "time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/mvccpb"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
const ( const (
EventTypeDelete = mvccpb.DELETE EventTypeDelete = mvccpb.DELETE
EventTypePut = mvccpb.PUT EventTypePut = mvccpb.PUT
closeSendErrTimeout = 250 * time.Millisecond closeSendErrTimeout = 250 * time.Millisecond
// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
// user-provided ID is available. If pass, an ID will automatically be as
signed.
AutoWatchID = 0
// InvalidWatchID represents an invalid watch ID and prevents duplication
with an existing watch.
InvalidWatchID = -1
) )
type Event mvccpb.Event type Event mvccpb.Event
type WatchChan <-chan WatchResponse type WatchChan <-chan WatchResponse
type Watcher interface { type Watcher interface {
// Watch watches on a key or prefix. The watched events will be returned // Watch watches on a key or prefix. The watched events will be returned
// through the returned channel. If revisions waiting to be sent over the // through the returned channel. If revisions waiting to be sent over the
// watch are compacted, then the watch will be canceled by the server, th e // watch are compacted, then the watch will be canceled by the server, th e
skipping to change at line 453 skipping to change at line 461
close(wgs.donec) close(wgs.donec)
wgs.cancel() wgs.cancel()
if w.streams != nil { if w.streams != nil {
delete(w.streams, wgs.ctxKey) delete(w.streams, wgs.ctxKey)
} }
w.mu.Unlock() w.mu.Unlock()
} }
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream ) { func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream ) {
// check watch ID for backward compatibility (<= v3.3) // check watch ID for backward compatibility (<= v3.3)
if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") { if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") {
w.closeErr = v3rpc.Error(errors.New(resp.CancelReason)) w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
// failed; no channel // failed; no channel
close(ws.recvc) close(ws.recvc)
return return
} }
ws.id = resp.WatchId ws.id = resp.WatchId
w.substreams[ws.id] = ws w.substreams[ws.id] = ws
} }
func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchRespo nse) { func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchRespo nse) {
skipping to change at line 484 skipping to change at line 492
select { select {
case ws.initReq.retc <- ws.outc: case ws.initReq.retc <- ws.outc:
default: default:
} }
// close subscriber's channel // close subscriber's channel
if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil { if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeE rr: w.closeErr}) go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeE rr: w.closeErr})
} else if ws.outc != nil { } else if ws.outc != nil {
close(ws.outc) close(ws.outc)
} }
if ws.id != -1 { if ws.id != InvalidWatchID {
delete(w.substreams, ws.id) delete(w.substreams, ws.id)
return return
} }
for i := range w.resuming { for i := range w.resuming {
if w.resuming[i] == ws { if w.resuming[i] == ws {
w.resuming[i] = nil w.resuming[i] = nil
return return
} }
} }
} }
skipping to change at line 536 skipping to change at line 544
}() }()
// start a stream with the etcd grpc server // start a stream with the etcd grpc server
if wc, closeErr = w.newWatchClient(); closeErr != nil { if wc, closeErr = w.newWatchClient(); closeErr != nil {
return return
} }
cancelSet := make(map[int64]struct{}) cancelSet := make(map[int64]struct{})
var cur *pb.WatchResponse var cur *pb.WatchResponse
backoff := time.Millisecond
for { for {
select { select {
// Watch() requested // Watch() requested
case req := <-w.reqc: case req := <-w.reqc:
switch wreq := req.(type) { switch wreq := req.(type) {
case *watchRequest: case *watchRequest:
outc := make(chan WatchResponse, 1) outc := make(chan WatchResponse, 1)
// TODO: pass custom watch ID? // TODO: pass custom watch ID?
ws := &watcherStream{ ws := &watcherStream{
initReq: *wreq, initReq: *wreq,
id: -1, id: InvalidWatchID,
outc: outc, outc: outc,
// unbuffered so resumes won't cause repe at events // unbuffered so resumes won't cause repe at events
recvc: make(chan *WatchResponse), recvc: make(chan *WatchResponse),
} }
ws.donec = make(chan struct{}) ws.donec = make(chan struct{})
w.wg.Add(1) w.wg.Add(1)
go w.serveSubstream(ws, w.resumec) go w.serveSubstream(ws, w.resumec)
// queue up for watcher creation/resume // queue up for watcher creation/resume
skipping to change at line 583 skipping to change at line 592
cur = pbresp cur = pbresp
} else if cur != nil && cur.WatchId == pbresp.WatchId { } else if cur != nil && cur.WatchId == pbresp.WatchId {
// merge new events // merge new events
cur.Events = append(cur.Events, pbresp.Events...) cur.Events = append(cur.Events, pbresp.Events...)
// update "Fragment" field; last response with "F ragment" == false // update "Fragment" field; last response with "F ragment" == false
cur.Fragment = pbresp.Fragment cur.Fragment = pbresp.Fragment
} }
switch { switch {
case pbresp.Created: case pbresp.Created:
cancelReasonError := v3rpc.Error(errors.New(pbres
p.CancelReason))
if shouldRetryWatch(cancelReasonError) {
var newErr error
if wc, newErr = w.newWatchClient(); newEr
r != nil {
w.lg.Error("failed to create a ne
w watch client", zap.Error(newErr))
return
}
if len(w.resuming) != 0 {
if ws := w.resuming[0]; ws != nil
{
if err := wc.Send(ws.init
Req.toPB()); err != nil {
w.lg.Debug("error
when sending request", zap.Error(err))
}
}
}
cur = nil
continue
}
// response to head of queue creation // response to head of queue creation
if len(w.resuming) != 0 { if len(w.resuming) != 0 {
if ws := w.resuming[0]; ws != nil { if ws := w.resuming[0]; ws != nil {
w.addSubstream(pbresp, ws) w.addSubstream(pbresp, ws)
w.dispatchEvent(pbresp) w.dispatchEvent(pbresp)
w.resuming[0] = nil w.resuming[0] = nil
} }
} }
if ws := w.nextResume(); ws != nil { if ws := w.nextResume(); ws != nil {
skipping to change at line 652 skipping to change at line 681
w.lg.Debug("failed to send watch cancel r equest", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err)) w.lg.Debug("failed to send watch cancel r equest", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))
} }
} }
// watch client failed on Recv; spawn another if possible // watch client failed on Recv; spawn another if possible
case err := <-w.errc: case err := <-w.errc:
if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.Er rNoLeader { if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.Er rNoLeader {
closeErr = err closeErr = err
return return
} }
backoff = w.backoffIfUnavailable(backoff, err)
if wc, closeErr = w.newWatchClient(); closeErr != nil { if wc, closeErr = w.newWatchClient(); closeErr != nil {
return return
} }
if ws := w.nextResume(); ws != nil { if ws := w.nextResume(); ws != nil {
if err := wc.Send(ws.initReq.toPB()); err != nil { if err := wc.Send(ws.initReq.toPB()); err != nil {
w.lg.Debug("error when sending request", zap.Error(err)) w.lg.Debug("error when sending request", zap.Error(err))
} }
} }
cancelSet = make(map[int64]struct{}) cancelSet = make(map[int64]struct{})
case <-w.ctx.Done(): case <-w.ctx.Done():
return return
case ws := <-w.closingc: case ws := <-w.closingc:
w.closeSubstream(ws) w.closeSubstream(ws)
delete(closing, ws) delete(closing, ws)
// no more watchers on this stream, shutdown, skip cancel lation // no more watchers on this stream, shutdown, skip cancel lation
if len(w.substreams)+len(w.resuming) == 0 { if len(w.substreams)+len(w.resuming) == 0 {
return return
} }
if ws.id != -1 { if ws.id != InvalidWatchID {
// client is closing an established watch; close it on the server proactively instead of waiting // client is closing an established watch; close it on the server proactively instead of waiting
// to close when the next message arrives // to close when the next message arrives
cancelSet[ws.id] = struct{}{} cancelSet[ws.id] = struct{}{}
cr := &pb.WatchRequest_CancelRequest{ cr := &pb.WatchRequest_CancelRequest{
CancelRequest: &pb.WatchCancelRequest{ CancelRequest: &pb.WatchCancelRequest{
WatchId: ws.id, WatchId: ws.id,
}, },
} }
req := &pb.WatchRequest{RequestUnion: cr} req := &pb.WatchRequest{RequestUnion: cr}
w.lg.Debug("sending watch cancel request for clos ed watcher", zap.Int64("watch-id", ws.id)) w.lg.Debug("sending watch cancel request for clos ed watcher", zap.Int64("watch-id", ws.id))
if err := wc.Send(req); err != nil { if err := wc.Send(req); err != nil {
w.lg.Debug("failed to send watch cancel r equest", zap.Int64("watch-id", ws.id), zap.Error(err)) w.lg.Debug("failed to send watch cancel r equest", zap.Int64("watch-id", ws.id), zap.Error(err))
} }
} }
} }
} }
} }
func shouldRetryWatch(cancelReasonError error) bool {
return (strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCInvalidAu
thToken.Error()) == 0) ||
(strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCAuthOldR
evision.Error()) == 0)
}
// nextResume chooses the next resuming to register with the grpc stream. Abando ned // nextResume chooses the next resuming to register with the grpc stream. Abando ned
// streams are marked as nil in the queue since the head must wait for its infli ght registration. // streams are marked as nil in the queue since the head must wait for its infli ght registration.
func (w *watchGrpcStream) nextResume() *watcherStream { func (w *watchGrpcStream) nextResume() *watcherStream {
for len(w.resuming) != 0 { for len(w.resuming) != 0 {
if w.resuming[0] != nil { if w.resuming[0] != nil {
return w.resuming[0] return w.resuming[0]
} }
w.resuming = w.resuming[1:len(w.resuming)] w.resuming = w.resuming[1:len(w.resuming)]
} }
return nil return nil
skipping to change at line 719 skipping to change at line 754
// TODO: return watch ID? // TODO: return watch ID?
wr := &WatchResponse{ wr := &WatchResponse{
Header: *pbresp.Header, Header: *pbresp.Header,
Events: events, Events: events,
CompactRevision: pbresp.CompactRevision, CompactRevision: pbresp.CompactRevision,
Created: pbresp.Created, Created: pbresp.Created,
Canceled: pbresp.Canceled, Canceled: pbresp.Canceled,
cancelReason: pbresp.CancelReason, cancelReason: pbresp.CancelReason,
} }
// watch IDs are zero indexed, so request notify watch responses are assi gned a watch ID of -1 to // watch IDs are zero indexed, so request notify watch responses are assi gned a watch ID of InvalidWatchID to
// indicate they should be broadcast. // indicate they should be broadcast.
if wr.IsProgressNotify() && pbresp.WatchId == -1 { if wr.IsProgressNotify() && pbresp.WatchId == InvalidWatchID {
return w.broadcastResponse(wr) return w.broadcastResponse(wr)
} }
return w.unicastResponse(wr, pbresp.WatchId) return w.unicastResponse(wr, pbresp.WatchId)
} }
// broadcastResponse send a watch response to all watch substreams. // broadcastResponse send a watch response to all watch substreams.
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool { func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
for _, ws := range w.substreams { for _, ws := range w.substreams {
skipping to change at line 876 skipping to change at line 911
} }
// lazily send cancel message if events on missing id // lazily send cancel message if events on missing id
} }
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) { func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
// mark all substreams as resuming // mark all substreams as resuming
close(w.resumec) close(w.resumec)
w.resumec = make(chan struct{}) w.resumec = make(chan struct{})
w.joinSubstreams() w.joinSubstreams()
for _, ws := range w.substreams { for _, ws := range w.substreams {
ws.id = -1 ws.id = InvalidWatchID
w.resuming = append(w.resuming, ws) w.resuming = append(w.resuming, ws)
} }
// strip out nils, if any // strip out nils, if any
var resuming []*watcherStream var resuming []*watcherStream
for _, ws := range w.resuming { for _, ws := range w.resuming {
if ws != nil { if ws != nil {
resuming = append(resuming, ws) resuming = append(resuming, ws)
} }
} }
w.resuming = resuming w.resuming = resuming
skipping to change at line 966 skipping to change at line 1001
} }
for _, ws := range w.resuming { for _, ws := range w.resuming {
if ws != nil { if ws != nil {
<-ws.donec <-ws.donec
} }
} }
} }
var maxBackoff = 100 * time.Millisecond var maxBackoff = 100 * time.Millisecond
func (w *watchGrpcStream) backoffIfUnavailable(backoff time.Duration, err error)
time.Duration {
if isUnavailableErr(w.ctx, err) {
// retry, but backoff
if backoff < maxBackoff {
// 25% backoff factor
backoff = backoff + backoff/4
if backoff > maxBackoff {
backoff = maxBackoff
}
}
time.Sleep(backoff)
}
return backoff
}
// openWatchClient retries opening a watch client until success or halt. // openWatchClient retries opening a watch client until success or halt.
// manually retry in case "ws==nil && err==nil" // manually retry in case "ws==nil && err==nil"
// TODO: remove FailFast=false // TODO: remove FailFast=false
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
backoff := time.Millisecond backoff := time.Millisecond
for { for {
select { select {
case <-w.ctx.Done(): case <-w.ctx.Done():
if err == nil { if err == nil {
return nil, w.ctx.Err() return nil, w.ctx.Err()
} }
return nil, err return nil, err
default: default:
} }
if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && e rr == nil { if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && e rr == nil {
break break
} }
if isHaltErr(w.ctx, err) { if isHaltErr(w.ctx, err) {
return nil, v3rpc.Error(err) return nil, v3rpc.Error(err)
} }
if isUnavailableErr(w.ctx, err) { backoff = w.backoffIfUnavailable(backoff, err)
// retry, but backoff
if backoff < maxBackoff {
// 25% backoff factor
backoff = backoff + backoff/4
if backoff > maxBackoff {
backoff = maxBackoff
}
}
time.Sleep(backoff)
}
} }
return ws, nil return ws, nil
} }
// toPB converts an internal watch request structure to its protobuf WatchReques t structure. // toPB converts an internal watch request structure to its protobuf WatchReques t structure.
func (wr *watchRequest) toPB() *pb.WatchRequest { func (wr *watchRequest) toPB() *pb.WatchRequest {
req := &pb.WatchCreateRequest{ req := &pb.WatchCreateRequest{
StartRevision: wr.rev, StartRevision: wr.rev,
Key: []byte(wr.key), Key: []byte(wr.key),
RangeEnd: []byte(wr.end), RangeEnd: []byte(wr.end),
 End of changes. 15 change blocks. 
18 lines changed or deleted 69 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)