watch.go (etcd-3.5.6) | : | watch.go (etcd-3.5.7) | ||
---|---|---|---|---|
skipping to change at line 21 | skipping to change at line 21 | |||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
// See the License for the specific language governing permissions and | // See the License for the specific language governing permissions and | |||
// limitations under the License. | // limitations under the License. | |||
package clientv3 | package clientv3 | |||
import ( | import ( | |||
"context" | "context" | |||
"errors" | "errors" | |||
"fmt" | "fmt" | |||
"strings" | ||||
"sync" | "sync" | |||
"time" | "time" | |||
pb "go.etcd.io/etcd/api/v3/etcdserverpb" | pb "go.etcd.io/etcd/api/v3/etcdserverpb" | |||
"go.etcd.io/etcd/api/v3/mvccpb" | "go.etcd.io/etcd/api/v3/mvccpb" | |||
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" | v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" | |||
"go.uber.org/zap" | "go.uber.org/zap" | |||
"google.golang.org/grpc" | "google.golang.org/grpc" | |||
"google.golang.org/grpc/codes" | "google.golang.org/grpc/codes" | |||
skipping to change at line 592 | skipping to change at line 591 | |||
cur = pbresp | cur = pbresp | |||
} else if cur != nil && cur.WatchId == pbresp.WatchId { | } else if cur != nil && cur.WatchId == pbresp.WatchId { | |||
// merge new events | // merge new events | |||
cur.Events = append(cur.Events, pbresp.Events...) | cur.Events = append(cur.Events, pbresp.Events...) | |||
// update "Fragment" field; last response with "F ragment" == false | // update "Fragment" field; last response with "F ragment" == false | |||
cur.Fragment = pbresp.Fragment | cur.Fragment = pbresp.Fragment | |||
} | } | |||
switch { | switch { | |||
case pbresp.Created: | case pbresp.Created: | |||
cancelReasonError := v3rpc.Error(errors.New(pbres | ||||
p.CancelReason)) | ||||
if shouldRetryWatch(cancelReasonError) { | ||||
var newErr error | ||||
if wc, newErr = w.newWatchClient(); newEr | ||||
r != nil { | ||||
w.lg.Error("failed to create a ne | ||||
w watch client", zap.Error(newErr)) | ||||
return | ||||
} | ||||
if len(w.resuming) != 0 { | ||||
if ws := w.resuming[0]; ws != nil | ||||
{ | ||||
if err := wc.Send(ws.init | ||||
Req.toPB()); err != nil { | ||||
w.lg.Debug("error | ||||
when sending request", zap.Error(err)) | ||||
} | ||||
} | ||||
} | ||||
cur = nil | ||||
continue | ||||
} | ||||
// response to head of queue creation | // response to head of queue creation | |||
if len(w.resuming) != 0 { | if len(w.resuming) != 0 { | |||
if ws := w.resuming[0]; ws != nil { | if ws := w.resuming[0]; ws != nil { | |||
w.addSubstream(pbresp, ws) | w.addSubstream(pbresp, ws) | |||
w.dispatchEvent(pbresp) | w.dispatchEvent(pbresp) | |||
w.resuming[0] = nil | w.resuming[0] = nil | |||
} | } | |||
} | } | |||
if ws := w.nextResume(); ws != nil { | if ws := w.nextResume(); ws != nil { | |||
skipping to change at line 721 | skipping to change at line 700 | |||
req := &pb.WatchRequest{RequestUnion: cr} | req := &pb.WatchRequest{RequestUnion: cr} | |||
w.lg.Debug("sending watch cancel request for clos ed watcher", zap.Int64("watch-id", ws.id)) | w.lg.Debug("sending watch cancel request for clos ed watcher", zap.Int64("watch-id", ws.id)) | |||
if err := wc.Send(req); err != nil { | if err := wc.Send(req); err != nil { | |||
w.lg.Debug("failed to send watch cancel r equest", zap.Int64("watch-id", ws.id), zap.Error(err)) | w.lg.Debug("failed to send watch cancel r equest", zap.Int64("watch-id", ws.id), zap.Error(err)) | |||
} | } | |||
} | } | |||
} | } | |||
} | } | |||
} | } | |||
func shouldRetryWatch(cancelReasonError error) bool { | ||||
return (strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCInvalidAu | ||||
thToken.Error()) == 0) || | ||||
(strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCAuthOldR | ||||
evision.Error()) == 0) | ||||
} | ||||
// nextResume chooses the next resuming to register with the grpc stream. Abando ned | // nextResume chooses the next resuming to register with the grpc stream. Abando ned | |||
// streams are marked as nil in the queue since the head must wait for its infli ght registration. | // streams are marked as nil in the queue since the head must wait for its infli ght registration. | |||
func (w *watchGrpcStream) nextResume() *watcherStream { | func (w *watchGrpcStream) nextResume() *watcherStream { | |||
for len(w.resuming) != 0 { | for len(w.resuming) != 0 { | |||
if w.resuming[0] != nil { | if w.resuming[0] != nil { | |||
return w.resuming[0] | return w.resuming[0] | |||
} | } | |||
w.resuming = w.resuming[1:len(w.resuming)] | w.resuming = w.resuming[1:len(w.resuming)] | |||
} | } | |||
return nil | return nil | |||
End of changes. 3 change blocks. | ||||
34 lines changed or deleted | 0 lines changed or added |