"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "nsqd/http.go" between
nsq-1.2.0.tar.gz and nsq-1.2.1.tar.gz

About: nsq is a realtime distributed and and decentralized messaging platform.

http.go  (nsq-1.2.0):http.go  (nsq-1.2.1)
skipping to change at line 35 skipping to change at line 35
) )
var boolParams = map[string]bool{ var boolParams = map[string]bool{
"true": true, "true": true,
"1": true, "1": true,
"false": false, "false": false,
"0": false, "0": false,
} }
type httpServer struct { type httpServer struct {
ctx *context nsqd *NSQD
tlsEnabled bool tlsEnabled bool
tlsRequired bool tlsRequired bool
router http.Handler router http.Handler
} }
func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer func newHTTPServer(nsqd *NSQD, tlsEnabled bool, tlsRequired bool) *httpServer {
{ log := http_api.Log(nsqd.logf)
log := http_api.Log(ctx.nsqd.logf)
router := httprouter.New() router := httprouter.New()
router.HandleMethodNotAllowed = true router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf) router.PanicHandler = http_api.LogPanicHandler(nsqd.logf)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf) router.NotFound = http_api.LogNotFoundHandler(nsqd.logf)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.lo router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(nsqd.logf)
gf)
s := &httpServer{ s := &httpServer{
ctx: ctx, nsqd: nsqd,
tlsEnabled: tlsEnabled, tlsEnabled: tlsEnabled,
tlsRequired: tlsRequired, tlsRequired: tlsRequired,
router: router, router: router,
} }
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_ api.PlainText)) router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_ api.PlainText))
router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V 1)) router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V 1))
// v1 negotiate // v1 negotiate
router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1)) router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
skipping to change at line 105 skipping to change at line 105
if err != nil { if err != nil {
return nil, http_api.Err{http.StatusBadRequest, fmt.Sprintf("inva lid block rate : %s", err.Error())} return nil, http_api.Err{http.StatusBadRequest, fmt.Sprintf("inva lid block rate : %s", err.Error())}
} }
runtime.SetBlockProfileRate(rate) runtime.SetBlockProfileRate(rate)
return nil, nil return nil, nil
} }
func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if !s.tlsEnabled && s.tlsRequired { if !s.tlsEnabled && s.tlsRequired {
resp := fmt.Sprintf(`{"message": "TLS_REQUIRED", "https_port": %d }`, resp := fmt.Sprintf(`{"message": "TLS_REQUIRED", "https_port": %d }`,
s.ctx.nsqd.RealHTTPSAddr().Port) s.nsqd.RealHTTPSAddr().Port)
w.Header().Set("X-NSQ-Content-Type", "nsq; version=1.0") w.Header().Set("X-NSQ-Content-Type", "nsq; version=1.0")
w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(403) w.WriteHeader(403)
io.WriteString(w, resp) io.WriteString(w, resp)
return return
} }
s.router.ServeHTTP(w, req) s.router.ServeHTTP(w, req)
} }
func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps ht tprouter.Params) (interface{}, error) { func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps ht tprouter.Params) (interface{}, error) {
health := s.ctx.nsqd.GetHealth() health := s.nsqd.GetHealth()
if !s.ctx.nsqd.IsHealthy() { if !s.nsqd.IsHealthy() {
return nil, http_api.Err{500, health} return nil, http_api.Err{500, health}
} }
return health, nil return health, nil
} }
func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou ter.Params) (interface{}, error) { func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou ter.Params) (interface{}, error) {
hostname, err := os.Hostname() hostname, err := os.Hostname()
if err != nil { if err != nil {
return nil, http_api.Err{500, err.Error()} return nil, http_api.Err{500, err.Error()}
} }
return struct { return struct {
Version string `json:"version"` Version string `json:"version"`
BroadcastAddress string `json:"broadcast_address"` BroadcastAddress string `json:"broadcast_address"`
Hostname string `json:"hostname"` Hostname string `json:"hostname"`
HTTPPort int `json:"http_port"` HTTPPort int `json:"http_port"`
TCPPort int `json:"tcp_port"` TCPPort int `json:"tcp_port"`
StartTime int64 `json:"start_time"` StartTime int64 `json:"start_time"`
}{ }{
Version: version.Binary, Version: version.Binary,
BroadcastAddress: s.ctx.nsqd.getOpts().BroadcastAddress, BroadcastAddress: s.nsqd.getOpts().BroadcastAddress,
Hostname: hostname, Hostname: hostname,
TCPPort: s.ctx.nsqd.RealTCPAddr().Port, TCPPort: s.nsqd.RealTCPAddr().Port,
HTTPPort: s.ctx.nsqd.RealHTTPAddr().Port, HTTPPort: s.nsqd.RealHTTPAddr().Port,
StartTime: s.ctx.nsqd.GetStartTime().Unix(), StartTime: s.nsqd.GetStartTime().Unix(),
}, nil }, nil
} }
func (s *httpServer) getExistingTopicFromQuery(req *http.Request) (*http_api.Req Params, *Topic, string, error) { func (s *httpServer) getExistingTopicFromQuery(req *http.Request) (*http_api.Req Params, *Topic, string, error) {
reqParams, err := http_api.NewReqParams(req) reqParams, err := http_api.NewReqParams(req)
if err != nil { if err != nil {
s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err )
return nil, nil, "", http_api.Err{400, "INVALID_REQUEST"} return nil, nil, "", http_api.Err{400, "INVALID_REQUEST"}
} }
topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
if err != nil { if err != nil {
return nil, nil, "", http_api.Err{400, err.Error()} return nil, nil, "", http_api.Err{400, err.Error()}
} }
topic, err := s.ctx.nsqd.GetExistingTopic(topicName) topic, err := s.nsqd.GetExistingTopic(topicName)
if err != nil { if err != nil {
return nil, nil, "", http_api.Err{404, "TOPIC_NOT_FOUND"} return nil, nil, "", http_api.Err{404, "TOPIC_NOT_FOUND"}
} }
return reqParams, topic, channelName, err return reqParams, topic, channelName, err
} }
func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, e rror) { func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, e rror) {
reqParams, err := url.ParseQuery(req.URL.RawQuery) reqParams, err := url.ParseQuery(req.URL.RawQuery)
if err != nil { if err != nil {
s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err )
return nil, nil, http_api.Err{400, "INVALID_REQUEST"} return nil, nil, http_api.Err{400, "INVALID_REQUEST"}
} }
topicNames, ok := reqParams["topic"] topicNames, ok := reqParams["topic"]
if !ok { if !ok {
return nil, nil, http_api.Err{400, "MISSING_ARG_TOPIC"} return nil, nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
} }
topicName := topicNames[0] topicName := topicNames[0]
if !protocol.IsValidTopicName(topicName) { if !protocol.IsValidTopicName(topicName) {
return nil, nil, http_api.Err{400, "INVALID_TOPIC"} return nil, nil, http_api.Err{400, "INVALID_TOPIC"}
} }
return reqParams, s.ctx.nsqd.GetTopic(topicName), nil return reqParams, s.nsqd.GetTopic(topicName), nil
} }
func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout er.Params) (interface{}, error) { func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout er.Params) (interface{}, error) {
// TODO: one day I'd really like to just error on chunked requests // TODO: one day I'd really like to just error on chunked requests
// to be able to fail "too big" requests before we even read // to be able to fail "too big" requests before we even read
if req.ContentLength > s.ctx.nsqd.getOpts().MaxMsgSize { if req.ContentLength > s.nsqd.getOpts().MaxMsgSize {
return nil, http_api.Err{413, "MSG_TOO_BIG"} return nil, http_api.Err{413, "MSG_TOO_BIG"}
} }
// add 1 so that it's greater than our max when we test for it // add 1 so that it's greater than our max when we test for it
// (LimitReader returns a "fake" EOF) // (LimitReader returns a "fake" EOF)
readMax := s.ctx.nsqd.getOpts().MaxMsgSize + 1 readMax := s.nsqd.getOpts().MaxMsgSize + 1
body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax)) body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
if err != nil { if err != nil {
return nil, http_api.Err{500, "INTERNAL_ERROR"} return nil, http_api.Err{500, "INTERNAL_ERROR"}
} }
if int64(len(body)) == readMax { if int64(len(body)) == readMax {
return nil, http_api.Err{413, "MSG_TOO_BIG"} return nil, http_api.Err{413, "MSG_TOO_BIG"}
} }
if len(body) == 0 { if len(body) == 0 {
return nil, http_api.Err{400, "MSG_EMPTY"} return nil, http_api.Err{400, "MSG_EMPTY"}
} }
skipping to change at line 220 skipping to change at line 220
} }
var deferred time.Duration var deferred time.Duration
if ds, ok := reqParams["defer"]; ok { if ds, ok := reqParams["defer"]; ok {
var di int64 var di int64
di, err = strconv.ParseInt(ds[0], 10, 64) di, err = strconv.ParseInt(ds[0], 10, 64)
if err != nil { if err != nil {
return nil, http_api.Err{400, "INVALID_DEFER"} return nil, http_api.Err{400, "INVALID_DEFER"}
} }
deferred = time.Duration(di) * time.Millisecond deferred = time.Duration(di) * time.Millisecond
if deferred < 0 || deferred > s.ctx.nsqd.getOpts().MaxReqTimeout { if deferred < 0 || deferred > s.nsqd.getOpts().MaxReqTimeout {
return nil, http_api.Err{400, "INVALID_DEFER"} return nil, http_api.Err{400, "INVALID_DEFER"}
} }
} }
msg := NewMessage(topic.GenerateID(), body) msg := NewMessage(topic.GenerateID(), body)
msg.deferred = deferred msg.deferred = deferred
err = topic.PutMessage(msg) err = topic.PutMessage(msg)
if err != nil { if err != nil {
return nil, http_api.Err{503, "EXITING"} return nil, http_api.Err{503, "EXITING"}
} }
skipping to change at line 242 skipping to change at line 242
return "OK", nil return "OK", nil
} }
func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou ter.Params) (interface{}, error) { func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou ter.Params) (interface{}, error) {
var msgs []*Message var msgs []*Message
var exit bool var exit bool
// TODO: one day I'd really like to just error on chunked requests // TODO: one day I'd really like to just error on chunked requests
// to be able to fail "too big" requests before we even read // to be able to fail "too big" requests before we even read
if req.ContentLength > s.ctx.nsqd.getOpts().MaxBodySize { if req.ContentLength > s.nsqd.getOpts().MaxBodySize {
return nil, http_api.Err{413, "BODY_TOO_BIG"} return nil, http_api.Err{413, "BODY_TOO_BIG"}
} }
reqParams, topic, err := s.getTopicFromQuery(req) reqParams, topic, err := s.getTopicFromQuery(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// text mode is default, but unrecognized binary opt considered true // text mode is default, but unrecognized binary opt considered true
binaryMode := false binaryMode := false
if vals, ok := reqParams["binary"]; ok { if vals, ok := reqParams["binary"]; ok {
if binaryMode, ok = boolParams[vals[0]]; !ok { if binaryMode, ok = boolParams[vals[0]]; !ok {
binaryMode = true binaryMode = true
s.ctx.nsqd.logf(LOG_WARN, "deprecated value '%s' used for /mpub binary param", vals[0]) s.nsqd.logf(LOG_WARN, "deprecated value '%s' used for /mp ub binary param", vals[0])
} }
} }
if binaryMode { if binaryMode {
tmp := make([]byte, 4) tmp := make([]byte, 4)
msgs, err = readMPUB(req.Body, tmp, topic, msgs, err = readMPUB(req.Body, tmp, topic,
s.ctx.nsqd.getOpts().MaxMsgSize, s.ctx.nsqd.getOpts().Max BodySize) s.nsqd.getOpts().MaxMsgSize, s.nsqd.getOpts().MaxBodySize )
if err != nil { if err != nil {
return nil, http_api.Err{413, err.(*protocol.FatalClientE rr).Code[2:]} return nil, http_api.Err{413, err.(*protocol.FatalClientE rr).Code[2:]}
} }
} else { } else {
// add 1 so that it's greater than our max when we test for it // add 1 so that it's greater than our max when we test for it
// (LimitReader returns a "fake" EOF) // (LimitReader returns a "fake" EOF)
readMax := s.ctx.nsqd.getOpts().MaxBodySize + 1 readMax := s.nsqd.getOpts().MaxBodySize + 1
rdr := bufio.NewReader(io.LimitReader(req.Body, readMax)) rdr := bufio.NewReader(io.LimitReader(req.Body, readMax))
total := 0 total := 0
for !exit { for !exit {
var block []byte var block []byte
block, err = rdr.ReadBytes('\n') block, err = rdr.ReadBytes('\n')
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
return nil, http_api.Err{500, "INTERNAL_E RROR"} return nil, http_api.Err{500, "INTERNAL_E RROR"}
} }
exit = true exit = true
skipping to change at line 296 skipping to change at line 296
if len(block) > 0 && block[len(block)-1] == '\n' { if len(block) > 0 && block[len(block)-1] == '\n' {
block = block[:len(block)-1] block = block[:len(block)-1]
} }
// silently discard 0 length messages // silently discard 0 length messages
// this maintains the behavior pre 0.2.22 // this maintains the behavior pre 0.2.22
if len(block) == 0 { if len(block) == 0 {
continue continue
} }
if int64(len(block)) > s.ctx.nsqd.getOpts().MaxMsgSize { if int64(len(block)) > s.nsqd.getOpts().MaxMsgSize {
return nil, http_api.Err{413, "MSG_TOO_BIG"} return nil, http_api.Err{413, "MSG_TOO_BIG"}
} }
msg := NewMessage(topic.GenerateID(), block) msg := NewMessage(topic.GenerateID(), block)
msgs = append(msgs, msg) msgs = append(msgs, msg)
} }
} }
err = topic.PutMessages(msgs) err = topic.PutMessages(msgs)
if err != nil { if err != nil {
skipping to change at line 321 skipping to change at line 321
} }
func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
_, _, err := s.getTopicFromQuery(req) _, _, err := s.getTopicFromQuery(req)
return nil, err return nil, err
} }
func (s *httpServer) doEmptyTopic(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) { func (s *httpServer) doEmptyTopic(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req) reqParams, err := http_api.NewReqParams(req)
if err != nil { if err != nil {
s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err )
return nil, http_api.Err{400, "INVALID_REQUEST"} return nil, http_api.Err{400, "INVALID_REQUEST"}
} }
topicName, err := reqParams.Get("topic") topicName, err := reqParams.Get("topic")
if err != nil { if err != nil {
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
} }
if !protocol.IsValidTopicName(topicName) { if !protocol.IsValidTopicName(topicName) {
return nil, http_api.Err{400, "INVALID_TOPIC"} return nil, http_api.Err{400, "INVALID_TOPIC"}
} }
topic, err := s.ctx.nsqd.GetExistingTopic(topicName) topic, err := s.nsqd.GetExistingTopic(topicName)
if err != nil { if err != nil {
return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
} }
err = topic.Empty() err = topic.Empty()
if err != nil { if err != nil {
return nil, http_api.Err{500, "INTERNAL_ERROR"} return nil, http_api.Err{500, "INTERNAL_ERROR"}
} }
return nil, nil return nil, nil
} }
func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req) reqParams, err := http_api.NewReqParams(req)
if err != nil { if err != nil {
s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err )
return nil, http_api.Err{400, "INVALID_REQUEST"} return nil, http_api.Err{400, "INVALID_REQUEST"}
} }
topicName, err := reqParams.Get("topic") topicName, err := reqParams.Get("topic")
if err != nil { if err != nil {
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
} }
err = s.ctx.nsqd.DeleteExistingTopic(topicName) err = s.nsqd.DeleteExistingTopic(topicName)
if err != nil { if err != nil {
return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
} }
return nil, nil return nil, nil
} }
func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) { func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req) reqParams, err := http_api.NewReqParams(req)
if err != nil { if err != nil {
s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err )
return nil, http_api.Err{400, "INVALID_REQUEST"} return nil, http_api.Err{400, "INVALID_REQUEST"}
} }
topicName, err := reqParams.Get("topic") topicName, err := reqParams.Get("topic")
if err != nil { if err != nil {
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
} }
topic, err := s.ctx.nsqd.GetExistingTopic(topicName) topic, err := s.nsqd.GetExistingTopic(topicName)
if err != nil { if err != nil {
return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
} }
if strings.Contains(req.URL.Path, "unpause") { if strings.Contains(req.URL.Path, "unpause") {
err = topic.UnPause() err = topic.UnPause()
} else { } else {
err = topic.Pause() err = topic.Pause()
} }
if err != nil { if err != nil {
s.ctx.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, er r) s.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err)
return nil, http_api.Err{500, "INTERNAL_ERROR"} return nil, http_api.Err{500, "INTERNAL_ERROR"}
} }
// pro-actively persist metadata so in case of process failure // pro-actively persist metadata so in case of process failure
// nsqd won't suddenly (un)pause a topic // nsqd won't suddenly (un)pause a topic
s.ctx.nsqd.Lock() s.nsqd.Lock()
s.ctx.nsqd.PersistMetadata() s.nsqd.PersistMetadata()
s.ctx.nsqd.Unlock() s.nsqd.Unlock()
return nil, nil return nil, nil
} }
func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, p s httprouter.Params) (interface{}, error) { func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, p s httprouter.Params) (interface{}, error) {
_, topic, channelName, err := s.getExistingTopicFromQuery(req) _, topic, channelName, err := s.getExistingTopicFromQuery(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
topic.GetChannel(channelName) topic.GetChannel(channelName)
return nil, nil return nil, nil
skipping to change at line 461 skipping to change at line 461
if err != nil { if err != nil {
return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"} return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
} }
if strings.Contains(req.URL.Path, "unpause") { if strings.Contains(req.URL.Path, "unpause") {
err = channel.UnPause() err = channel.UnPause()
} else { } else {
err = channel.Pause() err = channel.Pause()
} }
if err != nil { if err != nil {
s.ctx.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, er r) s.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err)
return nil, http_api.Err{500, "INTERNAL_ERROR"} return nil, http_api.Err{500, "INTERNAL_ERROR"}
} }
// pro-actively persist metadata so in case of process failure // pro-actively persist metadata so in case of process failure
// nsqd won't suddenly (un)pause a channel // nsqd won't suddenly (un)pause a channel
s.ctx.nsqd.Lock() s.nsqd.Lock()
s.ctx.nsqd.PersistMetadata() s.nsqd.PersistMetadata()
s.ctx.nsqd.Unlock() s.nsqd.Unlock()
return nil, nil return nil, nil
} }
func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro uter.Params) (interface{}, error) { func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro uter.Params) (interface{}, error) {
var producerStats []ClientStats
reqParams, err := http_api.NewReqParams(req) reqParams, err := http_api.NewReqParams(req)
if err != nil { if err != nil {
s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err )
return nil, http_api.Err{400, "INVALID_REQUEST"} return nil, http_api.Err{400, "INVALID_REQUEST"}
} }
formatString, _ := reqParams.Get("format") formatString, _ := reqParams.Get("format")
topicName, _ := reqParams.Get("topic") topicName, _ := reqParams.Get("topic")
channelName, _ := reqParams.Get("channel") channelName, _ := reqParams.Get("channel")
includeClientsParam, _ := reqParams.Get("include_clients") includeClientsParam, _ := reqParams.Get("include_clients")
includeMemParam, _ := reqParams.Get("include_mem")
jsonFormat := formatString == "json" jsonFormat := formatString == "json"
includeClients, ok := boolParams[includeClientsParam] includeClients, ok := boolParams[includeClientsParam]
if !ok { if !ok {
includeClients = true includeClients = true
} }
if includeClients { includeMem, ok := boolParams[includeMemParam]
producerStats = s.ctx.nsqd.GetProducerStats() if !ok {
includeMem = true
} }
stats := s.ctx.nsqd.GetStats(topicName, channelName, includeClients) stats := s.nsqd.GetStats(topicName, channelName, includeClients)
health := s.ctx.nsqd.GetHealth() health := s.nsqd.GetHealth()
startTime := s.ctx.nsqd.GetStartTime() startTime := s.nsqd.GetStartTime()
uptime := time.Since(startTime) uptime := time.Since(startTime)
// filter by topic (if specified) var ms *memStats
if len(topicName) > 0 { if includeMem {
for _, topicStats := range stats { m := getMemStats()
if topicStats.TopicName == topicName { ms = &m
// filter by channel (if specified)
if len(channelName) > 0 {
for _, channelStats := range topicStats.C
hannels {
if channelStats.ChannelName == ch
annelName {
topicStats.Channels = []C
hannelStats{channelStats}
break
}
}
}
stats = []TopicStats{topicStats}
break
}
}
filteredProducerStats := make([]ClientStats, 0)
for _, clientStat := range producerStats {
var found bool
var count uint64
for _, v := range clientStat.PubCounts {
if v.Topic == topicName {
count = v.Count
found = true
break
}
}
if !found {
continue
}
clientStat.PubCounts = []PubCount{PubCount{
Topic: topicName,
Count: count,
}}
filteredProducerStats = append(filteredProducerStats, cli
entStat)
}
producerStats = filteredProducerStats
} }
ms := getMemStats()
if !jsonFormat { if !jsonFormat {
return s.printStats(stats, producerStats, ms, health, startTime, uptime), nil return s.printStats(stats, ms, health, startTime, uptime), nil
} }
// TODO: should producer stats be hung off topics?
return struct { return struct {
Version string `json:"version"` Version string `json:"version"`
Health string `json:"health"` Health string `json:"health"`
StartTime int64 `json:"start_time"` StartTime int64 `json:"start_time"`
Topics []TopicStats `json:"topics"` Topics []TopicStats `json:"topics"`
Memory memStats `json:"memory"` Memory *memStats `json:"memory,omitempty"`
Producers []ClientStats `json:"producers"` Producers []ClientStats `json:"producers"`
}{version.Binary, health, startTime.Unix(), stats, ms, producerStats}, ni l }{version.Binary, health, startTime.Unix(), stats.Topics, ms, stats.Produ cers}, nil
} }
func (s *httpServer) printStats(stats []TopicStats, producerStats []ClientStats, ms memStats, health string, startTime time.Time, uptime time.Duration) []byte { func (s *httpServer) printStats(stats Stats, ms *memStats, health string, startT ime time.Time, uptime time.Duration) []byte {
var buf bytes.Buffer var buf bytes.Buffer
w := &buf w := &buf
now := time.Now()
fmt.Fprintf(w, "%s\n", version.String("nsqd")) fmt.Fprintf(w, "%s\n", version.String("nsqd"))
fmt.Fprintf(w, "start_time %v\n", startTime.Format(time.RFC3339)) fmt.Fprintf(w, "start_time %v\n", startTime.Format(time.RFC3339))
fmt.Fprintf(w, "uptime %s\n", uptime) fmt.Fprintf(w, "uptime %s\n", uptime)
fmt.Fprintf(w, "\nHealth: %s\n", health) fmt.Fprintf(w, "\nHealth: %s\n", health)
fmt.Fprintf(w, "\nMemory:\n") if ms != nil {
fmt.Fprintf(w, " %-25s\t%d\n", "heap_objects", ms.HeapObjects) fmt.Fprintf(w, "\nMemory:\n")
fmt.Fprintf(w, " %-25s\t%d\n", "heap_idle_bytes", ms.HeapIdleBytes) fmt.Fprintf(w, " %-25s\t%d\n", "heap_objects", ms.HeapObjects)
fmt.Fprintf(w, " %-25s\t%d\n", "heap_in_use_bytes", ms.HeapInUseBytes) fmt.Fprintf(w, " %-25s\t%d\n", "heap_idle_bytes", ms.HeapIdleBy
fmt.Fprintf(w, " %-25s\t%d\n", "heap_released_bytes", ms.HeapReleasedBy tes)
tes) fmt.Fprintf(w, " %-25s\t%d\n", "heap_in_use_bytes", ms.HeapInUs
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_100", ms.GCPauseUsec100) eBytes)
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_99", ms.GCPauseUsec99) fmt.Fprintf(w, " %-25s\t%d\n", "heap_released_bytes", ms.HeapRe
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_95", ms.GCPauseUsec95) leasedBytes)
fmt.Fprintf(w, " %-25s\t%d\n", "next_gc_bytes", ms.NextGCBytes) fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_100", ms.GCPauseU
fmt.Fprintf(w, " %-25s\t%d\n", "gc_total_runs", ms.GCTotalRuns) sec100)
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_99", ms.GCPauseUs
ec99)
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_95", ms.GCPauseUs
ec95)
fmt.Fprintf(w, " %-25s\t%d\n", "next_gc_bytes", ms.NextGCBytes)
fmt.Fprintf(w, " %-25s\t%d\n", "gc_total_runs", ms.GCTotalRuns)
}
if len(stats) == 0 { if len(stats.Topics) == 0 {
fmt.Fprintf(w, "\nTopics: None\n") fmt.Fprintf(w, "\nTopics: None\n")
} else { } else {
fmt.Fprintf(w, "\nTopics:") fmt.Fprintf(w, "\nTopics:")
} }
for _, t := range stats { for _, t := range stats.Topics {
var pausedPrefix string var pausedPrefix string
if t.Paused { if t.Paused {
pausedPrefix = "*P " pausedPrefix = "*P "
} else { } else {
pausedPrefix = " " pausedPrefix = " "
} }
fmt.Fprintf(w, "\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n", fmt.Fprintf(w, "\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n",
pausedPrefix, pausedPrefix,
t.TopicName, t.TopicName,
t.Depth, t.Depth,
skipping to change at line 619 skipping to change at line 583
c.Depth, c.Depth,
c.BackendDepth, c.BackendDepth,
c.InFlightCount, c.InFlightCount,
c.DeferredCount, c.DeferredCount,
c.RequeueCount, c.RequeueCount,
c.TimeoutCount, c.TimeoutCount,
c.MessageCount, c.MessageCount,
c.E2eProcessingLatency, c.E2eProcessingLatency,
) )
for _, client := range c.Clients { for _, client := range c.Clients {
connectTime := time.Unix(client.ConnectTime, 0) fmt.Fprintf(w, " %s\n", client)
// truncate to the second
duration := time.Duration(int64(now.Sub(connectTi
me).Seconds())) * time.Second
fmt.Fprintf(w, " [%s %-21s] state: %d infl
t: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n",
client.Version,
client.ClientID,
client.State,
client.InFlightCount,
client.ReadyCount,
client.FinishCount,
client.RequeueCount,
client.MessageCount,
duration,
)
} }
} }
} }
if len(producerStats) == 0 { if len(stats.Producers) == 0 {
fmt.Fprintf(w, "\nProducers: None\n") fmt.Fprintf(w, "\nProducers: None\n")
} else { } else {
fmt.Fprintf(w, "\nProducers:") fmt.Fprintf(w, "\nProducers:\n")
for _, client := range producerStats { for _, client := range stats.Producers {
connectTime := time.Unix(client.ConnectTime, 0) fmt.Fprintf(w, " %s\n", client)
// truncate to the second
duration := time.Duration(int64(now.Sub(connectTime).Seco
nds())) * time.Second
var totalPubCount uint64
for _, v := range client.PubCounts {
totalPubCount += v.Count
}
fmt.Fprintf(w, "\n [%s %-21s] msgs: %-8d connected: %s\
n",
client.Version,
client.ClientID,
totalPubCount,
duration,
)
for _, v := range client.PubCounts {
fmt.Fprintf(w, " [%-15s] msgs: %-8d\n",
v.Topic,
v.Count,
)
}
} }
} }
return buf.Bytes() return buf.Bytes()
} }
func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) { func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) {
opt := ps.ByName("opt") opt := ps.ByName("opt")
if req.Method == "PUT" { if req.Method == "PUT" {
// add 1 so that it's greater than our max when we test for it // add 1 so that it's greater than our max when we test for it
// (LimitReader returns a "fake" EOF) // (LimitReader returns a "fake" EOF)
readMax := s.ctx.nsqd.getOpts().MaxMsgSize + 1 readMax := s.nsqd.getOpts().MaxMsgSize + 1
body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax)) body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
if err != nil { if err != nil {
return nil, http_api.Err{500, "INTERNAL_ERROR"} return nil, http_api.Err{500, "INTERNAL_ERROR"}
} }
if int64(len(body)) == readMax || len(body) == 0 { if int64(len(body)) == readMax || len(body) == 0 {
return nil, http_api.Err{413, "INVALID_VALUE"} return nil, http_api.Err{413, "INVALID_VALUE"}
} }
opts := *s.ctx.nsqd.getOpts() opts := *s.nsqd.getOpts()
switch opt { switch opt {
case "nsqlookupd_tcp_addresses": case "nsqlookupd_tcp_addresses":
err := json.Unmarshal(body, &opts.NSQLookupdTCPAddresses) err := json.Unmarshal(body, &opts.NSQLookupdTCPAddresses)
if err != nil { if err != nil {
return nil, http_api.Err{400, "INVALID_VALUE"} return nil, http_api.Err{400, "INVALID_VALUE"}
} }
case "log_level": case "log_level":
logLevelStr := string(body) logLevelStr := string(body)
logLevel, err := lg.ParseLogLevel(logLevelStr) logLevel, err := lg.ParseLogLevel(logLevelStr)
if err != nil { if err != nil {
return nil, http_api.Err{400, "INVALID_VALUE"} return nil, http_api.Err{400, "INVALID_VALUE"}
} }
opts.LogLevel = logLevel opts.LogLevel = logLevel
default: default:
return nil, http_api.Err{400, "INVALID_OPTION"} return nil, http_api.Err{400, "INVALID_OPTION"}
} }
s.ctx.nsqd.swapOpts(&opts) s.nsqd.swapOpts(&opts)
s.ctx.nsqd.triggerOptsNotification() s.nsqd.triggerOptsNotification()
} }
v, ok := getOptByCfgName(s.ctx.nsqd.getOpts(), opt) v, ok := getOptByCfgName(s.nsqd.getOpts(), opt)
if !ok { if !ok {
return nil, http_api.Err{400, "INVALID_OPTION"} return nil, http_api.Err{400, "INVALID_OPTION"}
} }
return v, nil return v, nil
} }
func getOptByCfgName(opts interface{}, name string) (interface{}, bool) { func getOptByCfgName(opts interface{}, name string) (interface{}, bool) {
val := reflect.ValueOf(opts).Elem() val := reflect.ValueOf(opts).Elem()
typ := val.Type() typ := val.Type()
 End of changes. 53 change blocks. 
159 lines changed or deleted 87 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)