"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "client/v3/watch.go" between
etcd-3.5.6.tar.gz and etcd-3.5.7.tar.gz

About: etcd is a distributed reliable key-value store for the most critical data of a distributed system (written in "Go").

watch.go  (etcd-3.5.6):watch.go  (etcd-3.5.7)
skipping to change at line 21 skipping to change at line 21
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package clientv3 package clientv3
import ( import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strings"
"sync" "sync"
"time" "time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/mvccpb"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
skipping to change at line 592 skipping to change at line 591
cur = pbresp cur = pbresp
} else if cur != nil && cur.WatchId == pbresp.WatchId { } else if cur != nil && cur.WatchId == pbresp.WatchId {
// merge new events // merge new events
cur.Events = append(cur.Events, pbresp.Events...) cur.Events = append(cur.Events, pbresp.Events...)
// update "Fragment" field; last response with "F ragment" == false // update "Fragment" field; last response with "F ragment" == false
cur.Fragment = pbresp.Fragment cur.Fragment = pbresp.Fragment
} }
switch { switch {
case pbresp.Created: case pbresp.Created:
cancelReasonError := v3rpc.Error(errors.New(pbres
p.CancelReason))
if shouldRetryWatch(cancelReasonError) {
var newErr error
if wc, newErr = w.newWatchClient(); newEr
r != nil {
w.lg.Error("failed to create a ne
w watch client", zap.Error(newErr))
return
}
if len(w.resuming) != 0 {
if ws := w.resuming[0]; ws != nil
{
if err := wc.Send(ws.init
Req.toPB()); err != nil {
w.lg.Debug("error
when sending request", zap.Error(err))
}
}
}
cur = nil
continue
}
// response to head of queue creation // response to head of queue creation
if len(w.resuming) != 0 { if len(w.resuming) != 0 {
if ws := w.resuming[0]; ws != nil { if ws := w.resuming[0]; ws != nil {
w.addSubstream(pbresp, ws) w.addSubstream(pbresp, ws)
w.dispatchEvent(pbresp) w.dispatchEvent(pbresp)
w.resuming[0] = nil w.resuming[0] = nil
} }
} }
if ws := w.nextResume(); ws != nil { if ws := w.nextResume(); ws != nil {
skipping to change at line 721 skipping to change at line 700
req := &pb.WatchRequest{RequestUnion: cr} req := &pb.WatchRequest{RequestUnion: cr}
w.lg.Debug("sending watch cancel request for clos ed watcher", zap.Int64("watch-id", ws.id)) w.lg.Debug("sending watch cancel request for clos ed watcher", zap.Int64("watch-id", ws.id))
if err := wc.Send(req); err != nil { if err := wc.Send(req); err != nil {
w.lg.Debug("failed to send watch cancel r equest", zap.Int64("watch-id", ws.id), zap.Error(err)) w.lg.Debug("failed to send watch cancel r equest", zap.Int64("watch-id", ws.id), zap.Error(err))
} }
} }
} }
} }
} }
func shouldRetryWatch(cancelReasonError error) bool {
return (strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCInvalidAu
thToken.Error()) == 0) ||
(strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCAuthOldR
evision.Error()) == 0)
}
// nextResume chooses the next resuming to register with the grpc stream. Abando ned // nextResume chooses the next resuming to register with the grpc stream. Abando ned
// streams are marked as nil in the queue since the head must wait for its infli ght registration. // streams are marked as nil in the queue since the head must wait for its infli ght registration.
func (w *watchGrpcStream) nextResume() *watcherStream { func (w *watchGrpcStream) nextResume() *watcherStream {
for len(w.resuming) != 0 { for len(w.resuming) != 0 {
if w.resuming[0] != nil { if w.resuming[0] != nil {
return w.resuming[0] return w.resuming[0]
} }
w.resuming = w.resuming[1:len(w.resuming)] w.resuming = w.resuming[1:len(w.resuming)]
} }
return nil return nil
 End of changes. 3 change blocks. 
34 lines changed or deleted 0 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)