"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "server/etcdserver/api/v3rpc/watch.go" between
etcd-3.5.5.tar.gz and etcd-3.5.6.tar.gz

About: etcd is a distributed reliable key-value store for the most critical data of a distributed system (written in "Go").

watch.go  (etcd-3.5.5):watch.go  (etcd-3.5.6)
skipping to change at line 19 skipping to change at line 19
// Unless required by applicable law or agreed to in writing, software // Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, // distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package v3rpc package v3rpc
import ( import (
"context" "context"
"fmt"
"io" "io"
"math/rand" "math/rand"
"sync" "sync"
"time" "time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/mvcc" "go.etcd.io/etcd/server/v3/mvcc"
"go.uber.org/zap" "go.uber.org/zap"
) )
const minWatchProgressInterval = 100 * time.Millisecond const minWatchProgressInterval = 100 * time.Millisecond
type watchServer struct { type watchServer struct {
skipping to change at line 226 skipping to change at line 228
err = stream.Context().Err() err = stream.Context().Err()
if err == context.Canceled { if err == context.Canceled {
err = rpctypes.ErrGRPCWatchCanceled err = rpctypes.ErrGRPCWatchCanceled
} }
} }
sws.close() sws.close()
return err return err
} }
func (sws *serverWatchStream) isWatchPermitted(wcr *pb.WatchCreateRequest) bool { func (sws *serverWatchStream) isWatchPermitted(wcr *pb.WatchCreateRequest) error {
authInfo, err := sws.ag.AuthInfoFromCtx(sws.gRPCStream.Context()) authInfo, err := sws.ag.AuthInfoFromCtx(sws.gRPCStream.Context())
if err != nil { if err != nil {
return false return err
} }
if authInfo == nil { if authInfo == nil {
// if auth is enabled, IsRangePermitted() can cause an error // if auth is enabled, IsRangePermitted() can cause an error
authInfo = &auth.AuthInfo{} authInfo = &auth.AuthInfo{}
} }
return sws.ag.AuthStore().IsRangePermitted(authInfo, wcr.Key, wcr.RangeEn d) == nil return sws.ag.AuthStore().IsRangePermitted(authInfo, wcr.Key, wcr.RangeEn d)
} }
func (sws *serverWatchStream) recvLoop() error { func (sws *serverWatchStream) recvLoop() error {
for { for {
req, err := sws.gRPCStream.Recv() req, err := sws.gRPCStream.Recv()
if err == io.EOF { if err == io.EOF {
return nil return nil
} }
if err != nil { if err != nil {
return err return err
skipping to change at line 269 skipping to change at line 271
if len(creq.RangeEnd) == 0 { if len(creq.RangeEnd) == 0 {
// force nil since watchstream.Watch distinguishe s // force nil since watchstream.Watch distinguishe s
// between nil and []byte{} for single key / >= // between nil and []byte{} for single key / >=
creq.RangeEnd = nil creq.RangeEnd = nil
} }
if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 { if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 {
// support >= key queries // support >= key queries
creq.RangeEnd = []byte{} creq.RangeEnd = []byte{}
} }
if !sws.isWatchPermitted(creq) { err := sws.isWatchPermitted(creq)
if err != nil {
var cancelReason string
switch err {
case auth.ErrInvalidAuthToken:
cancelReason = rpctypes.ErrGRPCInvalidAut
hToken.Error()
case auth.ErrAuthOldRevision:
cancelReason = rpctypes.ErrGRPCAuthOldRev
ision.Error()
case auth.ErrUserEmpty:
cancelReason = rpctypes.ErrGRPCUserEmpty.
Error()
default:
if err != auth.ErrPermissionDenied {
sws.lg.Error("unexpected error co
de", zap.Error(err))
}
cancelReason = rpctypes.ErrGRPCPermission
Denied.Error()
}
wr := &pb.WatchResponse{ wr := &pb.WatchResponse{
Header: sws.newResponseHeader(sws.w atchStream.Rev()), Header: sws.newResponseHeader(sws.w atchStream.Rev()),
WatchId: creq.WatchId, WatchId: clientv3.InvalidWatchID,
Canceled: true, Canceled: true,
Created: true, Created: true,
CancelReason: rpctypes.ErrGRPCPermissionD enied.Error(), CancelReason: cancelReason,
} }
select { select {
case sws.ctrlStream <- wr: case sws.ctrlStream <- wr:
continue continue
case <-sws.closec: case <-sws.closec:
return nil return nil
} }
} }
skipping to change at line 306 skipping to change at line 324
if creq.ProgressNotify { if creq.ProgressNotify {
sws.progress[id] = true sws.progress[id] = true
} }
if creq.PrevKv { if creq.PrevKv {
sws.prevKV[id] = true sws.prevKV[id] = true
} }
if creq.Fragment { if creq.Fragment {
sws.fragment[id] = true sws.fragment[id] = true
} }
sws.mu.Unlock() sws.mu.Unlock()
} else {
id = clientv3.InvalidWatchID
} }
wr := &pb.WatchResponse{ wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev), Header: sws.newResponseHeader(wsrev),
WatchId: int64(id), WatchId: int64(id),
Created: true, Created: true,
Canceled: err != nil, Canceled: err != nil,
} }
if err != nil { if err != nil {
wr.CancelReason = err.Error() wr.CancelReason = err.Error()
} }
select { select {
skipping to change at line 343 skipping to change at line 364
delete(sws.progress, mvcc.WatchID(id)) delete(sws.progress, mvcc.WatchID(id))
delete(sws.prevKV, mvcc.WatchID(id)) delete(sws.prevKV, mvcc.WatchID(id))
delete(sws.fragment, mvcc.WatchID(id)) delete(sws.fragment, mvcc.WatchID(id))
sws.mu.Unlock() sws.mu.Unlock()
} }
} }
case *pb.WatchRequest_ProgressRequest: case *pb.WatchRequest_ProgressRequest:
if uv.ProgressRequest != nil { if uv.ProgressRequest != nil {
sws.ctrlStream <- &pb.WatchResponse{ sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchS tream.Rev()), Header: sws.newResponseHeader(sws.watchS tream.Rev()),
WatchId: -1, // response is not associate d with any WatchId and will be broadcast to all watch channels WatchId: clientv3.InvalidWatchID, // resp onse is not associated with any WatchId and will be broadcast to all watch chann els
} }
} }
default: default:
// we probably should not shutdown the entire stream when // we probably should not shutdown the entire stream when
// receive an valid command. // receive an valid command.
// so just do nothing instead. // so just do nothing instead.
continue continue
} }
} }
} }
skipping to change at line 466 skipping to change at line 487
sws.lg.Debug("failed to send watch contro l response to gRPC stream", zap.Error(err)) sws.lg.Debug("failed to send watch contro l response to gRPC stream", zap.Error(err))
} else { } else {
sws.lg.Warn("failed to send watch control response to gRPC stream", zap.Error(err)) sws.lg.Warn("failed to send watch control response to gRPC stream", zap.Error(err))
streamFailures.WithLabelValues("send", "w atch").Inc() streamFailures.WithLabelValues("send", "w atch").Inc()
} }
return return
} }
// track id creation // track id creation
wid := mvcc.WatchID(c.WatchId) wid := mvcc.WatchID(c.WatchId)
if c.Canceled {
if !(!(c.Canceled && c.Created) || wid == clientv3.Invali
dWatchID) {
panic(fmt.Sprintf("unexpected watchId: %d, wanted
: %d, since both 'Canceled' and 'Created' are true", wid, clientv3.InvalidWatchI
D))
}
if c.Canceled && wid != clientv3.InvalidWatchID {
delete(ids, wid) delete(ids, wid)
continue continue
} }
if c.Created { if c.Created {
// flush buffered events // flush buffered events
ids[wid] = struct{}{} ids[wid] = struct{}{}
for _, v := range pending[wid] { for _, v := range pending[wid] {
mvcc.ReportEventReceived(len(v.Events)) mvcc.ReportEventReceived(len(v.Events))
if err := sws.gRPCStream.Send(v); err != nil { if err := sws.gRPCStream.Send(v); err != nil {
if isClientCtxErr(sws.gRPCStream. Context().Err(), err) { if isClientCtxErr(sws.gRPCStream. Context().Err(), err) {
 End of changes. 12 change blocks. 
8 lines changed or deleted 42 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)