http.go (nsq-1.2.0) | : | http.go (nsq-1.2.1) | ||
---|---|---|---|---|
skipping to change at line 51 | skipping to change at line 51 | |||
req.SetBasicAuth(target.User.Username(), passwd) | req.SetBasicAuth(target.User.Username(), passwd) | |||
} | } | |||
} | } | |||
return &httputil.ReverseProxy{ | return &httputil.ReverseProxy{ | |||
Director: director, | Director: director, | |||
Transport: http_api.NewDeadlineTransport(connectTimeout, requestT imeout), | Transport: http_api.NewDeadlineTransport(connectTimeout, requestT imeout), | |||
} | } | |||
} | } | |||
type httpServer struct { | type httpServer struct { | |||
ctx *Context | nsqadmin *NSQAdmin | |||
router http.Handler | router http.Handler | |||
client *http_api.Client | client *http_api.Client | |||
ci *clusterinfo.ClusterInfo | ci *clusterinfo.ClusterInfo | |||
basePath string | basePath string | |||
} | } | |||
func NewHTTPServer(ctx *Context) *httpServer { | func NewHTTPServer(nsqadmin *NSQAdmin) *httpServer { | |||
log := http_api.Log(ctx.nsqadmin.logf) | log := http_api.Log(nsqadmin.logf) | |||
client := http_api.NewClient(ctx.nsqadmin.httpClientTLSConfig, ctx.nsqadm | client := http_api.NewClient(nsqadmin.httpClientTLSConfig, nsqadmin.getOp | |||
in.getOpts().HTTPClientConnectTimeout, | ts().HTTPClientConnectTimeout, | |||
ctx.nsqadmin.getOpts().HTTPClientRequestTimeout) | nsqadmin.getOpts().HTTPClientRequestTimeout) | |||
router := httprouter.New() | router := httprouter.New() | |||
router.HandleMethodNotAllowed = true | router.HandleMethodNotAllowed = true | |||
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqadmin.logf) | router.PanicHandler = http_api.LogPanicHandler(nsqadmin.logf) | |||
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqadmin.logf) | router.NotFound = http_api.LogNotFoundHandler(nsqadmin.logf) | |||
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqadmi | router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(nsqadmin.lo | |||
n.logf) | gf) | |||
s := &httpServer{ | s := &httpServer{ | |||
ctx: ctx, | nsqadmin: nsqadmin, | |||
router: router, | router: router, | |||
client: client, | client: client, | |||
ci: clusterinfo.New(ctx.nsqadmin.logf, client), | ci: clusterinfo.New(nsqadmin.logf, client), | |||
basePath: ctx.nsqadmin.getOpts().BasePath, | basePath: nsqadmin.getOpts().BasePath, | |||
} | } | |||
bp := func(p string) string { | bp := func(p string) string { | |||
return path.Join(s.basePath, p) | return path.Join(s.basePath, p) | |||
} | } | |||
router.Handle("GET", bp("/"), http_api.Decorate(s.indexHandler, log)) | router.Handle("GET", bp("/"), http_api.Decorate(s.indexHandler, log)) | |||
router.Handle("GET", bp("/ping"), http_api.Decorate(s.pingHandler, log, h ttp_api.PlainText)) | router.Handle("GET", bp("/ping"), http_api.Decorate(s.pingHandler, log, h ttp_api.PlainText)) | |||
router.Handle("GET", bp("/topics"), http_api.Decorate(s.indexHandler, log )) | router.Handle("GET", bp("/topics"), http_api.Decorate(s.indexHandler, log )) | |||
router.Handle("GET", bp("/topics/:topic"), http_api.Decorate(s.indexHandl er, log)) | router.Handle("GET", bp("/topics/:topic"), http_api.Decorate(s.indexHandl er, log)) | |||
router.Handle("GET", bp("/topics/:topic/:channel"), http_api.Decorate(s.i ndexHandler, log)) | router.Handle("GET", bp("/topics/:topic/:channel"), http_api.Decorate(s.i ndexHandler, log)) | |||
router.Handle("GET", bp("/nodes"), http_api.Decorate(s.indexHandler, log) ) | router.Handle("GET", bp("/nodes"), http_api.Decorate(s.indexHandler, log) ) | |||
router.Handle("GET", bp("/nodes/:node"), http_api.Decorate(s.indexHandler , log)) | router.Handle("GET", bp("/nodes/:node"), http_api.Decorate(s.indexHandler , log)) | |||
router.Handle("GET", bp("/counter"), http_api.Decorate(s.indexHandler, lo g)) | router.Handle("GET", bp("/counter"), http_api.Decorate(s.indexHandler, lo g)) | |||
router.Handle("GET", bp("/lookup"), http_api.Decorate(s.indexHandler, log )) | router.Handle("GET", bp("/lookup"), http_api.Decorate(s.indexHandler, log )) | |||
router.Handle("GET", bp("/static/:asset"), http_api.Decorate(s.staticAsse tHandler, log, http_api.PlainText)) | router.Handle("GET", bp("/static/:asset"), http_api.Decorate(s.staticAsse tHandler, log, http_api.PlainText)) | |||
router.Handle("GET", bp("/fonts/:asset"), http_api.Decorate(s.staticAsset Handler, log, http_api.PlainText)) | router.Handle("GET", bp("/fonts/:asset"), http_api.Decorate(s.staticAsset Handler, log, http_api.PlainText)) | |||
if s.ctx.nsqadmin.getOpts().ProxyGraphite { | if s.nsqadmin.getOpts().ProxyGraphite { | |||
proxy := NewSingleHostReverseProxy(ctx.nsqadmin.graphiteURL, ctx. | proxy := NewSingleHostReverseProxy(nsqadmin.graphiteURL, nsqadmin | |||
nsqadmin.getOpts().HTTPClientConnectTimeout, | .getOpts().HTTPClientConnectTimeout, | |||
ctx.nsqadmin.getOpts().HTTPClientRequestTimeout) | nsqadmin.getOpts().HTTPClientRequestTimeout) | |||
router.Handler("GET", bp("/render"), proxy) | router.Handler("GET", bp("/render"), proxy) | |||
} | } | |||
// v1 endpoints | // v1 endpoints | |||
router.Handle("GET", bp("/api/topics"), http_api.Decorate(s.topicsHandler , log, http_api.V1)) | router.Handle("GET", bp("/api/topics"), http_api.Decorate(s.topicsHandler , log, http_api.V1)) | |||
router.Handle("GET", bp("/api/topics/:topic"), http_api.Decorate(s.topicH andler, log, http_api.V1)) | router.Handle("GET", bp("/api/topics/:topic"), http_api.Decorate(s.topicH andler, log, http_api.V1)) | |||
router.Handle("GET", bp("/api/topics/:topic/:channel"), http_api.Decorate (s.channelHandler, log, http_api.V1)) | router.Handle("GET", bp("/api/topics/:topic/:channel"), http_api.Decorate (s.channelHandler, log, http_api.V1)) | |||
router.Handle("GET", bp("/api/nodes"), http_api.Decorate(s.nodesHandler, log, http_api.V1)) | router.Handle("GET", bp("/api/nodes"), http_api.Decorate(s.nodesHandler, log, http_api.V1)) | |||
router.Handle("GET", bp("/api/nodes/:node"), http_api.Decorate(s.nodeHand ler, log, http_api.V1)) | router.Handle("GET", bp("/api/nodes/:node"), http_api.Decorate(s.nodeHand ler, log, http_api.V1)) | |||
router.Handle("POST", bp("/api/topics"), http_api.Decorate(s.createTopicC hannelHandler, log, http_api.V1)) | router.Handle("POST", bp("/api/topics"), http_api.Decorate(s.createTopicC hannelHandler, log, http_api.V1)) | |||
skipping to change at line 150 | skipping to change at line 150 | |||
GraphEnabled bool | GraphEnabled bool | |||
GraphiteURL string | GraphiteURL string | |||
StatsdInterval int | StatsdInterval int | |||
StatsdCounterFormat string | StatsdCounterFormat string | |||
StatsdGaugeFormat string | StatsdGaugeFormat string | |||
StatsdPrefix string | StatsdPrefix string | |||
NSQLookupd []string | NSQLookupd []string | |||
IsAdmin bool | IsAdmin bool | |||
}{ | }{ | |||
Version: version.Binary, | Version: version.Binary, | |||
ProxyGraphite: s.ctx.nsqadmin.getOpts().ProxyGraphite, | ProxyGraphite: s.nsqadmin.getOpts().ProxyGraphite, | |||
GraphEnabled: s.ctx.nsqadmin.getOpts().GraphiteURL != "", | GraphEnabled: s.nsqadmin.getOpts().GraphiteURL != "", | |||
GraphiteURL: s.ctx.nsqadmin.getOpts().GraphiteURL, | GraphiteURL: s.nsqadmin.getOpts().GraphiteURL, | |||
StatsdInterval: int(s.ctx.nsqadmin.getOpts().StatsdInterval | StatsdInterval: int(s.nsqadmin.getOpts().StatsdInterval / ti | |||
/ time.Second), | me.Second), | |||
StatsdCounterFormat: s.ctx.nsqadmin.getOpts().StatsdCounterFormat | StatsdCounterFormat: s.nsqadmin.getOpts().StatsdCounterFormat, | |||
, | StatsdGaugeFormat: s.nsqadmin.getOpts().StatsdGaugeFormat, | |||
StatsdGaugeFormat: s.ctx.nsqadmin.getOpts().StatsdGaugeFormat, | StatsdPrefix: s.nsqadmin.getOpts().StatsdPrefix, | |||
StatsdPrefix: s.ctx.nsqadmin.getOpts().StatsdPrefix, | NSQLookupd: s.nsqadmin.getOpts().NSQLookupdHTTPAddresses | |||
NSQLookupd: s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddre | , | |||
sses, | ||||
IsAdmin: s.isAuthorizedAdminRequest(req), | IsAdmin: s.isAuthorizedAdminRequest(req), | |||
}) | }) | |||
return nil, nil | return nil, nil | |||
} | } | |||
func (s *httpServer) staticAssetHandler(w http.ResponseWriter, req *http.Request , ps httprouter.Params) (interface{}, error) { | func (s *httpServer) staticAssetHandler(w http.ResponseWriter, req *http.Request , ps httprouter.Params) (interface{}, error) { | |||
assetName := ps.ByName("asset") | assetName := ps.ByName("asset") | |||
asset, err := Asset(assetName) | asset, err := staticAsset(assetName) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{404, "NOT_FOUND"} | return nil, http_api.Err{404, "NOT_FOUND"} | |||
} | } | |||
ext := path.Ext(assetName) | ext := path.Ext(assetName) | |||
ct := mime.TypeByExtension(ext) | ct := mime.TypeByExtension(ext) | |||
if ct == "" { | if ct == "" { | |||
switch ext { | switch ext { | |||
case ".map": | ||||
ct = "application/json" | ||||
case ".svg": | case ".svg": | |||
ct = "image/svg+xml" | ct = "image/svg+xml" | |||
case ".woff": | case ".woff": | |||
ct = "application/font-woff" | ct = "application/font-woff" | |||
case ".ttf": | case ".ttf": | |||
ct = "application/font-sfnt" | ct = "application/font-sfnt" | |||
case ".eot": | case ".eot": | |||
ct = "application/vnd.ms-fontobject" | ct = "application/vnd.ms-fontobject" | |||
case ".woff2": | case ".woff2": | |||
ct = "application/font-woff2" | ct = "application/font-woff2" | |||
skipping to change at line 204 | skipping to change at line 206 | |||
func (s *httpServer) topicsHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { | func (s *httpServer) topicsHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { | |||
var messages []string | var messages []string | |||
reqParams, err := http_api.NewReqParams(req) | reqParams, err := http_api.NewReqParams(req) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, err.Error()} | return nil, http_api.Err{400, err.Error()} | |||
} | } | |||
var topics []string | var topics []string | |||
if len(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) != 0 { | if len(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) != 0 { | |||
topics, err = s.ci.GetLookupdTopics(s.ctx.nsqadmin.getOpts().NSQL | topics, err = s.ci.GetLookupdTopics(s.nsqadmin.getOpts().NSQLooku | |||
ookupdHTTPAddresses) | pdHTTPAddresses) | |||
} else { | } else { | |||
topics, err = s.ci.GetNSQDTopics(s.ctx.nsqadmin.getOpts().NSQDHTT PAddresses) | topics, err = s.ci.GetNSQDTopics(s.nsqadmin.getOpts().NSQDHTTPAdd resses) | |||
} | } | |||
if err != nil { | if err != nil { | |||
pe, ok := err.(clusterinfo.PartialErr) | pe, ok := err.(clusterinfo.PartialErr) | |||
if !ok { | if !ok { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get topics - %s ", err) | s.nsqadmin.logf(LOG_ERROR, "failed to get topics - %s", e rr) | |||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | |||
} | } | |||
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | s.nsqadmin.logf(LOG_WARN, "%s", err) | |||
messages = append(messages, pe.Error()) | messages = append(messages, pe.Error()) | |||
} | } | |||
inactive, _ := reqParams.Get("inactive") | inactive, _ := reqParams.Get("inactive") | |||
if inactive == "true" { | if inactive == "true" { | |||
topicChannelMap := make(map[string][]string) | topicChannelMap := make(map[string][]string) | |||
if len(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) == 0 { | if len(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) == 0 { | |||
goto respond | goto respond | |||
} | } | |||
for _, topicName := range topics { | for _, topicName := range topics { | |||
producers, _ := s.ci.GetLookupdTopicProducers( | producers, _ := s.ci.GetLookupdTopicProducers( | |||
topicName, s.ctx.nsqadmin.getOpts().NSQLookupdHTT PAddresses) | topicName, s.nsqadmin.getOpts().NSQLookupdHTTPAdd resses) | |||
if len(producers) == 0 { | if len(producers) == 0 { | |||
topicChannels, _ := s.ci.GetLookupdTopicChannels( | topicChannels, _ := s.ci.GetLookupdTopicChannels( | |||
topicName, s.ctx.nsqadmin.getOpts().NSQLo okupdHTTPAddresses) | topicName, s.nsqadmin.getOpts().NSQLookup dHTTPAddresses) | |||
topicChannelMap[topicName] = topicChannels | topicChannelMap[topicName] = topicChannels | |||
} | } | |||
} | } | |||
respond: | respond: | |||
return struct { | return struct { | |||
Topics map[string][]string `json:"topics"` | Topics map[string][]string `json:"topics"` | |||
Message string `json:"message"` | Message string `json:"message"` | |||
}{topicChannelMap, maybeWarnMsg(messages)}, nil | }{topicChannelMap, maybeWarnMsg(messages)}, nil | |||
} | } | |||
skipping to change at line 253 | skipping to change at line 255 | |||
Message string `json:"message"` | Message string `json:"message"` | |||
}{topics, maybeWarnMsg(messages)}, nil | }{topics, maybeWarnMsg(messages)}, nil | |||
} | } | |||
func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) { | func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) { | |||
var messages []string | var messages []string | |||
topicName := ps.ByName("topic") | topicName := ps.ByName("topic") | |||
producers, err := s.ci.GetTopicProducers(topicName, | producers, err := s.ci.GetTopicProducers(topicName, | |||
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | |||
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) | s.nsqadmin.getOpts().NSQDHTTPAddresses) | |||
if err != nil { | if err != nil { | |||
pe, ok := err.(clusterinfo.PartialErr) | pe, ok := err.(clusterinfo.PartialErr) | |||
if !ok { | if !ok { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get topic produ cers - %s", err) | s.nsqadmin.logf(LOG_ERROR, "failed to get topic producers - %s", err) | |||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | |||
} | } | |||
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | s.nsqadmin.logf(LOG_WARN, "%s", err) | |||
messages = append(messages, pe.Error()) | messages = append(messages, pe.Error()) | |||
} | } | |||
topicStats, _, err := s.ci.GetNSQDStats(producers, topicName, "", false) | topicStats, _, err := s.ci.GetNSQDStats(producers, topicName, "", false) | |||
if err != nil { | if err != nil { | |||
pe, ok := err.(clusterinfo.PartialErr) | pe, ok := err.(clusterinfo.PartialErr) | |||
if !ok { | if !ok { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get topic metad ata - %s", err) | s.nsqadmin.logf(LOG_ERROR, "failed to get topic metadata - %s", err) | |||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | |||
} | } | |||
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | s.nsqadmin.logf(LOG_WARN, "%s", err) | |||
messages = append(messages, pe.Error()) | messages = append(messages, pe.Error()) | |||
} | } | |||
allNodesTopicStats := &clusterinfo.TopicStats{TopicName: topicName} | allNodesTopicStats := &clusterinfo.TopicStats{TopicName: topicName} | |||
for _, t := range topicStats { | for _, t := range topicStats { | |||
allNodesTopicStats.Add(t) | allNodesTopicStats.Add(t) | |||
} | } | |||
return struct { | return struct { | |||
*clusterinfo.TopicStats | *clusterinfo.TopicStats | |||
skipping to change at line 293 | skipping to change at line 295 | |||
}{allNodesTopicStats, maybeWarnMsg(messages)}, nil | }{allNodesTopicStats, maybeWarnMsg(messages)}, nil | |||
} | } | |||
func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { | func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { | |||
var messages []string | var messages []string | |||
topicName := ps.ByName("topic") | topicName := ps.ByName("topic") | |||
channelName := ps.ByName("channel") | channelName := ps.ByName("channel") | |||
producers, err := s.ci.GetTopicProducers(topicName, | producers, err := s.ci.GetTopicProducers(topicName, | |||
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | |||
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) | s.nsqadmin.getOpts().NSQDHTTPAddresses) | |||
if err != nil { | if err != nil { | |||
pe, ok := err.(clusterinfo.PartialErr) | pe, ok := err.(clusterinfo.PartialErr) | |||
if !ok { | if !ok { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get topic produ cers - %s", err) | s.nsqadmin.logf(LOG_ERROR, "failed to get topic producers - %s", err) | |||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | |||
} | } | |||
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | s.nsqadmin.logf(LOG_WARN, "%s", err) | |||
messages = append(messages, pe.Error()) | messages = append(messages, pe.Error()) | |||
} | } | |||
_, channelStats, err := s.ci.GetNSQDStats(producers, topicName, channelNa me, true) | _, channelStats, err := s.ci.GetNSQDStats(producers, topicName, channelNa me, true) | |||
if err != nil { | if err != nil { | |||
pe, ok := err.(clusterinfo.PartialErr) | pe, ok := err.(clusterinfo.PartialErr) | |||
if !ok { | if !ok { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get channel met adata - %s", err) | s.nsqadmin.logf(LOG_ERROR, "failed to get channel metadat a - %s", err) | |||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | |||
} | } | |||
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | s.nsqadmin.logf(LOG_WARN, "%s", err) | |||
messages = append(messages, pe.Error()) | messages = append(messages, pe.Error()) | |||
} | } | |||
return struct { | return struct { | |||
*clusterinfo.ChannelStats | *clusterinfo.ChannelStats | |||
Message string `json:"message"` | Message string `json:"message"` | |||
}{channelStats[channelName], maybeWarnMsg(messages)}, nil | }{channelStats[channelName], maybeWarnMsg(messages)}, nil | |||
} | } | |||
func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) { | func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) { | |||
var messages []string | var messages []string | |||
producers, err := s.ci.GetProducers(s.ctx.nsqadmin.getOpts().NSQLookupdHT TPAddresses, s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) | producers, err := s.ci.GetProducers(s.nsqadmin.getOpts().NSQLookupdHTTPAd dresses, s.nsqadmin.getOpts().NSQDHTTPAddresses) | |||
if err != nil { | if err != nil { | |||
pe, ok := err.(clusterinfo.PartialErr) | pe, ok := err.(clusterinfo.PartialErr) | |||
if !ok { | if !ok { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get nodes - %s" , err) | s.nsqadmin.logf(LOG_ERROR, "failed to get nodes - %s", er r) | |||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | |||
} | } | |||
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | s.nsqadmin.logf(LOG_WARN, "%s", err) | |||
messages = append(messages, pe.Error()) | messages = append(messages, pe.Error()) | |||
} | } | |||
return struct { | return struct { | |||
Nodes clusterinfo.Producers `json:"nodes"` | Nodes clusterinfo.Producers `json:"nodes"` | |||
Message string `json:"message"` | Message string `json:"message"` | |||
}{producers, maybeWarnMsg(messages)}, nil | }{producers, maybeWarnMsg(messages)}, nil | |||
} | } | |||
func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht tprouter.Params) (interface{}, error) { | func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht tprouter.Params) (interface{}, error) { | |||
var messages []string | var messages []string | |||
node := ps.ByName("node") | node := ps.ByName("node") | |||
producers, err := s.ci.GetProducers(s.ctx.nsqadmin.getOpts().NSQLookupdHT TPAddresses, s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) | producers, err := s.ci.GetProducers(s.nsqadmin.getOpts().NSQLookupdHTTPAd dresses, s.nsqadmin.getOpts().NSQDHTTPAddresses) | |||
if err != nil { | if err != nil { | |||
pe, ok := err.(clusterinfo.PartialErr) | pe, ok := err.(clusterinfo.PartialErr) | |||
if !ok { | if !ok { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get producers - %s", err) | s.nsqadmin.logf(LOG_ERROR, "failed to get producers - %s" , err) | |||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | |||
} | } | |||
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | s.nsqadmin.logf(LOG_WARN, "%s", err) | |||
messages = append(messages, pe.Error()) | messages = append(messages, pe.Error()) | |||
} | } | |||
producer := producers.Search(node) | producer := producers.Search(node) | |||
if producer == nil { | if producer == nil { | |||
return nil, http_api.Err{404, "NODE_NOT_FOUND"} | return nil, http_api.Err{404, "NODE_NOT_FOUND"} | |||
} | } | |||
topicStats, _, err := s.ci.GetNSQDStats(clusterinfo.Producers{producer}, "", "", true) | topicStats, _, err := s.ci.GetNSQDStats(clusterinfo.Producers{producer}, "", "", true) | |||
if err != nil { | if err != nil { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", e rr) | s.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", err) | |||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", e rr)} | return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", e rr)} | |||
} | } | |||
var totalClients int64 | var totalClients int64 | |||
var totalMessages int64 | var totalMessages int64 | |||
for _, ts := range topicStats { | for _, ts := range topicStats { | |||
for _, cs := range ts.Channels { | for _, cs := range ts.Channels { | |||
totalClients += int64(len(cs.Clients)) | totalClients += int64(len(cs.Clients)) | |||
} | } | |||
totalMessages += ts.MessageCount | totalMessages += ts.MessageCount | |||
skipping to change at line 410 | skipping to change at line 412 | |||
err := json.NewDecoder(req.Body).Decode(&body) | err := json.NewDecoder(req.Body).Decode(&body) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "INVALID_BODY"} | return nil, http_api.Err{400, "INVALID_BODY"} | |||
} | } | |||
if !protocol.IsValidTopicName(body.Topic) { | if !protocol.IsValidTopicName(body.Topic) { | |||
return nil, http_api.Err{400, "INVALID_TOPIC"} | return nil, http_api.Err{400, "INVALID_TOPIC"} | |||
} | } | |||
err = s.ci.TombstoneNodeForTopic(body.Topic, node, | err = s.ci.TombstoneNodeForTopic(body.Topic, node, | |||
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) | s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) | |||
if err != nil { | if err != nil { | |||
pe, ok := err.(clusterinfo.PartialErr) | pe, ok := err.(clusterinfo.PartialErr) | |||
if !ok { | if !ok { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to tombstone node for topic - %s", err) | s.nsqadmin.logf(LOG_ERROR, "failed to tombstone node for topic - %s", err) | |||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | |||
} | } | |||
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | s.nsqadmin.logf(LOG_WARN, "%s", err) | |||
messages = append(messages, pe.Error()) | messages = append(messages, pe.Error()) | |||
} | } | |||
s.notifyAdminAction("tombstone_topic_producer", body.Topic, "", node, req ) | s.notifyAdminAction("tombstone_topic_producer", body.Topic, "", node, req ) | |||
return struct { | return struct { | |||
Message string `json:"message"` | Message string `json:"message"` | |||
}{maybeWarnMsg(messages)}, nil | }{maybeWarnMsg(messages)}, nil | |||
} | } | |||
skipping to change at line 454 | skipping to change at line 456 | |||
if !protocol.IsValidTopicName(body.Topic) { | if !protocol.IsValidTopicName(body.Topic) { | |||
return nil, http_api.Err{400, "INVALID_TOPIC"} | return nil, http_api.Err{400, "INVALID_TOPIC"} | |||
} | } | |||
if len(body.Channel) > 0 && !protocol.IsValidChannelName(body.Channel) { | if len(body.Channel) > 0 && !protocol.IsValidChannelName(body.Channel) { | |||
return nil, http_api.Err{400, "INVALID_CHANNEL"} | return nil, http_api.Err{400, "INVALID_CHANNEL"} | |||
} | } | |||
err = s.ci.CreateTopicChannel(body.Topic, body.Channel, | err = s.ci.CreateTopicChannel(body.Topic, body.Channel, | |||
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) | s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) | |||
if err != nil { | if err != nil { | |||
pe, ok := err.(clusterinfo.PartialErr) | pe, ok := err.(clusterinfo.PartialErr) | |||
if !ok { | if !ok { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to create topic/ch annel - %s", err) | s.nsqadmin.logf(LOG_ERROR, "failed to create topic/channe l - %s", err) | |||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | |||
} | } | |||
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | s.nsqadmin.logf(LOG_WARN, "%s", err) | |||
messages = append(messages, pe.Error()) | messages = append(messages, pe.Error()) | |||
} | } | |||
s.notifyAdminAction("create_topic", body.Topic, "", "", req) | s.notifyAdminAction("create_topic", body.Topic, "", "", req) | |||
if len(body.Channel) > 0 { | if len(body.Channel) > 0 { | |||
s.notifyAdminAction("create_channel", body.Topic, body.Channel, " ", req) | s.notifyAdminAction("create_channel", body.Topic, body.Channel, " ", req) | |||
} | } | |||
return struct { | return struct { | |||
Message string `json:"message"` | Message string `json:"message"` | |||
skipping to change at line 485 | skipping to change at line 487 | |||
func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request , ps httprouter.Params) (interface{}, error) { | func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request , ps httprouter.Params) (interface{}, error) { | |||
var messages []string | var messages []string | |||
if !s.isAuthorizedAdminRequest(req) { | if !s.isAuthorizedAdminRequest(req) { | |||
return nil, http_api.Err{403, "FORBIDDEN"} | return nil, http_api.Err{403, "FORBIDDEN"} | |||
} | } | |||
topicName := ps.ByName("topic") | topicName := ps.ByName("topic") | |||
err := s.ci.DeleteTopic(topicName, | err := s.ci.DeleteTopic(topicName, | |||
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | |||
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) | s.nsqadmin.getOpts().NSQDHTTPAddresses) | |||
if err != nil { | if err != nil { | |||
pe, ok := err.(clusterinfo.PartialErr) | pe, ok := err.(clusterinfo.PartialErr) | |||
if !ok { | if !ok { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to delete topic - %s", err) | s.nsqadmin.logf(LOG_ERROR, "failed to delete topic - %s", err) | |||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | |||
} | } | |||
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | s.nsqadmin.logf(LOG_WARN, "%s", err) | |||
messages = append(messages, pe.Error()) | messages = append(messages, pe.Error()) | |||
} | } | |||
s.notifyAdminAction("delete_topic", topicName, "", "", req) | s.notifyAdminAction("delete_topic", topicName, "", "", req) | |||
return struct { | return struct { | |||
Message string `json:"message"` | Message string `json:"message"` | |||
}{maybeWarnMsg(messages)}, nil | }{maybeWarnMsg(messages)}, nil | |||
} | } | |||
skipping to change at line 515 | skipping to change at line 517 | |||
var messages []string | var messages []string | |||
if !s.isAuthorizedAdminRequest(req) { | if !s.isAuthorizedAdminRequest(req) { | |||
return nil, http_api.Err{403, "FORBIDDEN"} | return nil, http_api.Err{403, "FORBIDDEN"} | |||
} | } | |||
topicName := ps.ByName("topic") | topicName := ps.ByName("topic") | |||
channelName := ps.ByName("channel") | channelName := ps.ByName("channel") | |||
err := s.ci.DeleteChannel(topicName, channelName, | err := s.ci.DeleteChannel(topicName, channelName, | |||
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | |||
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) | s.nsqadmin.getOpts().NSQDHTTPAddresses) | |||
if err != nil { | if err != nil { | |||
pe, ok := err.(clusterinfo.PartialErr) | pe, ok := err.(clusterinfo.PartialErr) | |||
if !ok { | if !ok { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to delete channel - %s", err) | s.nsqadmin.logf(LOG_ERROR, "failed to delete channel - %s ", err) | |||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | |||
} | } | |||
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | s.nsqadmin.logf(LOG_WARN, "%s", err) | |||
messages = append(messages, pe.Error()) | messages = append(messages, pe.Error()) | |||
} | } | |||
s.notifyAdminAction("delete_channel", topicName, channelName, "", req) | s.notifyAdminAction("delete_channel", topicName, channelName, "", req) | |||
return struct { | return struct { | |||
Message string `json:"message"` | Message string `json:"message"` | |||
}{maybeWarnMsg(messages)}, nil | }{maybeWarnMsg(messages)}, nil | |||
} | } | |||
skipping to change at line 565 | skipping to change at line 567 | |||
err := json.NewDecoder(req.Body).Decode(&body) | err := json.NewDecoder(req.Body).Decode(&body) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, err.Error()} | return nil, http_api.Err{400, err.Error()} | |||
} | } | |||
switch body.Action { | switch body.Action { | |||
case "pause": | case "pause": | |||
if channelName != "" { | if channelName != "" { | |||
err = s.ci.PauseChannel(topicName, channelName, | err = s.ci.PauseChannel(topicName, channelName, | |||
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | |||
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) | s.nsqadmin.getOpts().NSQDHTTPAddresses) | |||
s.notifyAdminAction("pause_channel", topicName, channelNa me, "", req) | s.notifyAdminAction("pause_channel", topicName, channelNa me, "", req) | |||
} else { | } else { | |||
err = s.ci.PauseTopic(topicName, | err = s.ci.PauseTopic(topicName, | |||
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | |||
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) | s.nsqadmin.getOpts().NSQDHTTPAddresses) | |||
s.notifyAdminAction("pause_topic", topicName, "", "", req ) | s.notifyAdminAction("pause_topic", topicName, "", "", req ) | |||
} | } | |||
case "unpause": | case "unpause": | |||
if channelName != "" { | if channelName != "" { | |||
err = s.ci.UnPauseChannel(topicName, channelName, | err = s.ci.UnPauseChannel(topicName, channelName, | |||
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | |||
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) | s.nsqadmin.getOpts().NSQDHTTPAddresses) | |||
s.notifyAdminAction("unpause_channel", topicName, channel Name, "", req) | s.notifyAdminAction("unpause_channel", topicName, channel Name, "", req) | |||
} else { | } else { | |||
err = s.ci.UnPauseTopic(topicName, | err = s.ci.UnPauseTopic(topicName, | |||
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | |||
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) | s.nsqadmin.getOpts().NSQDHTTPAddresses) | |||
s.notifyAdminAction("unpause_topic", topicName, "", "", r eq) | s.notifyAdminAction("unpause_topic", topicName, "", "", r eq) | |||
} | } | |||
case "empty": | case "empty": | |||
if channelName != "" { | if channelName != "" { | |||
err = s.ci.EmptyChannel(topicName, channelName, | err = s.ci.EmptyChannel(topicName, channelName, | |||
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | |||
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) | s.nsqadmin.getOpts().NSQDHTTPAddresses) | |||
s.notifyAdminAction("empty_channel", topicName, channelNa me, "", req) | s.notifyAdminAction("empty_channel", topicName, channelNa me, "", req) | |||
} else { | } else { | |||
err = s.ci.EmptyTopic(topicName, | err = s.ci.EmptyTopic(topicName, | |||
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, | |||
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) | s.nsqadmin.getOpts().NSQDHTTPAddresses) | |||
s.notifyAdminAction("empty_topic", topicName, "", "", req ) | s.notifyAdminAction("empty_topic", topicName, "", "", req ) | |||
} | } | |||
default: | default: | |||
return nil, http_api.Err{400, "INVALID_ACTION"} | return nil, http_api.Err{400, "INVALID_ACTION"} | |||
} | } | |||
if err != nil { | if err != nil { | |||
pe, ok := err.(clusterinfo.PartialErr) | pe, ok := err.(clusterinfo.PartialErr) | |||
if !ok { | if !ok { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to %s topic/channe l - %s", body.Action, err) | s.nsqadmin.logf(LOG_ERROR, "failed to %s topic/channel - %s", body.Action, err) | |||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | |||
} | } | |||
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | s.nsqadmin.logf(LOG_WARN, "%s", err) | |||
messages = append(messages, pe.Error()) | messages = append(messages, pe.Error()) | |||
} | } | |||
return struct { | return struct { | |||
Message string `json:"message"` | Message string `json:"message"` | |||
}{maybeWarnMsg(messages)}, nil | }{maybeWarnMsg(messages)}, nil | |||
} | } | |||
type counterStats struct { | type counterStats struct { | |||
Node string `json:"node"` | Node string `json:"node"` | |||
TopicName string `json:"topic_name"` | TopicName string `json:"topic_name"` | |||
ChannelName string `json:"channel_name"` | ChannelName string `json:"channel_name"` | |||
MessageCount int64 `json:"message_count"` | MessageCount int64 `json:"message_count"` | |||
} | } | |||
func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { | func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { | |||
var messages []string | var messages []string | |||
stats := make(map[string]*counterStats) | stats := make(map[string]*counterStats) | |||
producers, err := s.ci.GetProducers(s.ctx.nsqadmin.getOpts().NSQLookupdHT TPAddresses, s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) | producers, err := s.ci.GetProducers(s.nsqadmin.getOpts().NSQLookupdHTTPAd dresses, s.nsqadmin.getOpts().NSQDHTTPAddresses) | |||
if err != nil { | if err != nil { | |||
pe, ok := err.(clusterinfo.PartialErr) | pe, ok := err.(clusterinfo.PartialErr) | |||
if !ok { | if !ok { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get counter pro ducer list - %s", err) | s.nsqadmin.logf(LOG_ERROR, "failed to get counter produce r list - %s", err) | |||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | |||
} | } | |||
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | s.nsqadmin.logf(LOG_WARN, "%s", err) | |||
messages = append(messages, pe.Error()) | messages = append(messages, pe.Error()) | |||
} | } | |||
_, channelStats, err := s.ci.GetNSQDStats(producers, "", "", false) | _, channelStats, err := s.ci.GetNSQDStats(producers, "", "", false) | |||
if err != nil { | if err != nil { | |||
pe, ok := err.(clusterinfo.PartialErr) | pe, ok := err.(clusterinfo.PartialErr) | |||
if !ok { | if !ok { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", err) | s.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s ", err) | |||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} | |||
} | } | |||
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | s.nsqadmin.logf(LOG_WARN, "%s", err) | |||
messages = append(messages, pe.Error()) | messages = append(messages, pe.Error()) | |||
} | } | |||
for _, channelStats := range channelStats { | for _, channelStats := range channelStats { | |||
for _, hostChannelStats := range channelStats.NodeStats { | for _, hostChannelStats := range channelStats.NodeStats { | |||
key := fmt.Sprintf("%s:%s:%s", channelStats.TopicName, ch annelStats.ChannelName, hostChannelStats.Node) | key := fmt.Sprintf("%s:%s:%s", channelStats.TopicName, ch annelStats.ChannelName, hostChannelStats.Node) | |||
s, ok := stats[key] | s, ok := stats[key] | |||
if !ok { | if !ok { | |||
s = &counterStats{ | s = &counterStats{ | |||
Node: hostChannelStats.Node, | Node: hostChannelStats.Node, | |||
skipping to change at line 694 | skipping to change at line 696 | |||
if err != nil || metric != "rate" { | if err != nil || metric != "rate" { | |||
return nil, http_api.Err{400, "INVALID_ARG_METRIC"} | return nil, http_api.Err{400, "INVALID_ARG_METRIC"} | |||
} | } | |||
target, err := reqParams.Get("target") | target, err := reqParams.Get("target") | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "INVALID_ARG_TARGET"} | return nil, http_api.Err{400, "INVALID_ARG_TARGET"} | |||
} | } | |||
params := url.Values{} | params := url.Values{} | |||
params.Set("from", fmt.Sprintf("-%dsec", s.ctx.nsqadmin.getOpts().StatsdI | params.Set("from", fmt.Sprintf("-%dsec", s.nsqadmin.getOpts().StatsdInter | |||
nterval*2/time.Second)) | val*2/time.Second)) | |||
params.Set("until", fmt.Sprintf("-%dsec", s.ctx.nsqadmin.getOpts().Statsd | params.Set("until", fmt.Sprintf("-%dsec", s.nsqadmin.getOpts().StatsdInte | |||
Interval/time.Second)) | rval/time.Second)) | |||
params.Set("format", "json") | params.Set("format", "json") | |||
params.Set("target", target) | params.Set("target", target) | |||
query := fmt.Sprintf("/render?%s", params.Encode()) | query := fmt.Sprintf("/render?%s", params.Encode()) | |||
url := s.ctx.nsqadmin.getOpts().GraphiteURL + query | url := s.nsqadmin.getOpts().GraphiteURL + query | |||
s.ctx.nsqadmin.logf(LOG_INFO, "GRAPHITE: %s", url) | s.nsqadmin.logf(LOG_INFO, "GRAPHITE: %s", url) | |||
var response []struct { | var response []struct { | |||
Target string `json:"target"` | Target string `json:"target"` | |||
DataPoints [][]*float64 `json:"datapoints"` | DataPoints [][]*float64 `json:"datapoints"` | |||
} | } | |||
err = s.client.GETV1(url, &response) | err = s.client.GETV1(url, &response) | |||
if err != nil { | if err != nil { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "graphite request failed - %s", er r) | s.nsqadmin.logf(LOG_ERROR, "graphite request failed - %s", err) | |||
return nil, http_api.Err{500, "INTERNAL_ERROR"} | return nil, http_api.Err{500, "INTERNAL_ERROR"} | |||
} | } | |||
var rateStr string | var rateStr string | |||
rate := *response[0].DataPoints[0][0] | rate := *response[0].DataPoints[0][0] | |||
if rate < 0 { | if rate < 0 { | |||
rateStr = "N/A" | rateStr = "N/A" | |||
} else { | } else { | |||
rateDivisor := s.ctx.nsqadmin.getOpts().StatsdInterval / time.Sec ond | rateDivisor := s.nsqadmin.getOpts().StatsdInterval / time.Second | |||
rateStr = fmt.Sprintf("%.2f", rate/float64(rateDivisor)) | rateStr = fmt.Sprintf("%.2f", rate/float64(rateDivisor)) | |||
} | } | |||
return struct { | return struct { | |||
Rate string `json:"rate"` | Rate string `json:"rate"` | |||
}{rateStr}, nil | }{rateStr}, nil | |||
} | } | |||
func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) { | func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) { | |||
opt := ps.ByName("opt") | opt := ps.ByName("opt") | |||
allowConfigFromCIDR := s.ctx.nsqadmin.getOpts().AllowConfigFromCIDR | allowConfigFromCIDR := s.nsqadmin.getOpts().AllowConfigFromCIDR | |||
if allowConfigFromCIDR != "" { | if allowConfigFromCIDR != "" { | |||
_, ipnet, _ := net.ParseCIDR(allowConfigFromCIDR) | _, ipnet, _ := net.ParseCIDR(allowConfigFromCIDR) | |||
addr, _, err := net.SplitHostPort(req.RemoteAddr) | addr, _, err := net.SplitHostPort(req.RemoteAddr) | |||
if err != nil { | if err != nil { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAdd r %s", req.RemoteAddr) | s.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAddr %s ", req.RemoteAddr) | |||
return nil, http_api.Err{400, "INVALID_REMOTE_ADDR"} | return nil, http_api.Err{400, "INVALID_REMOTE_ADDR"} | |||
} | } | |||
ip := net.ParseIP(addr) | ip := net.ParseIP(addr) | |||
if ip == nil { | if ip == nil { | |||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAdd r %s", req.RemoteAddr) | s.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAddr %s ", req.RemoteAddr) | |||
return nil, http_api.Err{400, "INVALID_REMOTE_ADDR"} | return nil, http_api.Err{400, "INVALID_REMOTE_ADDR"} | |||
} | } | |||
if !ipnet.Contains(ip) { | if !ipnet.Contains(ip) { | |||
return nil, http_api.Err{403, "FORBIDDEN"} | return nil, http_api.Err{403, "FORBIDDEN"} | |||
} | } | |||
} | } | |||
if req.Method == "PUT" { | if req.Method == "PUT" { | |||
// add 1 so that it's greater than our max when we test for it | // add 1 so that it's greater than our max when we test for it | |||
// (LimitReader returns a "fake" EOF) | // (LimitReader returns a "fake" EOF) | |||
readMax := int64(1024*1024 + 1) | readMax := int64(1024*1024 + 1) | |||
body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax)) | body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax)) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{500, "INTERNAL_ERROR"} | return nil, http_api.Err{500, "INTERNAL_ERROR"} | |||
} | } | |||
if int64(len(body)) == readMax || len(body) == 0 { | if int64(len(body)) == readMax || len(body) == 0 { | |||
return nil, http_api.Err{413, "INVALID_VALUE"} | return nil, http_api.Err{413, "INVALID_VALUE"} | |||
} | } | |||
opts := *s.ctx.nsqadmin.getOpts() | opts := *s.nsqadmin.getOpts() | |||
switch opt { | switch opt { | |||
case "nsqlookupd_http_addresses": | case "nsqlookupd_http_addresses": | |||
err := json.Unmarshal(body, &opts.NSQLookupdHTTPAddresses ) | err := json.Unmarshal(body, &opts.NSQLookupdHTTPAddresses ) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "INVALID_VALUE"} | return nil, http_api.Err{400, "INVALID_VALUE"} | |||
} | } | |||
case "log_level": | case "log_level": | |||
logLevelStr := string(body) | logLevelStr := string(body) | |||
logLevel, err := lg.ParseLogLevel(logLevelStr) | logLevel, err := lg.ParseLogLevel(logLevelStr) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "INVALID_VALUE"} | return nil, http_api.Err{400, "INVALID_VALUE"} | |||
} | } | |||
opts.LogLevel = logLevel | opts.LogLevel = logLevel | |||
default: | default: | |||
return nil, http_api.Err{400, "INVALID_OPTION"} | return nil, http_api.Err{400, "INVALID_OPTION"} | |||
} | } | |||
s.ctx.nsqadmin.swapOpts(&opts) | s.nsqadmin.swapOpts(&opts) | |||
} | } | |||
v, ok := getOptByCfgName(s.ctx.nsqadmin.getOpts(), opt) | v, ok := getOptByCfgName(s.nsqadmin.getOpts(), opt) | |||
if !ok { | if !ok { | |||
return nil, http_api.Err{400, "INVALID_OPTION"} | return nil, http_api.Err{400, "INVALID_OPTION"} | |||
} | } | |||
return v, nil | return v, nil | |||
} | } | |||
func (s *httpServer) isAuthorizedAdminRequest(req *http.Request) bool { | func (s *httpServer) isAuthorizedAdminRequest(req *http.Request) bool { | |||
adminUsers := s.ctx.nsqadmin.getOpts().AdminUsers | adminUsers := s.nsqadmin.getOpts().AdminUsers | |||
if len(adminUsers) == 0 { | if len(adminUsers) == 0 { | |||
return true | return true | |||
} | } | |||
aclHttpHeader := s.ctx.nsqadmin.getOpts().AclHttpHeader | aclHttpHeader := s.nsqadmin.getOpts().AclHttpHeader | |||
user := req.Header.Get(aclHttpHeader) | user := req.Header.Get(aclHttpHeader) | |||
for _, v := range adminUsers { | for _, v := range adminUsers { | |||
if v == user { | if v == user { | |||
return true | return true | |||
} | } | |||
} | } | |||
return false | return false | |||
} | } | |||
func getOptByCfgName(opts interface{}, name string) (interface{}, bool) { | func getOptByCfgName(opts interface{}, name string) (interface{}, bool) { | |||
End of changes. 72 change blocks. | ||||
106 lines changed or deleted | 107 lines changed or added |