watch.go (etcd-3.5.5) | : | watch.go (etcd-3.5.6) | ||
---|---|---|---|---|
skipping to change at line 19 | skipping to change at line 19 | |||
// Unless required by applicable law or agreed to in writing, software | // Unless required by applicable law or agreed to in writing, software | |||
// distributed under the License is distributed on an "AS IS" BASIS, | // distributed under the License is distributed on an "AS IS" BASIS, | |||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
// See the License for the specific language governing permissions and | // See the License for the specific language governing permissions and | |||
// limitations under the License. | // limitations under the License. | |||
package v3rpc | package v3rpc | |||
import ( | import ( | |||
"context" | "context" | |||
"fmt" | ||||
"io" | "io" | |||
"math/rand" | "math/rand" | |||
"sync" | "sync" | |||
"time" | "time" | |||
pb "go.etcd.io/etcd/api/v3/etcdserverpb" | pb "go.etcd.io/etcd/api/v3/etcdserverpb" | |||
"go.etcd.io/etcd/api/v3/mvccpb" | "go.etcd.io/etcd/api/v3/mvccpb" | |||
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes" | "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" | |||
clientv3 "go.etcd.io/etcd/client/v3" | ||||
"go.etcd.io/etcd/server/v3/auth" | "go.etcd.io/etcd/server/v3/auth" | |||
"go.etcd.io/etcd/server/v3/etcdserver" | "go.etcd.io/etcd/server/v3/etcdserver" | |||
"go.etcd.io/etcd/server/v3/mvcc" | "go.etcd.io/etcd/server/v3/mvcc" | |||
"go.uber.org/zap" | "go.uber.org/zap" | |||
) | ) | |||
const minWatchProgressInterval = 100 * time.Millisecond | const minWatchProgressInterval = 100 * time.Millisecond | |||
type watchServer struct { | type watchServer struct { | |||
skipping to change at line 226 | skipping to change at line 228 | |||
err = stream.Context().Err() | err = stream.Context().Err() | |||
if err == context.Canceled { | if err == context.Canceled { | |||
err = rpctypes.ErrGRPCWatchCanceled | err = rpctypes.ErrGRPCWatchCanceled | |||
} | } | |||
} | } | |||
sws.close() | sws.close() | |||
return err | return err | |||
} | } | |||
func (sws *serverWatchStream) isWatchPermitted(wcr *pb.WatchCreateRequest) bool { | func (sws *serverWatchStream) isWatchPermitted(wcr *pb.WatchCreateRequest) error { | |||
authInfo, err := sws.ag.AuthInfoFromCtx(sws.gRPCStream.Context()) | authInfo, err := sws.ag.AuthInfoFromCtx(sws.gRPCStream.Context()) | |||
if err != nil { | if err != nil { | |||
return false | return err | |||
} | } | |||
if authInfo == nil { | if authInfo == nil { | |||
// if auth is enabled, IsRangePermitted() can cause an error | // if auth is enabled, IsRangePermitted() can cause an error | |||
authInfo = &auth.AuthInfo{} | authInfo = &auth.AuthInfo{} | |||
} | } | |||
return sws.ag.AuthStore().IsRangePermitted(authInfo, wcr.Key, wcr.RangeEn d) == nil | return sws.ag.AuthStore().IsRangePermitted(authInfo, wcr.Key, wcr.RangeEn d) | |||
} | } | |||
func (sws *serverWatchStream) recvLoop() error { | func (sws *serverWatchStream) recvLoop() error { | |||
for { | for { | |||
req, err := sws.gRPCStream.Recv() | req, err := sws.gRPCStream.Recv() | |||
if err == io.EOF { | if err == io.EOF { | |||
return nil | return nil | |||
} | } | |||
if err != nil { | if err != nil { | |||
return err | return err | |||
skipping to change at line 269 | skipping to change at line 271 | |||
if len(creq.RangeEnd) == 0 { | if len(creq.RangeEnd) == 0 { | |||
// force nil since watchstream.Watch distinguishe s | // force nil since watchstream.Watch distinguishe s | |||
// between nil and []byte{} for single key / >= | // between nil and []byte{} for single key / >= | |||
creq.RangeEnd = nil | creq.RangeEnd = nil | |||
} | } | |||
if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 { | if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 { | |||
// support >= key queries | // support >= key queries | |||
creq.RangeEnd = []byte{} | creq.RangeEnd = []byte{} | |||
} | } | |||
if !sws.isWatchPermitted(creq) { | err := sws.isWatchPermitted(creq) | |||
if err != nil { | ||||
var cancelReason string | ||||
switch err { | ||||
case auth.ErrInvalidAuthToken: | ||||
cancelReason = rpctypes.ErrGRPCInvalidAut | ||||
hToken.Error() | ||||
case auth.ErrAuthOldRevision: | ||||
cancelReason = rpctypes.ErrGRPCAuthOldRev | ||||
ision.Error() | ||||
case auth.ErrUserEmpty: | ||||
cancelReason = rpctypes.ErrGRPCUserEmpty. | ||||
Error() | ||||
default: | ||||
if err != auth.ErrPermissionDenied { | ||||
sws.lg.Error("unexpected error co | ||||
de", zap.Error(err)) | ||||
} | ||||
cancelReason = rpctypes.ErrGRPCPermission | ||||
Denied.Error() | ||||
} | ||||
wr := &pb.WatchResponse{ | wr := &pb.WatchResponse{ | |||
Header: sws.newResponseHeader(sws.w atchStream.Rev()), | Header: sws.newResponseHeader(sws.w atchStream.Rev()), | |||
WatchId: creq.WatchId, | WatchId: clientv3.InvalidWatchID, | |||
Canceled: true, | Canceled: true, | |||
Created: true, | Created: true, | |||
CancelReason: rpctypes.ErrGRPCPermissionD enied.Error(), | CancelReason: cancelReason, | |||
} | } | |||
select { | select { | |||
case sws.ctrlStream <- wr: | case sws.ctrlStream <- wr: | |||
continue | continue | |||
case <-sws.closec: | case <-sws.closec: | |||
return nil | return nil | |||
} | } | |||
} | } | |||
skipping to change at line 306 | skipping to change at line 324 | |||
if creq.ProgressNotify { | if creq.ProgressNotify { | |||
sws.progress[id] = true | sws.progress[id] = true | |||
} | } | |||
if creq.PrevKv { | if creq.PrevKv { | |||
sws.prevKV[id] = true | sws.prevKV[id] = true | |||
} | } | |||
if creq.Fragment { | if creq.Fragment { | |||
sws.fragment[id] = true | sws.fragment[id] = true | |||
} | } | |||
sws.mu.Unlock() | sws.mu.Unlock() | |||
} else { | ||||
id = clientv3.InvalidWatchID | ||||
} | } | |||
wr := &pb.WatchResponse{ | wr := &pb.WatchResponse{ | |||
Header: sws.newResponseHeader(wsrev), | Header: sws.newResponseHeader(wsrev), | |||
WatchId: int64(id), | WatchId: int64(id), | |||
Created: true, | Created: true, | |||
Canceled: err != nil, | Canceled: err != nil, | |||
} | } | |||
if err != nil { | if err != nil { | |||
wr.CancelReason = err.Error() | wr.CancelReason = err.Error() | |||
} | } | |||
select { | select { | |||
skipping to change at line 343 | skipping to change at line 364 | |||
delete(sws.progress, mvcc.WatchID(id)) | delete(sws.progress, mvcc.WatchID(id)) | |||
delete(sws.prevKV, mvcc.WatchID(id)) | delete(sws.prevKV, mvcc.WatchID(id)) | |||
delete(sws.fragment, mvcc.WatchID(id)) | delete(sws.fragment, mvcc.WatchID(id)) | |||
sws.mu.Unlock() | sws.mu.Unlock() | |||
} | } | |||
} | } | |||
case *pb.WatchRequest_ProgressRequest: | case *pb.WatchRequest_ProgressRequest: | |||
if uv.ProgressRequest != nil { | if uv.ProgressRequest != nil { | |||
sws.ctrlStream <- &pb.WatchResponse{ | sws.ctrlStream <- &pb.WatchResponse{ | |||
Header: sws.newResponseHeader(sws.watchS tream.Rev()), | Header: sws.newResponseHeader(sws.watchS tream.Rev()), | |||
WatchId: -1, // response is not associate d with any WatchId and will be broadcast to all watch channels | WatchId: clientv3.InvalidWatchID, // resp onse is not associated with any WatchId and will be broadcast to all watch chann els | |||
} | } | |||
} | } | |||
default: | default: | |||
// we probably should not shutdown the entire stream when | // we probably should not shutdown the entire stream when | |||
// receive an valid command. | // receive an valid command. | |||
// so just do nothing instead. | // so just do nothing instead. | |||
continue | continue | |||
} | } | |||
} | } | |||
} | } | |||
skipping to change at line 466 | skipping to change at line 487 | |||
sws.lg.Debug("failed to send watch contro l response to gRPC stream", zap.Error(err)) | sws.lg.Debug("failed to send watch contro l response to gRPC stream", zap.Error(err)) | |||
} else { | } else { | |||
sws.lg.Warn("failed to send watch control response to gRPC stream", zap.Error(err)) | sws.lg.Warn("failed to send watch control response to gRPC stream", zap.Error(err)) | |||
streamFailures.WithLabelValues("send", "w atch").Inc() | streamFailures.WithLabelValues("send", "w atch").Inc() | |||
} | } | |||
return | return | |||
} | } | |||
// track id creation | // track id creation | |||
wid := mvcc.WatchID(c.WatchId) | wid := mvcc.WatchID(c.WatchId) | |||
if c.Canceled { | ||||
if !(!(c.Canceled && c.Created) || wid == clientv3.Invali | ||||
dWatchID) { | ||||
panic(fmt.Sprintf("unexpected watchId: %d, wanted | ||||
: %d, since both 'Canceled' and 'Created' are true", wid, clientv3.InvalidWatchI | ||||
D)) | ||||
} | ||||
if c.Canceled && wid != clientv3.InvalidWatchID { | ||||
delete(ids, wid) | delete(ids, wid) | |||
continue | continue | |||
} | } | |||
if c.Created { | if c.Created { | |||
// flush buffered events | // flush buffered events | |||
ids[wid] = struct{}{} | ids[wid] = struct{}{} | |||
for _, v := range pending[wid] { | for _, v := range pending[wid] { | |||
mvcc.ReportEventReceived(len(v.Events)) | mvcc.ReportEventReceived(len(v.Events)) | |||
if err := sws.gRPCStream.Send(v); err != nil { | if err := sws.gRPCStream.Send(v); err != nil { | |||
if isClientCtxErr(sws.gRPCStream. Context().Err(), err) { | if isClientCtxErr(sws.gRPCStream. Context().Err(), err) { | |||
End of changes. 12 change blocks. | ||||
8 lines changed or deleted | 42 lines changed or added |