"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "nsqadmin/http.go" between
nsq-1.2.0.tar.gz and nsq-1.2.1.tar.gz

About: nsq is a realtime distributed and and decentralized messaging platform.

http.go  (nsq-1.2.0):http.go  (nsq-1.2.1)
skipping to change at line 51 skipping to change at line 51
req.SetBasicAuth(target.User.Username(), passwd) req.SetBasicAuth(target.User.Username(), passwd)
} }
} }
return &httputil.ReverseProxy{ return &httputil.ReverseProxy{
Director: director, Director: director,
Transport: http_api.NewDeadlineTransport(connectTimeout, requestT imeout), Transport: http_api.NewDeadlineTransport(connectTimeout, requestT imeout),
} }
} }
type httpServer struct { type httpServer struct {
ctx *Context nsqadmin *NSQAdmin
router http.Handler router http.Handler
client *http_api.Client client *http_api.Client
ci *clusterinfo.ClusterInfo ci *clusterinfo.ClusterInfo
basePath string basePath string
} }
func NewHTTPServer(ctx *Context) *httpServer { func NewHTTPServer(nsqadmin *NSQAdmin) *httpServer {
log := http_api.Log(ctx.nsqadmin.logf) log := http_api.Log(nsqadmin.logf)
client := http_api.NewClient(ctx.nsqadmin.httpClientTLSConfig, ctx.nsqadm client := http_api.NewClient(nsqadmin.httpClientTLSConfig, nsqadmin.getOp
in.getOpts().HTTPClientConnectTimeout, ts().HTTPClientConnectTimeout,
ctx.nsqadmin.getOpts().HTTPClientRequestTimeout) nsqadmin.getOpts().HTTPClientRequestTimeout)
router := httprouter.New() router := httprouter.New()
router.HandleMethodNotAllowed = true router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqadmin.logf) router.PanicHandler = http_api.LogPanicHandler(nsqadmin.logf)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqadmin.logf) router.NotFound = http_api.LogNotFoundHandler(nsqadmin.logf)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqadmi router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(nsqadmin.lo
n.logf) gf)
s := &httpServer{ s := &httpServer{
ctx: ctx, nsqadmin: nsqadmin,
router: router, router: router,
client: client, client: client,
ci: clusterinfo.New(ctx.nsqadmin.logf, client), ci: clusterinfo.New(nsqadmin.logf, client),
basePath: ctx.nsqadmin.getOpts().BasePath, basePath: nsqadmin.getOpts().BasePath,
} }
bp := func(p string) string { bp := func(p string) string {
return path.Join(s.basePath, p) return path.Join(s.basePath, p)
} }
router.Handle("GET", bp("/"), http_api.Decorate(s.indexHandler, log)) router.Handle("GET", bp("/"), http_api.Decorate(s.indexHandler, log))
router.Handle("GET", bp("/ping"), http_api.Decorate(s.pingHandler, log, h ttp_api.PlainText)) router.Handle("GET", bp("/ping"), http_api.Decorate(s.pingHandler, log, h ttp_api.PlainText))
router.Handle("GET", bp("/topics"), http_api.Decorate(s.indexHandler, log )) router.Handle("GET", bp("/topics"), http_api.Decorate(s.indexHandler, log ))
router.Handle("GET", bp("/topics/:topic"), http_api.Decorate(s.indexHandl er, log)) router.Handle("GET", bp("/topics/:topic"), http_api.Decorate(s.indexHandl er, log))
router.Handle("GET", bp("/topics/:topic/:channel"), http_api.Decorate(s.i ndexHandler, log)) router.Handle("GET", bp("/topics/:topic/:channel"), http_api.Decorate(s.i ndexHandler, log))
router.Handle("GET", bp("/nodes"), http_api.Decorate(s.indexHandler, log) ) router.Handle("GET", bp("/nodes"), http_api.Decorate(s.indexHandler, log) )
router.Handle("GET", bp("/nodes/:node"), http_api.Decorate(s.indexHandler , log)) router.Handle("GET", bp("/nodes/:node"), http_api.Decorate(s.indexHandler , log))
router.Handle("GET", bp("/counter"), http_api.Decorate(s.indexHandler, lo g)) router.Handle("GET", bp("/counter"), http_api.Decorate(s.indexHandler, lo g))
router.Handle("GET", bp("/lookup"), http_api.Decorate(s.indexHandler, log )) router.Handle("GET", bp("/lookup"), http_api.Decorate(s.indexHandler, log ))
router.Handle("GET", bp("/static/:asset"), http_api.Decorate(s.staticAsse tHandler, log, http_api.PlainText)) router.Handle("GET", bp("/static/:asset"), http_api.Decorate(s.staticAsse tHandler, log, http_api.PlainText))
router.Handle("GET", bp("/fonts/:asset"), http_api.Decorate(s.staticAsset Handler, log, http_api.PlainText)) router.Handle("GET", bp("/fonts/:asset"), http_api.Decorate(s.staticAsset Handler, log, http_api.PlainText))
if s.ctx.nsqadmin.getOpts().ProxyGraphite { if s.nsqadmin.getOpts().ProxyGraphite {
proxy := NewSingleHostReverseProxy(ctx.nsqadmin.graphiteURL, ctx. proxy := NewSingleHostReverseProxy(nsqadmin.graphiteURL, nsqadmin
nsqadmin.getOpts().HTTPClientConnectTimeout, .getOpts().HTTPClientConnectTimeout,
ctx.nsqadmin.getOpts().HTTPClientRequestTimeout) nsqadmin.getOpts().HTTPClientRequestTimeout)
router.Handler("GET", bp("/render"), proxy) router.Handler("GET", bp("/render"), proxy)
} }
// v1 endpoints // v1 endpoints
router.Handle("GET", bp("/api/topics"), http_api.Decorate(s.topicsHandler , log, http_api.V1)) router.Handle("GET", bp("/api/topics"), http_api.Decorate(s.topicsHandler , log, http_api.V1))
router.Handle("GET", bp("/api/topics/:topic"), http_api.Decorate(s.topicH andler, log, http_api.V1)) router.Handle("GET", bp("/api/topics/:topic"), http_api.Decorate(s.topicH andler, log, http_api.V1))
router.Handle("GET", bp("/api/topics/:topic/:channel"), http_api.Decorate (s.channelHandler, log, http_api.V1)) router.Handle("GET", bp("/api/topics/:topic/:channel"), http_api.Decorate (s.channelHandler, log, http_api.V1))
router.Handle("GET", bp("/api/nodes"), http_api.Decorate(s.nodesHandler, log, http_api.V1)) router.Handle("GET", bp("/api/nodes"), http_api.Decorate(s.nodesHandler, log, http_api.V1))
router.Handle("GET", bp("/api/nodes/:node"), http_api.Decorate(s.nodeHand ler, log, http_api.V1)) router.Handle("GET", bp("/api/nodes/:node"), http_api.Decorate(s.nodeHand ler, log, http_api.V1))
router.Handle("POST", bp("/api/topics"), http_api.Decorate(s.createTopicC hannelHandler, log, http_api.V1)) router.Handle("POST", bp("/api/topics"), http_api.Decorate(s.createTopicC hannelHandler, log, http_api.V1))
skipping to change at line 150 skipping to change at line 150
GraphEnabled bool GraphEnabled bool
GraphiteURL string GraphiteURL string
StatsdInterval int StatsdInterval int
StatsdCounterFormat string StatsdCounterFormat string
StatsdGaugeFormat string StatsdGaugeFormat string
StatsdPrefix string StatsdPrefix string
NSQLookupd []string NSQLookupd []string
IsAdmin bool IsAdmin bool
}{ }{
Version: version.Binary, Version: version.Binary,
ProxyGraphite: s.ctx.nsqadmin.getOpts().ProxyGraphite, ProxyGraphite: s.nsqadmin.getOpts().ProxyGraphite,
GraphEnabled: s.ctx.nsqadmin.getOpts().GraphiteURL != "", GraphEnabled: s.nsqadmin.getOpts().GraphiteURL != "",
GraphiteURL: s.ctx.nsqadmin.getOpts().GraphiteURL, GraphiteURL: s.nsqadmin.getOpts().GraphiteURL,
StatsdInterval: int(s.ctx.nsqadmin.getOpts().StatsdInterval StatsdInterval: int(s.nsqadmin.getOpts().StatsdInterval / ti
/ time.Second), me.Second),
StatsdCounterFormat: s.ctx.nsqadmin.getOpts().StatsdCounterFormat StatsdCounterFormat: s.nsqadmin.getOpts().StatsdCounterFormat,
, StatsdGaugeFormat: s.nsqadmin.getOpts().StatsdGaugeFormat,
StatsdGaugeFormat: s.ctx.nsqadmin.getOpts().StatsdGaugeFormat, StatsdPrefix: s.nsqadmin.getOpts().StatsdPrefix,
StatsdPrefix: s.ctx.nsqadmin.getOpts().StatsdPrefix, NSQLookupd: s.nsqadmin.getOpts().NSQLookupdHTTPAddresses
NSQLookupd: s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddre ,
sses,
IsAdmin: s.isAuthorizedAdminRequest(req), IsAdmin: s.isAuthorizedAdminRequest(req),
}) })
return nil, nil return nil, nil
} }
func (s *httpServer) staticAssetHandler(w http.ResponseWriter, req *http.Request , ps httprouter.Params) (interface{}, error) { func (s *httpServer) staticAssetHandler(w http.ResponseWriter, req *http.Request , ps httprouter.Params) (interface{}, error) {
assetName := ps.ByName("asset") assetName := ps.ByName("asset")
asset, err := Asset(assetName) asset, err := staticAsset(assetName)
if err != nil { if err != nil {
return nil, http_api.Err{404, "NOT_FOUND"} return nil, http_api.Err{404, "NOT_FOUND"}
} }
ext := path.Ext(assetName) ext := path.Ext(assetName)
ct := mime.TypeByExtension(ext) ct := mime.TypeByExtension(ext)
if ct == "" { if ct == "" {
switch ext { switch ext {
case ".map":
ct = "application/json"
case ".svg": case ".svg":
ct = "image/svg+xml" ct = "image/svg+xml"
case ".woff": case ".woff":
ct = "application/font-woff" ct = "application/font-woff"
case ".ttf": case ".ttf":
ct = "application/font-sfnt" ct = "application/font-sfnt"
case ".eot": case ".eot":
ct = "application/vnd.ms-fontobject" ct = "application/vnd.ms-fontobject"
case ".woff2": case ".woff2":
ct = "application/font-woff2" ct = "application/font-woff2"
skipping to change at line 204 skipping to change at line 206
func (s *httpServer) topicsHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { func (s *httpServer) topicsHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
var messages []string var messages []string
reqParams, err := http_api.NewReqParams(req) reqParams, err := http_api.NewReqParams(req)
if err != nil { if err != nil {
return nil, http_api.Err{400, err.Error()} return nil, http_api.Err{400, err.Error()}
} }
var topics []string var topics []string
if len(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) != 0 { if len(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) != 0 {
topics, err = s.ci.GetLookupdTopics(s.ctx.nsqadmin.getOpts().NSQL topics, err = s.ci.GetLookupdTopics(s.nsqadmin.getOpts().NSQLooku
ookupdHTTPAddresses) pdHTTPAddresses)
} else { } else {
topics, err = s.ci.GetNSQDTopics(s.ctx.nsqadmin.getOpts().NSQDHTT PAddresses) topics, err = s.ci.GetNSQDTopics(s.nsqadmin.getOpts().NSQDHTTPAdd resses)
} }
if err != nil { if err != nil {
pe, ok := err.(clusterinfo.PartialErr) pe, ok := err.(clusterinfo.PartialErr)
if !ok { if !ok {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get topics - %s ", err) s.nsqadmin.logf(LOG_ERROR, "failed to get topics - %s", e rr)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)}
} }
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) s.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error()) messages = append(messages, pe.Error())
} }
inactive, _ := reqParams.Get("inactive") inactive, _ := reqParams.Get("inactive")
if inactive == "true" { if inactive == "true" {
topicChannelMap := make(map[string][]string) topicChannelMap := make(map[string][]string)
if len(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) == 0 { if len(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) == 0 {
goto respond goto respond
} }
for _, topicName := range topics { for _, topicName := range topics {
producers, _ := s.ci.GetLookupdTopicProducers( producers, _ := s.ci.GetLookupdTopicProducers(
topicName, s.ctx.nsqadmin.getOpts().NSQLookupdHTT PAddresses) topicName, s.nsqadmin.getOpts().NSQLookupdHTTPAdd resses)
if len(producers) == 0 { if len(producers) == 0 {
topicChannels, _ := s.ci.GetLookupdTopicChannels( topicChannels, _ := s.ci.GetLookupdTopicChannels(
topicName, s.ctx.nsqadmin.getOpts().NSQLo okupdHTTPAddresses) topicName, s.nsqadmin.getOpts().NSQLookup dHTTPAddresses)
topicChannelMap[topicName] = topicChannels topicChannelMap[topicName] = topicChannels
} }
} }
respond: respond:
return struct { return struct {
Topics map[string][]string `json:"topics"` Topics map[string][]string `json:"topics"`
Message string `json:"message"` Message string `json:"message"`
}{topicChannelMap, maybeWarnMsg(messages)}, nil }{topicChannelMap, maybeWarnMsg(messages)}, nil
} }
skipping to change at line 253 skipping to change at line 255
Message string `json:"message"` Message string `json:"message"`
}{topics, maybeWarnMsg(messages)}, nil }{topics, maybeWarnMsg(messages)}, nil
} }
func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) { func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) {
var messages []string var messages []string
topicName := ps.ByName("topic") topicName := ps.ByName("topic")
producers, err := s.ci.GetTopicProducers(topicName, producers, err := s.ci.GetTopicProducers(topicName,
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) s.nsqadmin.getOpts().NSQDHTTPAddresses)
if err != nil { if err != nil {
pe, ok := err.(clusterinfo.PartialErr) pe, ok := err.(clusterinfo.PartialErr)
if !ok { if !ok {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get topic produ cers - %s", err) s.nsqadmin.logf(LOG_ERROR, "failed to get topic producers - %s", err)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)}
} }
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) s.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error()) messages = append(messages, pe.Error())
} }
topicStats, _, err := s.ci.GetNSQDStats(producers, topicName, "", false) topicStats, _, err := s.ci.GetNSQDStats(producers, topicName, "", false)
if err != nil { if err != nil {
pe, ok := err.(clusterinfo.PartialErr) pe, ok := err.(clusterinfo.PartialErr)
if !ok { if !ok {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get topic metad ata - %s", err) s.nsqadmin.logf(LOG_ERROR, "failed to get topic metadata - %s", err)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)}
} }
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) s.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error()) messages = append(messages, pe.Error())
} }
allNodesTopicStats := &clusterinfo.TopicStats{TopicName: topicName} allNodesTopicStats := &clusterinfo.TopicStats{TopicName: topicName}
for _, t := range topicStats { for _, t := range topicStats {
allNodesTopicStats.Add(t) allNodesTopicStats.Add(t)
} }
return struct { return struct {
*clusterinfo.TopicStats *clusterinfo.TopicStats
skipping to change at line 293 skipping to change at line 295
}{allNodesTopicStats, maybeWarnMsg(messages)}, nil }{allNodesTopicStats, maybeWarnMsg(messages)}, nil
} }
func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
var messages []string var messages []string
topicName := ps.ByName("topic") topicName := ps.ByName("topic")
channelName := ps.ByName("channel") channelName := ps.ByName("channel")
producers, err := s.ci.GetTopicProducers(topicName, producers, err := s.ci.GetTopicProducers(topicName,
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) s.nsqadmin.getOpts().NSQDHTTPAddresses)
if err != nil { if err != nil {
pe, ok := err.(clusterinfo.PartialErr) pe, ok := err.(clusterinfo.PartialErr)
if !ok { if !ok {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get topic produ cers - %s", err) s.nsqadmin.logf(LOG_ERROR, "failed to get topic producers - %s", err)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)}
} }
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) s.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error()) messages = append(messages, pe.Error())
} }
_, channelStats, err := s.ci.GetNSQDStats(producers, topicName, channelNa me, true) _, channelStats, err := s.ci.GetNSQDStats(producers, topicName, channelNa me, true)
if err != nil { if err != nil {
pe, ok := err.(clusterinfo.PartialErr) pe, ok := err.(clusterinfo.PartialErr)
if !ok { if !ok {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get channel met adata - %s", err) s.nsqadmin.logf(LOG_ERROR, "failed to get channel metadat a - %s", err)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)}
} }
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) s.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error()) messages = append(messages, pe.Error())
} }
return struct { return struct {
*clusterinfo.ChannelStats *clusterinfo.ChannelStats
Message string `json:"message"` Message string `json:"message"`
}{channelStats[channelName], maybeWarnMsg(messages)}, nil }{channelStats[channelName], maybeWarnMsg(messages)}, nil
} }
func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) { func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps h ttprouter.Params) (interface{}, error) {
var messages []string var messages []string
producers, err := s.ci.GetProducers(s.ctx.nsqadmin.getOpts().NSQLookupdHT TPAddresses, s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) producers, err := s.ci.GetProducers(s.nsqadmin.getOpts().NSQLookupdHTTPAd dresses, s.nsqadmin.getOpts().NSQDHTTPAddresses)
if err != nil { if err != nil {
pe, ok := err.(clusterinfo.PartialErr) pe, ok := err.(clusterinfo.PartialErr)
if !ok { if !ok {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get nodes - %s" , err) s.nsqadmin.logf(LOG_ERROR, "failed to get nodes - %s", er r)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)}
} }
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) s.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error()) messages = append(messages, pe.Error())
} }
return struct { return struct {
Nodes clusterinfo.Producers `json:"nodes"` Nodes clusterinfo.Producers `json:"nodes"`
Message string `json:"message"` Message string `json:"message"`
}{producers, maybeWarnMsg(messages)}, nil }{producers, maybeWarnMsg(messages)}, nil
} }
func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht tprouter.Params) (interface{}, error) { func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht tprouter.Params) (interface{}, error) {
var messages []string var messages []string
node := ps.ByName("node") node := ps.ByName("node")
producers, err := s.ci.GetProducers(s.ctx.nsqadmin.getOpts().NSQLookupdHT TPAddresses, s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) producers, err := s.ci.GetProducers(s.nsqadmin.getOpts().NSQLookupdHTTPAd dresses, s.nsqadmin.getOpts().NSQDHTTPAddresses)
if err != nil { if err != nil {
pe, ok := err.(clusterinfo.PartialErr) pe, ok := err.(clusterinfo.PartialErr)
if !ok { if !ok {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get producers - %s", err) s.nsqadmin.logf(LOG_ERROR, "failed to get producers - %s" , err)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)}
} }
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) s.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error()) messages = append(messages, pe.Error())
} }
producer := producers.Search(node) producer := producers.Search(node)
if producer == nil { if producer == nil {
return nil, http_api.Err{404, "NODE_NOT_FOUND"} return nil, http_api.Err{404, "NODE_NOT_FOUND"}
} }
topicStats, _, err := s.ci.GetNSQDStats(clusterinfo.Producers{producer}, "", "", true) topicStats, _, err := s.ci.GetNSQDStats(clusterinfo.Producers{producer}, "", "", true)
if err != nil { if err != nil {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", e rr) s.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", err)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", e rr)} return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", e rr)}
} }
var totalClients int64 var totalClients int64
var totalMessages int64 var totalMessages int64
for _, ts := range topicStats { for _, ts := range topicStats {
for _, cs := range ts.Channels { for _, cs := range ts.Channels {
totalClients += int64(len(cs.Clients)) totalClients += int64(len(cs.Clients))
} }
totalMessages += ts.MessageCount totalMessages += ts.MessageCount
skipping to change at line 410 skipping to change at line 412
err := json.NewDecoder(req.Body).Decode(&body) err := json.NewDecoder(req.Body).Decode(&body)
if err != nil { if err != nil {
return nil, http_api.Err{400, "INVALID_BODY"} return nil, http_api.Err{400, "INVALID_BODY"}
} }
if !protocol.IsValidTopicName(body.Topic) { if !protocol.IsValidTopicName(body.Topic) {
return nil, http_api.Err{400, "INVALID_TOPIC"} return nil, http_api.Err{400, "INVALID_TOPIC"}
} }
err = s.ci.TombstoneNodeForTopic(body.Topic, node, err = s.ci.TombstoneNodeForTopic(body.Topic, node,
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
if err != nil { if err != nil {
pe, ok := err.(clusterinfo.PartialErr) pe, ok := err.(clusterinfo.PartialErr)
if !ok { if !ok {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to tombstone node for topic - %s", err) s.nsqadmin.logf(LOG_ERROR, "failed to tombstone node for topic - %s", err)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)}
} }
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) s.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error()) messages = append(messages, pe.Error())
} }
s.notifyAdminAction("tombstone_topic_producer", body.Topic, "", node, req ) s.notifyAdminAction("tombstone_topic_producer", body.Topic, "", node, req )
return struct { return struct {
Message string `json:"message"` Message string `json:"message"`
}{maybeWarnMsg(messages)}, nil }{maybeWarnMsg(messages)}, nil
} }
skipping to change at line 454 skipping to change at line 456
if !protocol.IsValidTopicName(body.Topic) { if !protocol.IsValidTopicName(body.Topic) {
return nil, http_api.Err{400, "INVALID_TOPIC"} return nil, http_api.Err{400, "INVALID_TOPIC"}
} }
if len(body.Channel) > 0 && !protocol.IsValidChannelName(body.Channel) { if len(body.Channel) > 0 && !protocol.IsValidChannelName(body.Channel) {
return nil, http_api.Err{400, "INVALID_CHANNEL"} return nil, http_api.Err{400, "INVALID_CHANNEL"}
} }
err = s.ci.CreateTopicChannel(body.Topic, body.Channel, err = s.ci.CreateTopicChannel(body.Topic, body.Channel,
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
if err != nil { if err != nil {
pe, ok := err.(clusterinfo.PartialErr) pe, ok := err.(clusterinfo.PartialErr)
if !ok { if !ok {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to create topic/ch annel - %s", err) s.nsqadmin.logf(LOG_ERROR, "failed to create topic/channe l - %s", err)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)}
} }
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) s.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error()) messages = append(messages, pe.Error())
} }
s.notifyAdminAction("create_topic", body.Topic, "", "", req) s.notifyAdminAction("create_topic", body.Topic, "", "", req)
if len(body.Channel) > 0 { if len(body.Channel) > 0 {
s.notifyAdminAction("create_channel", body.Topic, body.Channel, " ", req) s.notifyAdminAction("create_channel", body.Topic, body.Channel, " ", req)
} }
return struct { return struct {
Message string `json:"message"` Message string `json:"message"`
skipping to change at line 485 skipping to change at line 487
func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request , ps httprouter.Params) (interface{}, error) { func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request , ps httprouter.Params) (interface{}, error) {
var messages []string var messages []string
if !s.isAuthorizedAdminRequest(req) { if !s.isAuthorizedAdminRequest(req) {
return nil, http_api.Err{403, "FORBIDDEN"} return nil, http_api.Err{403, "FORBIDDEN"}
} }
topicName := ps.ByName("topic") topicName := ps.ByName("topic")
err := s.ci.DeleteTopic(topicName, err := s.ci.DeleteTopic(topicName,
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) s.nsqadmin.getOpts().NSQDHTTPAddresses)
if err != nil { if err != nil {
pe, ok := err.(clusterinfo.PartialErr) pe, ok := err.(clusterinfo.PartialErr)
if !ok { if !ok {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to delete topic - %s", err) s.nsqadmin.logf(LOG_ERROR, "failed to delete topic - %s", err)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)}
} }
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) s.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error()) messages = append(messages, pe.Error())
} }
s.notifyAdminAction("delete_topic", topicName, "", "", req) s.notifyAdminAction("delete_topic", topicName, "", "", req)
return struct { return struct {
Message string `json:"message"` Message string `json:"message"`
}{maybeWarnMsg(messages)}, nil }{maybeWarnMsg(messages)}, nil
} }
skipping to change at line 515 skipping to change at line 517
var messages []string var messages []string
if !s.isAuthorizedAdminRequest(req) { if !s.isAuthorizedAdminRequest(req) {
return nil, http_api.Err{403, "FORBIDDEN"} return nil, http_api.Err{403, "FORBIDDEN"}
} }
topicName := ps.ByName("topic") topicName := ps.ByName("topic")
channelName := ps.ByName("channel") channelName := ps.ByName("channel")
err := s.ci.DeleteChannel(topicName, channelName, err := s.ci.DeleteChannel(topicName, channelName,
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) s.nsqadmin.getOpts().NSQDHTTPAddresses)
if err != nil { if err != nil {
pe, ok := err.(clusterinfo.PartialErr) pe, ok := err.(clusterinfo.PartialErr)
if !ok { if !ok {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to delete channel - %s", err) s.nsqadmin.logf(LOG_ERROR, "failed to delete channel - %s ", err)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)}
} }
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) s.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error()) messages = append(messages, pe.Error())
} }
s.notifyAdminAction("delete_channel", topicName, channelName, "", req) s.notifyAdminAction("delete_channel", topicName, channelName, "", req)
return struct { return struct {
Message string `json:"message"` Message string `json:"message"`
}{maybeWarnMsg(messages)}, nil }{maybeWarnMsg(messages)}, nil
} }
skipping to change at line 565 skipping to change at line 567
err := json.NewDecoder(req.Body).Decode(&body) err := json.NewDecoder(req.Body).Decode(&body)
if err != nil { if err != nil {
return nil, http_api.Err{400, err.Error()} return nil, http_api.Err{400, err.Error()}
} }
switch body.Action { switch body.Action {
case "pause": case "pause":
if channelName != "" { if channelName != "" {
err = s.ci.PauseChannel(topicName, channelName, err = s.ci.PauseChannel(topicName, channelName,
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) s.nsqadmin.getOpts().NSQDHTTPAddresses)
s.notifyAdminAction("pause_channel", topicName, channelNa me, "", req) s.notifyAdminAction("pause_channel", topicName, channelNa me, "", req)
} else { } else {
err = s.ci.PauseTopic(topicName, err = s.ci.PauseTopic(topicName,
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) s.nsqadmin.getOpts().NSQDHTTPAddresses)
s.notifyAdminAction("pause_topic", topicName, "", "", req ) s.notifyAdminAction("pause_topic", topicName, "", "", req )
} }
case "unpause": case "unpause":
if channelName != "" { if channelName != "" {
err = s.ci.UnPauseChannel(topicName, channelName, err = s.ci.UnPauseChannel(topicName, channelName,
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) s.nsqadmin.getOpts().NSQDHTTPAddresses)
s.notifyAdminAction("unpause_channel", topicName, channel Name, "", req) s.notifyAdminAction("unpause_channel", topicName, channel Name, "", req)
} else { } else {
err = s.ci.UnPauseTopic(topicName, err = s.ci.UnPauseTopic(topicName,
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) s.nsqadmin.getOpts().NSQDHTTPAddresses)
s.notifyAdminAction("unpause_topic", topicName, "", "", r eq) s.notifyAdminAction("unpause_topic", topicName, "", "", r eq)
} }
case "empty": case "empty":
if channelName != "" { if channelName != "" {
err = s.ci.EmptyChannel(topicName, channelName, err = s.ci.EmptyChannel(topicName, channelName,
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) s.nsqadmin.getOpts().NSQDHTTPAddresses)
s.notifyAdminAction("empty_channel", topicName, channelNa me, "", req) s.notifyAdminAction("empty_channel", topicName, channelNa me, "", req)
} else { } else {
err = s.ci.EmptyTopic(topicName, err = s.ci.EmptyTopic(topicName,
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) s.nsqadmin.getOpts().NSQDHTTPAddresses)
s.notifyAdminAction("empty_topic", topicName, "", "", req ) s.notifyAdminAction("empty_topic", topicName, "", "", req )
} }
default: default:
return nil, http_api.Err{400, "INVALID_ACTION"} return nil, http_api.Err{400, "INVALID_ACTION"}
} }
if err != nil { if err != nil {
pe, ok := err.(clusterinfo.PartialErr) pe, ok := err.(clusterinfo.PartialErr)
if !ok { if !ok {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to %s topic/channe l - %s", body.Action, err) s.nsqadmin.logf(LOG_ERROR, "failed to %s topic/channel - %s", body.Action, err)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)}
} }
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) s.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error()) messages = append(messages, pe.Error())
} }
return struct { return struct {
Message string `json:"message"` Message string `json:"message"`
}{maybeWarnMsg(messages)}, nil }{maybeWarnMsg(messages)}, nil
} }
type counterStats struct { type counterStats struct {
Node string `json:"node"` Node string `json:"node"`
TopicName string `json:"topic_name"` TopicName string `json:"topic_name"`
ChannelName string `json:"channel_name"` ChannelName string `json:"channel_name"`
MessageCount int64 `json:"message_count"` MessageCount int64 `json:"message_count"`
} }
func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
var messages []string var messages []string
stats := make(map[string]*counterStats) stats := make(map[string]*counterStats)
producers, err := s.ci.GetProducers(s.ctx.nsqadmin.getOpts().NSQLookupdHT TPAddresses, s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) producers, err := s.ci.GetProducers(s.nsqadmin.getOpts().NSQLookupdHTTPAd dresses, s.nsqadmin.getOpts().NSQDHTTPAddresses)
if err != nil { if err != nil {
pe, ok := err.(clusterinfo.PartialErr) pe, ok := err.(clusterinfo.PartialErr)
if !ok { if !ok {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get counter pro ducer list - %s", err) s.nsqadmin.logf(LOG_ERROR, "failed to get counter produce r list - %s", err)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)}
} }
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) s.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error()) messages = append(messages, pe.Error())
} }
_, channelStats, err := s.ci.GetNSQDStats(producers, "", "", false) _, channelStats, err := s.ci.GetNSQDStats(producers, "", "", false)
if err != nil { if err != nil {
pe, ok := err.(clusterinfo.PartialErr) pe, ok := err.(clusterinfo.PartialErr)
if !ok { if !ok {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", err) s.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s ", err)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)} return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR : %s", err)}
} }
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) s.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error()) messages = append(messages, pe.Error())
} }
for _, channelStats := range channelStats { for _, channelStats := range channelStats {
for _, hostChannelStats := range channelStats.NodeStats { for _, hostChannelStats := range channelStats.NodeStats {
key := fmt.Sprintf("%s:%s:%s", channelStats.TopicName, ch annelStats.ChannelName, hostChannelStats.Node) key := fmt.Sprintf("%s:%s:%s", channelStats.TopicName, ch annelStats.ChannelName, hostChannelStats.Node)
s, ok := stats[key] s, ok := stats[key]
if !ok { if !ok {
s = &counterStats{ s = &counterStats{
Node: hostChannelStats.Node, Node: hostChannelStats.Node,
skipping to change at line 694 skipping to change at line 696
if err != nil || metric != "rate" { if err != nil || metric != "rate" {
return nil, http_api.Err{400, "INVALID_ARG_METRIC"} return nil, http_api.Err{400, "INVALID_ARG_METRIC"}
} }
target, err := reqParams.Get("target") target, err := reqParams.Get("target")
if err != nil { if err != nil {
return nil, http_api.Err{400, "INVALID_ARG_TARGET"} return nil, http_api.Err{400, "INVALID_ARG_TARGET"}
} }
params := url.Values{} params := url.Values{}
params.Set("from", fmt.Sprintf("-%dsec", s.ctx.nsqadmin.getOpts().StatsdI params.Set("from", fmt.Sprintf("-%dsec", s.nsqadmin.getOpts().StatsdInter
nterval*2/time.Second)) val*2/time.Second))
params.Set("until", fmt.Sprintf("-%dsec", s.ctx.nsqadmin.getOpts().Statsd params.Set("until", fmt.Sprintf("-%dsec", s.nsqadmin.getOpts().StatsdInte
Interval/time.Second)) rval/time.Second))
params.Set("format", "json") params.Set("format", "json")
params.Set("target", target) params.Set("target", target)
query := fmt.Sprintf("/render?%s", params.Encode()) query := fmt.Sprintf("/render?%s", params.Encode())
url := s.ctx.nsqadmin.getOpts().GraphiteURL + query url := s.nsqadmin.getOpts().GraphiteURL + query
s.ctx.nsqadmin.logf(LOG_INFO, "GRAPHITE: %s", url) s.nsqadmin.logf(LOG_INFO, "GRAPHITE: %s", url)
var response []struct { var response []struct {
Target string `json:"target"` Target string `json:"target"`
DataPoints [][]*float64 `json:"datapoints"` DataPoints [][]*float64 `json:"datapoints"`
} }
err = s.client.GETV1(url, &response) err = s.client.GETV1(url, &response)
if err != nil { if err != nil {
s.ctx.nsqadmin.logf(LOG_ERROR, "graphite request failed - %s", er r) s.nsqadmin.logf(LOG_ERROR, "graphite request failed - %s", err)
return nil, http_api.Err{500, "INTERNAL_ERROR"} return nil, http_api.Err{500, "INTERNAL_ERROR"}
} }
var rateStr string var rateStr string
rate := *response[0].DataPoints[0][0] rate := *response[0].DataPoints[0][0]
if rate < 0 { if rate < 0 {
rateStr = "N/A" rateStr = "N/A"
} else { } else {
rateDivisor := s.ctx.nsqadmin.getOpts().StatsdInterval / time.Sec ond rateDivisor := s.nsqadmin.getOpts().StatsdInterval / time.Second
rateStr = fmt.Sprintf("%.2f", rate/float64(rateDivisor)) rateStr = fmt.Sprintf("%.2f", rate/float64(rateDivisor))
} }
return struct { return struct {
Rate string `json:"rate"` Rate string `json:"rate"`
}{rateStr}, nil }{rateStr}, nil
} }
func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) { func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) {
opt := ps.ByName("opt") opt := ps.ByName("opt")
allowConfigFromCIDR := s.ctx.nsqadmin.getOpts().AllowConfigFromCIDR allowConfigFromCIDR := s.nsqadmin.getOpts().AllowConfigFromCIDR
if allowConfigFromCIDR != "" { if allowConfigFromCIDR != "" {
_, ipnet, _ := net.ParseCIDR(allowConfigFromCIDR) _, ipnet, _ := net.ParseCIDR(allowConfigFromCIDR)
addr, _, err := net.SplitHostPort(req.RemoteAddr) addr, _, err := net.SplitHostPort(req.RemoteAddr)
if err != nil { if err != nil {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAdd r %s", req.RemoteAddr) s.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAddr %s ", req.RemoteAddr)
return nil, http_api.Err{400, "INVALID_REMOTE_ADDR"} return nil, http_api.Err{400, "INVALID_REMOTE_ADDR"}
} }
ip := net.ParseIP(addr) ip := net.ParseIP(addr)
if ip == nil { if ip == nil {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAdd r %s", req.RemoteAddr) s.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAddr %s ", req.RemoteAddr)
return nil, http_api.Err{400, "INVALID_REMOTE_ADDR"} return nil, http_api.Err{400, "INVALID_REMOTE_ADDR"}
} }
if !ipnet.Contains(ip) { if !ipnet.Contains(ip) {
return nil, http_api.Err{403, "FORBIDDEN"} return nil, http_api.Err{403, "FORBIDDEN"}
} }
} }
if req.Method == "PUT" { if req.Method == "PUT" {
// add 1 so that it's greater than our max when we test for it // add 1 so that it's greater than our max when we test for it
// (LimitReader returns a "fake" EOF) // (LimitReader returns a "fake" EOF)
readMax := int64(1024*1024 + 1) readMax := int64(1024*1024 + 1)
body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax)) body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
if err != nil { if err != nil {
return nil, http_api.Err{500, "INTERNAL_ERROR"} return nil, http_api.Err{500, "INTERNAL_ERROR"}
} }
if int64(len(body)) == readMax || len(body) == 0 { if int64(len(body)) == readMax || len(body) == 0 {
return nil, http_api.Err{413, "INVALID_VALUE"} return nil, http_api.Err{413, "INVALID_VALUE"}
} }
opts := *s.ctx.nsqadmin.getOpts() opts := *s.nsqadmin.getOpts()
switch opt { switch opt {
case "nsqlookupd_http_addresses": case "nsqlookupd_http_addresses":
err := json.Unmarshal(body, &opts.NSQLookupdHTTPAddresses ) err := json.Unmarshal(body, &opts.NSQLookupdHTTPAddresses )
if err != nil { if err != nil {
return nil, http_api.Err{400, "INVALID_VALUE"} return nil, http_api.Err{400, "INVALID_VALUE"}
} }
case "log_level": case "log_level":
logLevelStr := string(body) logLevelStr := string(body)
logLevel, err := lg.ParseLogLevel(logLevelStr) logLevel, err := lg.ParseLogLevel(logLevelStr)
if err != nil { if err != nil {
return nil, http_api.Err{400, "INVALID_VALUE"} return nil, http_api.Err{400, "INVALID_VALUE"}
} }
opts.LogLevel = logLevel opts.LogLevel = logLevel
default: default:
return nil, http_api.Err{400, "INVALID_OPTION"} return nil, http_api.Err{400, "INVALID_OPTION"}
} }
s.ctx.nsqadmin.swapOpts(&opts) s.nsqadmin.swapOpts(&opts)
} }
v, ok := getOptByCfgName(s.ctx.nsqadmin.getOpts(), opt) v, ok := getOptByCfgName(s.nsqadmin.getOpts(), opt)
if !ok { if !ok {
return nil, http_api.Err{400, "INVALID_OPTION"} return nil, http_api.Err{400, "INVALID_OPTION"}
} }
return v, nil return v, nil
} }
func (s *httpServer) isAuthorizedAdminRequest(req *http.Request) bool { func (s *httpServer) isAuthorizedAdminRequest(req *http.Request) bool {
adminUsers := s.ctx.nsqadmin.getOpts().AdminUsers adminUsers := s.nsqadmin.getOpts().AdminUsers
if len(adminUsers) == 0 { if len(adminUsers) == 0 {
return true return true
} }
aclHttpHeader := s.ctx.nsqadmin.getOpts().AclHttpHeader aclHttpHeader := s.nsqadmin.getOpts().AclHttpHeader
user := req.Header.Get(aclHttpHeader) user := req.Header.Get(aclHttpHeader)
for _, v := range adminUsers { for _, v := range adminUsers {
if v == user { if v == user {
return true return true
} }
} }
return false return false
} }
func getOptByCfgName(opts interface{}, name string) (interface{}, bool) { func getOptByCfgName(opts interface{}, name string) (interface{}, bool) {
 End of changes. 72 change blocks. 
106 lines changed or deleted 107 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)