http.go (nsq-1.2.0) | : | http.go (nsq-1.2.1) | ||
---|---|---|---|---|
skipping to change at line 35 | skipping to change at line 35 | |||
) | ) | |||
var boolParams = map[string]bool{ | var boolParams = map[string]bool{ | |||
"true": true, | "true": true, | |||
"1": true, | "1": true, | |||
"false": false, | "false": false, | |||
"0": false, | "0": false, | |||
} | } | |||
type httpServer struct { | type httpServer struct { | |||
ctx *context | nsqd *NSQD | |||
tlsEnabled bool | tlsEnabled bool | |||
tlsRequired bool | tlsRequired bool | |||
router http.Handler | router http.Handler | |||
} | } | |||
func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer | func newHTTPServer(nsqd *NSQD, tlsEnabled bool, tlsRequired bool) *httpServer { | |||
{ | log := http_api.Log(nsqd.logf) | |||
log := http_api.Log(ctx.nsqd.logf) | ||||
router := httprouter.New() | router := httprouter.New() | |||
router.HandleMethodNotAllowed = true | router.HandleMethodNotAllowed = true | |||
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf) | router.PanicHandler = http_api.LogPanicHandler(nsqd.logf) | |||
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf) | router.NotFound = http_api.LogNotFoundHandler(nsqd.logf) | |||
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.lo | router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(nsqd.logf) | |||
gf) | ||||
s := &httpServer{ | s := &httpServer{ | |||
ctx: ctx, | nsqd: nsqd, | |||
tlsEnabled: tlsEnabled, | tlsEnabled: tlsEnabled, | |||
tlsRequired: tlsRequired, | tlsRequired: tlsRequired, | |||
router: router, | router: router, | |||
} | } | |||
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_ api.PlainText)) | router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_ api.PlainText)) | |||
router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V 1)) | router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V 1)) | |||
// v1 negotiate | // v1 negotiate | |||
router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1)) | router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1)) | |||
skipping to change at line 105 | skipping to change at line 105 | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{http.StatusBadRequest, fmt.Sprintf("inva lid block rate : %s", err.Error())} | return nil, http_api.Err{http.StatusBadRequest, fmt.Sprintf("inva lid block rate : %s", err.Error())} | |||
} | } | |||
runtime.SetBlockProfileRate(rate) | runtime.SetBlockProfileRate(rate) | |||
return nil, nil | return nil, nil | |||
} | } | |||
func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { | func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { | |||
if !s.tlsEnabled && s.tlsRequired { | if !s.tlsEnabled && s.tlsRequired { | |||
resp := fmt.Sprintf(`{"message": "TLS_REQUIRED", "https_port": %d }`, | resp := fmt.Sprintf(`{"message": "TLS_REQUIRED", "https_port": %d }`, | |||
s.ctx.nsqd.RealHTTPSAddr().Port) | s.nsqd.RealHTTPSAddr().Port) | |||
w.Header().Set("X-NSQ-Content-Type", "nsq; version=1.0") | w.Header().Set("X-NSQ-Content-Type", "nsq; version=1.0") | |||
w.Header().Set("Content-Type", "application/json; charset=utf-8") | w.Header().Set("Content-Type", "application/json; charset=utf-8") | |||
w.WriteHeader(403) | w.WriteHeader(403) | |||
io.WriteString(w, resp) | io.WriteString(w, resp) | |||
return | return | |||
} | } | |||
s.router.ServeHTTP(w, req) | s.router.ServeHTTP(w, req) | |||
} | } | |||
func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps ht tprouter.Params) (interface{}, error) { | func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps ht tprouter.Params) (interface{}, error) { | |||
health := s.ctx.nsqd.GetHealth() | health := s.nsqd.GetHealth() | |||
if !s.ctx.nsqd.IsHealthy() { | if !s.nsqd.IsHealthy() { | |||
return nil, http_api.Err{500, health} | return nil, http_api.Err{500, health} | |||
} | } | |||
return health, nil | return health, nil | |||
} | } | |||
func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou ter.Params) (interface{}, error) { | func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou ter.Params) (interface{}, error) { | |||
hostname, err := os.Hostname() | hostname, err := os.Hostname() | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{500, err.Error()} | return nil, http_api.Err{500, err.Error()} | |||
} | } | |||
return struct { | return struct { | |||
Version string `json:"version"` | Version string `json:"version"` | |||
BroadcastAddress string `json:"broadcast_address"` | BroadcastAddress string `json:"broadcast_address"` | |||
Hostname string `json:"hostname"` | Hostname string `json:"hostname"` | |||
HTTPPort int `json:"http_port"` | HTTPPort int `json:"http_port"` | |||
TCPPort int `json:"tcp_port"` | TCPPort int `json:"tcp_port"` | |||
StartTime int64 `json:"start_time"` | StartTime int64 `json:"start_time"` | |||
}{ | }{ | |||
Version: version.Binary, | Version: version.Binary, | |||
BroadcastAddress: s.ctx.nsqd.getOpts().BroadcastAddress, | BroadcastAddress: s.nsqd.getOpts().BroadcastAddress, | |||
Hostname: hostname, | Hostname: hostname, | |||
TCPPort: s.ctx.nsqd.RealTCPAddr().Port, | TCPPort: s.nsqd.RealTCPAddr().Port, | |||
HTTPPort: s.ctx.nsqd.RealHTTPAddr().Port, | HTTPPort: s.nsqd.RealHTTPAddr().Port, | |||
StartTime: s.ctx.nsqd.GetStartTime().Unix(), | StartTime: s.nsqd.GetStartTime().Unix(), | |||
}, nil | }, nil | |||
} | } | |||
func (s *httpServer) getExistingTopicFromQuery(req *http.Request) (*http_api.Req Params, *Topic, string, error) { | func (s *httpServer) getExistingTopicFromQuery(req *http.Request) (*http_api.Req Params, *Topic, string, error) { | |||
reqParams, err := http_api.NewReqParams(req) | reqParams, err := http_api.NewReqParams(req) | |||
if err != nil { | if err != nil { | |||
s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) | s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err ) | |||
return nil, nil, "", http_api.Err{400, "INVALID_REQUEST"} | return nil, nil, "", http_api.Err{400, "INVALID_REQUEST"} | |||
} | } | |||
topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) | topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) | |||
if err != nil { | if err != nil { | |||
return nil, nil, "", http_api.Err{400, err.Error()} | return nil, nil, "", http_api.Err{400, err.Error()} | |||
} | } | |||
topic, err := s.ctx.nsqd.GetExistingTopic(topicName) | topic, err := s.nsqd.GetExistingTopic(topicName) | |||
if err != nil { | if err != nil { | |||
return nil, nil, "", http_api.Err{404, "TOPIC_NOT_FOUND"} | return nil, nil, "", http_api.Err{404, "TOPIC_NOT_FOUND"} | |||
} | } | |||
return reqParams, topic, channelName, err | return reqParams, topic, channelName, err | |||
} | } | |||
func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, e rror) { | func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, e rror) { | |||
reqParams, err := url.ParseQuery(req.URL.RawQuery) | reqParams, err := url.ParseQuery(req.URL.RawQuery) | |||
if err != nil { | if err != nil { | |||
s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) | s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err ) | |||
return nil, nil, http_api.Err{400, "INVALID_REQUEST"} | return nil, nil, http_api.Err{400, "INVALID_REQUEST"} | |||
} | } | |||
topicNames, ok := reqParams["topic"] | topicNames, ok := reqParams["topic"] | |||
if !ok { | if !ok { | |||
return nil, nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | return nil, nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | |||
} | } | |||
topicName := topicNames[0] | topicName := topicNames[0] | |||
if !protocol.IsValidTopicName(topicName) { | if !protocol.IsValidTopicName(topicName) { | |||
return nil, nil, http_api.Err{400, "INVALID_TOPIC"} | return nil, nil, http_api.Err{400, "INVALID_TOPIC"} | |||
} | } | |||
return reqParams, s.ctx.nsqd.GetTopic(topicName), nil | return reqParams, s.nsqd.GetTopic(topicName), nil | |||
} | } | |||
func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout er.Params) (interface{}, error) { | func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout er.Params) (interface{}, error) { | |||
// TODO: one day I'd really like to just error on chunked requests | // TODO: one day I'd really like to just error on chunked requests | |||
// to be able to fail "too big" requests before we even read | // to be able to fail "too big" requests before we even read | |||
if req.ContentLength > s.ctx.nsqd.getOpts().MaxMsgSize { | if req.ContentLength > s.nsqd.getOpts().MaxMsgSize { | |||
return nil, http_api.Err{413, "MSG_TOO_BIG"} | return nil, http_api.Err{413, "MSG_TOO_BIG"} | |||
} | } | |||
// add 1 so that it's greater than our max when we test for it | // add 1 so that it's greater than our max when we test for it | |||
// (LimitReader returns a "fake" EOF) | // (LimitReader returns a "fake" EOF) | |||
readMax := s.ctx.nsqd.getOpts().MaxMsgSize + 1 | readMax := s.nsqd.getOpts().MaxMsgSize + 1 | |||
body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax)) | body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax)) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{500, "INTERNAL_ERROR"} | return nil, http_api.Err{500, "INTERNAL_ERROR"} | |||
} | } | |||
if int64(len(body)) == readMax { | if int64(len(body)) == readMax { | |||
return nil, http_api.Err{413, "MSG_TOO_BIG"} | return nil, http_api.Err{413, "MSG_TOO_BIG"} | |||
} | } | |||
if len(body) == 0 { | if len(body) == 0 { | |||
return nil, http_api.Err{400, "MSG_EMPTY"} | return nil, http_api.Err{400, "MSG_EMPTY"} | |||
} | } | |||
skipping to change at line 220 | skipping to change at line 220 | |||
} | } | |||
var deferred time.Duration | var deferred time.Duration | |||
if ds, ok := reqParams["defer"]; ok { | if ds, ok := reqParams["defer"]; ok { | |||
var di int64 | var di int64 | |||
di, err = strconv.ParseInt(ds[0], 10, 64) | di, err = strconv.ParseInt(ds[0], 10, 64) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "INVALID_DEFER"} | return nil, http_api.Err{400, "INVALID_DEFER"} | |||
} | } | |||
deferred = time.Duration(di) * time.Millisecond | deferred = time.Duration(di) * time.Millisecond | |||
if deferred < 0 || deferred > s.ctx.nsqd.getOpts().MaxReqTimeout { | if deferred < 0 || deferred > s.nsqd.getOpts().MaxReqTimeout { | |||
return nil, http_api.Err{400, "INVALID_DEFER"} | return nil, http_api.Err{400, "INVALID_DEFER"} | |||
} | } | |||
} | } | |||
msg := NewMessage(topic.GenerateID(), body) | msg := NewMessage(topic.GenerateID(), body) | |||
msg.deferred = deferred | msg.deferred = deferred | |||
err = topic.PutMessage(msg) | err = topic.PutMessage(msg) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{503, "EXITING"} | return nil, http_api.Err{503, "EXITING"} | |||
} | } | |||
skipping to change at line 242 | skipping to change at line 242 | |||
return "OK", nil | return "OK", nil | |||
} | } | |||
func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou ter.Params) (interface{}, error) { | func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou ter.Params) (interface{}, error) { | |||
var msgs []*Message | var msgs []*Message | |||
var exit bool | var exit bool | |||
// TODO: one day I'd really like to just error on chunked requests | // TODO: one day I'd really like to just error on chunked requests | |||
// to be able to fail "too big" requests before we even read | // to be able to fail "too big" requests before we even read | |||
if req.ContentLength > s.ctx.nsqd.getOpts().MaxBodySize { | if req.ContentLength > s.nsqd.getOpts().MaxBodySize { | |||
return nil, http_api.Err{413, "BODY_TOO_BIG"} | return nil, http_api.Err{413, "BODY_TOO_BIG"} | |||
} | } | |||
reqParams, topic, err := s.getTopicFromQuery(req) | reqParams, topic, err := s.getTopicFromQuery(req) | |||
if err != nil { | if err != nil { | |||
return nil, err | return nil, err | |||
} | } | |||
// text mode is default, but unrecognized binary opt considered true | // text mode is default, but unrecognized binary opt considered true | |||
binaryMode := false | binaryMode := false | |||
if vals, ok := reqParams["binary"]; ok { | if vals, ok := reqParams["binary"]; ok { | |||
if binaryMode, ok = boolParams[vals[0]]; !ok { | if binaryMode, ok = boolParams[vals[0]]; !ok { | |||
binaryMode = true | binaryMode = true | |||
s.ctx.nsqd.logf(LOG_WARN, "deprecated value '%s' used for /mpub binary param", vals[0]) | s.nsqd.logf(LOG_WARN, "deprecated value '%s' used for /mp ub binary param", vals[0]) | |||
} | } | |||
} | } | |||
if binaryMode { | if binaryMode { | |||
tmp := make([]byte, 4) | tmp := make([]byte, 4) | |||
msgs, err = readMPUB(req.Body, tmp, topic, | msgs, err = readMPUB(req.Body, tmp, topic, | |||
s.ctx.nsqd.getOpts().MaxMsgSize, s.ctx.nsqd.getOpts().Max BodySize) | s.nsqd.getOpts().MaxMsgSize, s.nsqd.getOpts().MaxBodySize ) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{413, err.(*protocol.FatalClientE rr).Code[2:]} | return nil, http_api.Err{413, err.(*protocol.FatalClientE rr).Code[2:]} | |||
} | } | |||
} else { | } else { | |||
// add 1 so that it's greater than our max when we test for it | // add 1 so that it's greater than our max when we test for it | |||
// (LimitReader returns a "fake" EOF) | // (LimitReader returns a "fake" EOF) | |||
readMax := s.ctx.nsqd.getOpts().MaxBodySize + 1 | readMax := s.nsqd.getOpts().MaxBodySize + 1 | |||
rdr := bufio.NewReader(io.LimitReader(req.Body, readMax)) | rdr := bufio.NewReader(io.LimitReader(req.Body, readMax)) | |||
total := 0 | total := 0 | |||
for !exit { | for !exit { | |||
var block []byte | var block []byte | |||
block, err = rdr.ReadBytes('\n') | block, err = rdr.ReadBytes('\n') | |||
if err != nil { | if err != nil { | |||
if err != io.EOF { | if err != io.EOF { | |||
return nil, http_api.Err{500, "INTERNAL_E RROR"} | return nil, http_api.Err{500, "INTERNAL_E RROR"} | |||
} | } | |||
exit = true | exit = true | |||
skipping to change at line 296 | skipping to change at line 296 | |||
if len(block) > 0 && block[len(block)-1] == '\n' { | if len(block) > 0 && block[len(block)-1] == '\n' { | |||
block = block[:len(block)-1] | block = block[:len(block)-1] | |||
} | } | |||
// silently discard 0 length messages | // silently discard 0 length messages | |||
// this maintains the behavior pre 0.2.22 | // this maintains the behavior pre 0.2.22 | |||
if len(block) == 0 { | if len(block) == 0 { | |||
continue | continue | |||
} | } | |||
if int64(len(block)) > s.ctx.nsqd.getOpts().MaxMsgSize { | if int64(len(block)) > s.nsqd.getOpts().MaxMsgSize { | |||
return nil, http_api.Err{413, "MSG_TOO_BIG"} | return nil, http_api.Err{413, "MSG_TOO_BIG"} | |||
} | } | |||
msg := NewMessage(topic.GenerateID(), block) | msg := NewMessage(topic.GenerateID(), block) | |||
msgs = append(msgs, msg) | msgs = append(msgs, msg) | |||
} | } | |||
} | } | |||
err = topic.PutMessages(msgs) | err = topic.PutMessages(msgs) | |||
if err != nil { | if err != nil { | |||
skipping to change at line 321 | skipping to change at line 321 | |||
} | } | |||
func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { | func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { | |||
_, _, err := s.getTopicFromQuery(req) | _, _, err := s.getTopicFromQuery(req) | |||
return nil, err | return nil, err | |||
} | } | |||
func (s *httpServer) doEmptyTopic(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) { | func (s *httpServer) doEmptyTopic(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) { | |||
reqParams, err := http_api.NewReqParams(req) | reqParams, err := http_api.NewReqParams(req) | |||
if err != nil { | if err != nil { | |||
s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) | s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err ) | |||
return nil, http_api.Err{400, "INVALID_REQUEST"} | return nil, http_api.Err{400, "INVALID_REQUEST"} | |||
} | } | |||
topicName, err := reqParams.Get("topic") | topicName, err := reqParams.Get("topic") | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | |||
} | } | |||
if !protocol.IsValidTopicName(topicName) { | if !protocol.IsValidTopicName(topicName) { | |||
return nil, http_api.Err{400, "INVALID_TOPIC"} | return nil, http_api.Err{400, "INVALID_TOPIC"} | |||
} | } | |||
topic, err := s.ctx.nsqd.GetExistingTopic(topicName) | topic, err := s.nsqd.GetExistingTopic(topicName) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} | return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} | |||
} | } | |||
err = topic.Empty() | err = topic.Empty() | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{500, "INTERNAL_ERROR"} | return nil, http_api.Err{500, "INTERNAL_ERROR"} | |||
} | } | |||
return nil, nil | return nil, nil | |||
} | } | |||
func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { | func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { | |||
reqParams, err := http_api.NewReqParams(req) | reqParams, err := http_api.NewReqParams(req) | |||
if err != nil { | if err != nil { | |||
s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) | s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err ) | |||
return nil, http_api.Err{400, "INVALID_REQUEST"} | return nil, http_api.Err{400, "INVALID_REQUEST"} | |||
} | } | |||
topicName, err := reqParams.Get("topic") | topicName, err := reqParams.Get("topic") | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | |||
} | } | |||
err = s.ctx.nsqd.DeleteExistingTopic(topicName) | err = s.nsqd.DeleteExistingTopic(topicName) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} | return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} | |||
} | } | |||
return nil, nil | return nil, nil | |||
} | } | |||
func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) { | func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) { | |||
reqParams, err := http_api.NewReqParams(req) | reqParams, err := http_api.NewReqParams(req) | |||
if err != nil { | if err != nil { | |||
s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) | s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err ) | |||
return nil, http_api.Err{400, "INVALID_REQUEST"} | return nil, http_api.Err{400, "INVALID_REQUEST"} | |||
} | } | |||
topicName, err := reqParams.Get("topic") | topicName, err := reqParams.Get("topic") | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | |||
} | } | |||
topic, err := s.ctx.nsqd.GetExistingTopic(topicName) | topic, err := s.nsqd.GetExistingTopic(topicName) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} | return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} | |||
} | } | |||
if strings.Contains(req.URL.Path, "unpause") { | if strings.Contains(req.URL.Path, "unpause") { | |||
err = topic.UnPause() | err = topic.UnPause() | |||
} else { | } else { | |||
err = topic.Pause() | err = topic.Pause() | |||
} | } | |||
if err != nil { | if err != nil { | |||
s.ctx.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, er r) | s.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err) | |||
return nil, http_api.Err{500, "INTERNAL_ERROR"} | return nil, http_api.Err{500, "INTERNAL_ERROR"} | |||
} | } | |||
// pro-actively persist metadata so in case of process failure | // pro-actively persist metadata so in case of process failure | |||
// nsqd won't suddenly (un)pause a topic | // nsqd won't suddenly (un)pause a topic | |||
s.ctx.nsqd.Lock() | s.nsqd.Lock() | |||
s.ctx.nsqd.PersistMetadata() | s.nsqd.PersistMetadata() | |||
s.ctx.nsqd.Unlock() | s.nsqd.Unlock() | |||
return nil, nil | return nil, nil | |||
} | } | |||
func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, p s httprouter.Params) (interface{}, error) { | func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, p s httprouter.Params) (interface{}, error) { | |||
_, topic, channelName, err := s.getExistingTopicFromQuery(req) | _, topic, channelName, err := s.getExistingTopicFromQuery(req) | |||
if err != nil { | if err != nil { | |||
return nil, err | return nil, err | |||
} | } | |||
topic.GetChannel(channelName) | topic.GetChannel(channelName) | |||
return nil, nil | return nil, nil | |||
skipping to change at line 461 | skipping to change at line 461 | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"} | return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"} | |||
} | } | |||
if strings.Contains(req.URL.Path, "unpause") { | if strings.Contains(req.URL.Path, "unpause") { | |||
err = channel.UnPause() | err = channel.UnPause() | |||
} else { | } else { | |||
err = channel.Pause() | err = channel.Pause() | |||
} | } | |||
if err != nil { | if err != nil { | |||
s.ctx.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, er r) | s.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err) | |||
return nil, http_api.Err{500, "INTERNAL_ERROR"} | return nil, http_api.Err{500, "INTERNAL_ERROR"} | |||
} | } | |||
// pro-actively persist metadata so in case of process failure | // pro-actively persist metadata so in case of process failure | |||
// nsqd won't suddenly (un)pause a channel | // nsqd won't suddenly (un)pause a channel | |||
s.ctx.nsqd.Lock() | s.nsqd.Lock() | |||
s.ctx.nsqd.PersistMetadata() | s.nsqd.PersistMetadata() | |||
s.ctx.nsqd.Unlock() | s.nsqd.Unlock() | |||
return nil, nil | return nil, nil | |||
} | } | |||
func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro uter.Params) (interface{}, error) { | func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro uter.Params) (interface{}, error) { | |||
var producerStats []ClientStats | ||||
reqParams, err := http_api.NewReqParams(req) | reqParams, err := http_api.NewReqParams(req) | |||
if err != nil { | if err != nil { | |||
s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) | s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err ) | |||
return nil, http_api.Err{400, "INVALID_REQUEST"} | return nil, http_api.Err{400, "INVALID_REQUEST"} | |||
} | } | |||
formatString, _ := reqParams.Get("format") | formatString, _ := reqParams.Get("format") | |||
topicName, _ := reqParams.Get("topic") | topicName, _ := reqParams.Get("topic") | |||
channelName, _ := reqParams.Get("channel") | channelName, _ := reqParams.Get("channel") | |||
includeClientsParam, _ := reqParams.Get("include_clients") | includeClientsParam, _ := reqParams.Get("include_clients") | |||
includeMemParam, _ := reqParams.Get("include_mem") | ||||
jsonFormat := formatString == "json" | jsonFormat := formatString == "json" | |||
includeClients, ok := boolParams[includeClientsParam] | includeClients, ok := boolParams[includeClientsParam] | |||
if !ok { | if !ok { | |||
includeClients = true | includeClients = true | |||
} | } | |||
if includeClients { | includeMem, ok := boolParams[includeMemParam] | |||
producerStats = s.ctx.nsqd.GetProducerStats() | if !ok { | |||
includeMem = true | ||||
} | } | |||
stats := s.ctx.nsqd.GetStats(topicName, channelName, includeClients) | stats := s.nsqd.GetStats(topicName, channelName, includeClients) | |||
health := s.ctx.nsqd.GetHealth() | health := s.nsqd.GetHealth() | |||
startTime := s.ctx.nsqd.GetStartTime() | startTime := s.nsqd.GetStartTime() | |||
uptime := time.Since(startTime) | uptime := time.Since(startTime) | |||
// filter by topic (if specified) | var ms *memStats | |||
if len(topicName) > 0 { | if includeMem { | |||
for _, topicStats := range stats { | m := getMemStats() | |||
if topicStats.TopicName == topicName { | ms = &m | |||
// filter by channel (if specified) | ||||
if len(channelName) > 0 { | ||||
for _, channelStats := range topicStats.C | ||||
hannels { | ||||
if channelStats.ChannelName == ch | ||||
annelName { | ||||
topicStats.Channels = []C | ||||
hannelStats{channelStats} | ||||
break | ||||
} | ||||
} | ||||
} | ||||
stats = []TopicStats{topicStats} | ||||
break | ||||
} | ||||
} | ||||
filteredProducerStats := make([]ClientStats, 0) | ||||
for _, clientStat := range producerStats { | ||||
var found bool | ||||
var count uint64 | ||||
for _, v := range clientStat.PubCounts { | ||||
if v.Topic == topicName { | ||||
count = v.Count | ||||
found = true | ||||
break | ||||
} | ||||
} | ||||
if !found { | ||||
continue | ||||
} | ||||
clientStat.PubCounts = []PubCount{PubCount{ | ||||
Topic: topicName, | ||||
Count: count, | ||||
}} | ||||
filteredProducerStats = append(filteredProducerStats, cli | ||||
entStat) | ||||
} | ||||
producerStats = filteredProducerStats | ||||
} | } | |||
ms := getMemStats() | ||||
if !jsonFormat { | if !jsonFormat { | |||
return s.printStats(stats, producerStats, ms, health, startTime, uptime), nil | return s.printStats(stats, ms, health, startTime, uptime), nil | |||
} | } | |||
// TODO: should producer stats be hung off topics? | ||||
return struct { | return struct { | |||
Version string `json:"version"` | Version string `json:"version"` | |||
Health string `json:"health"` | Health string `json:"health"` | |||
StartTime int64 `json:"start_time"` | StartTime int64 `json:"start_time"` | |||
Topics []TopicStats `json:"topics"` | Topics []TopicStats `json:"topics"` | |||
Memory memStats `json:"memory"` | Memory *memStats `json:"memory,omitempty"` | |||
Producers []ClientStats `json:"producers"` | Producers []ClientStats `json:"producers"` | |||
}{version.Binary, health, startTime.Unix(), stats, ms, producerStats}, ni l | }{version.Binary, health, startTime.Unix(), stats.Topics, ms, stats.Produ cers}, nil | |||
} | } | |||
func (s *httpServer) printStats(stats []TopicStats, producerStats []ClientStats, ms memStats, health string, startTime time.Time, uptime time.Duration) []byte { | func (s *httpServer) printStats(stats Stats, ms *memStats, health string, startT ime time.Time, uptime time.Duration) []byte { | |||
var buf bytes.Buffer | var buf bytes.Buffer | |||
w := &buf | w := &buf | |||
now := time.Now() | ||||
fmt.Fprintf(w, "%s\n", version.String("nsqd")) | fmt.Fprintf(w, "%s\n", version.String("nsqd")) | |||
fmt.Fprintf(w, "start_time %v\n", startTime.Format(time.RFC3339)) | fmt.Fprintf(w, "start_time %v\n", startTime.Format(time.RFC3339)) | |||
fmt.Fprintf(w, "uptime %s\n", uptime) | fmt.Fprintf(w, "uptime %s\n", uptime) | |||
fmt.Fprintf(w, "\nHealth: %s\n", health) | fmt.Fprintf(w, "\nHealth: %s\n", health) | |||
fmt.Fprintf(w, "\nMemory:\n") | if ms != nil { | |||
fmt.Fprintf(w, " %-25s\t%d\n", "heap_objects", ms.HeapObjects) | fmt.Fprintf(w, "\nMemory:\n") | |||
fmt.Fprintf(w, " %-25s\t%d\n", "heap_idle_bytes", ms.HeapIdleBytes) | fmt.Fprintf(w, " %-25s\t%d\n", "heap_objects", ms.HeapObjects) | |||
fmt.Fprintf(w, " %-25s\t%d\n", "heap_in_use_bytes", ms.HeapInUseBytes) | fmt.Fprintf(w, " %-25s\t%d\n", "heap_idle_bytes", ms.HeapIdleBy | |||
fmt.Fprintf(w, " %-25s\t%d\n", "heap_released_bytes", ms.HeapReleasedBy | tes) | |||
tes) | fmt.Fprintf(w, " %-25s\t%d\n", "heap_in_use_bytes", ms.HeapInUs | |||
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_100", ms.GCPauseUsec100) | eBytes) | |||
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_99", ms.GCPauseUsec99) | fmt.Fprintf(w, " %-25s\t%d\n", "heap_released_bytes", ms.HeapRe | |||
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_95", ms.GCPauseUsec95) | leasedBytes) | |||
fmt.Fprintf(w, " %-25s\t%d\n", "next_gc_bytes", ms.NextGCBytes) | fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_100", ms.GCPauseU | |||
fmt.Fprintf(w, " %-25s\t%d\n", "gc_total_runs", ms.GCTotalRuns) | sec100) | |||
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_99", ms.GCPauseUs | ||||
ec99) | ||||
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_95", ms.GCPauseUs | ||||
ec95) | ||||
fmt.Fprintf(w, " %-25s\t%d\n", "next_gc_bytes", ms.NextGCBytes) | ||||
fmt.Fprintf(w, " %-25s\t%d\n", "gc_total_runs", ms.GCTotalRuns) | ||||
} | ||||
if len(stats) == 0 { | if len(stats.Topics) == 0 { | |||
fmt.Fprintf(w, "\nTopics: None\n") | fmt.Fprintf(w, "\nTopics: None\n") | |||
} else { | } else { | |||
fmt.Fprintf(w, "\nTopics:") | fmt.Fprintf(w, "\nTopics:") | |||
} | } | |||
for _, t := range stats { | for _, t := range stats.Topics { | |||
var pausedPrefix string | var pausedPrefix string | |||
if t.Paused { | if t.Paused { | |||
pausedPrefix = "*P " | pausedPrefix = "*P " | |||
} else { | } else { | |||
pausedPrefix = " " | pausedPrefix = " " | |||
} | } | |||
fmt.Fprintf(w, "\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n", | fmt.Fprintf(w, "\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n", | |||
pausedPrefix, | pausedPrefix, | |||
t.TopicName, | t.TopicName, | |||
t.Depth, | t.Depth, | |||
skipping to change at line 619 | skipping to change at line 583 | |||
c.Depth, | c.Depth, | |||
c.BackendDepth, | c.BackendDepth, | |||
c.InFlightCount, | c.InFlightCount, | |||
c.DeferredCount, | c.DeferredCount, | |||
c.RequeueCount, | c.RequeueCount, | |||
c.TimeoutCount, | c.TimeoutCount, | |||
c.MessageCount, | c.MessageCount, | |||
c.E2eProcessingLatency, | c.E2eProcessingLatency, | |||
) | ) | |||
for _, client := range c.Clients { | for _, client := range c.Clients { | |||
connectTime := time.Unix(client.ConnectTime, 0) | fmt.Fprintf(w, " %s\n", client) | |||
// truncate to the second | ||||
duration := time.Duration(int64(now.Sub(connectTi | ||||
me).Seconds())) * time.Second | ||||
fmt.Fprintf(w, " [%s %-21s] state: %d infl | ||||
t: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n", | ||||
client.Version, | ||||
client.ClientID, | ||||
client.State, | ||||
client.InFlightCount, | ||||
client.ReadyCount, | ||||
client.FinishCount, | ||||
client.RequeueCount, | ||||
client.MessageCount, | ||||
duration, | ||||
) | ||||
} | } | |||
} | } | |||
} | } | |||
if len(producerStats) == 0 { | if len(stats.Producers) == 0 { | |||
fmt.Fprintf(w, "\nProducers: None\n") | fmt.Fprintf(w, "\nProducers: None\n") | |||
} else { | } else { | |||
fmt.Fprintf(w, "\nProducers:") | fmt.Fprintf(w, "\nProducers:\n") | |||
for _, client := range producerStats { | for _, client := range stats.Producers { | |||
connectTime := time.Unix(client.ConnectTime, 0) | fmt.Fprintf(w, " %s\n", client) | |||
// truncate to the second | ||||
duration := time.Duration(int64(now.Sub(connectTime).Seco | ||||
nds())) * time.Second | ||||
var totalPubCount uint64 | ||||
for _, v := range client.PubCounts { | ||||
totalPubCount += v.Count | ||||
} | ||||
fmt.Fprintf(w, "\n [%s %-21s] msgs: %-8d connected: %s\ | ||||
n", | ||||
client.Version, | ||||
client.ClientID, | ||||
totalPubCount, | ||||
duration, | ||||
) | ||||
for _, v := range client.PubCounts { | ||||
fmt.Fprintf(w, " [%-15s] msgs: %-8d\n", | ||||
v.Topic, | ||||
v.Count, | ||||
) | ||||
} | ||||
} | } | |||
} | } | |||
return buf.Bytes() | return buf.Bytes() | |||
} | } | |||
func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) { | func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) { | |||
opt := ps.ByName("opt") | opt := ps.ByName("opt") | |||
if req.Method == "PUT" { | if req.Method == "PUT" { | |||
// add 1 so that it's greater than our max when we test for it | // add 1 so that it's greater than our max when we test for it | |||
// (LimitReader returns a "fake" EOF) | // (LimitReader returns a "fake" EOF) | |||
readMax := s.ctx.nsqd.getOpts().MaxMsgSize + 1 | readMax := s.nsqd.getOpts().MaxMsgSize + 1 | |||
body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax)) | body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax)) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{500, "INTERNAL_ERROR"} | return nil, http_api.Err{500, "INTERNAL_ERROR"} | |||
} | } | |||
if int64(len(body)) == readMax || len(body) == 0 { | if int64(len(body)) == readMax || len(body) == 0 { | |||
return nil, http_api.Err{413, "INVALID_VALUE"} | return nil, http_api.Err{413, "INVALID_VALUE"} | |||
} | } | |||
opts := *s.ctx.nsqd.getOpts() | opts := *s.nsqd.getOpts() | |||
switch opt { | switch opt { | |||
case "nsqlookupd_tcp_addresses": | case "nsqlookupd_tcp_addresses": | |||
err := json.Unmarshal(body, &opts.NSQLookupdTCPAddresses) | err := json.Unmarshal(body, &opts.NSQLookupdTCPAddresses) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "INVALID_VALUE"} | return nil, http_api.Err{400, "INVALID_VALUE"} | |||
} | } | |||
case "log_level": | case "log_level": | |||
logLevelStr := string(body) | logLevelStr := string(body) | |||
logLevel, err := lg.ParseLogLevel(logLevelStr) | logLevel, err := lg.ParseLogLevel(logLevelStr) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "INVALID_VALUE"} | return nil, http_api.Err{400, "INVALID_VALUE"} | |||
} | } | |||
opts.LogLevel = logLevel | opts.LogLevel = logLevel | |||
default: | default: | |||
return nil, http_api.Err{400, "INVALID_OPTION"} | return nil, http_api.Err{400, "INVALID_OPTION"} | |||
} | } | |||
s.ctx.nsqd.swapOpts(&opts) | s.nsqd.swapOpts(&opts) | |||
s.ctx.nsqd.triggerOptsNotification() | s.nsqd.triggerOptsNotification() | |||
} | } | |||
v, ok := getOptByCfgName(s.ctx.nsqd.getOpts(), opt) | v, ok := getOptByCfgName(s.nsqd.getOpts(), opt) | |||
if !ok { | if !ok { | |||
return nil, http_api.Err{400, "INVALID_OPTION"} | return nil, http_api.Err{400, "INVALID_OPTION"} | |||
} | } | |||
return v, nil | return v, nil | |||
} | } | |||
func getOptByCfgName(opts interface{}, name string) (interface{}, bool) { | func getOptByCfgName(opts interface{}, name string) (interface{}, bool) { | |||
val := reflect.ValueOf(opts).Elem() | val := reflect.ValueOf(opts).Elem() | |||
typ := val.Type() | typ := val.Type() | |||
End of changes. 53 change blocks. | ||||
159 lines changed or deleted | 87 lines changed or added |