http.go (nsq-1.2.0) | : | http.go (nsq-1.2.1) | ||
---|---|---|---|---|
skipping to change at line 16 | skipping to change at line 16 | |||
"net/http/pprof" | "net/http/pprof" | |||
"sync/atomic" | "sync/atomic" | |||
"github.com/julienschmidt/httprouter" | "github.com/julienschmidt/httprouter" | |||
"github.com/nsqio/nsq/internal/http_api" | "github.com/nsqio/nsq/internal/http_api" | |||
"github.com/nsqio/nsq/internal/protocol" | "github.com/nsqio/nsq/internal/protocol" | |||
"github.com/nsqio/nsq/internal/version" | "github.com/nsqio/nsq/internal/version" | |||
) | ) | |||
type httpServer struct { | type httpServer struct { | |||
ctx *Context | nsqlookupd *NSQLookupd | |||
router http.Handler | router http.Handler | |||
} | } | |||
func newHTTPServer(ctx *Context) *httpServer { | func newHTTPServer(l *NSQLookupd) *httpServer { | |||
log := http_api.Log(ctx.nsqlookupd.logf) | log := http_api.Log(l.logf) | |||
router := httprouter.New() | router := httprouter.New() | |||
router.HandleMethodNotAllowed = true | router.HandleMethodNotAllowed = true | |||
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf) | router.PanicHandler = http_api.LogPanicHandler(l.logf) | |||
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf) | router.NotFound = http_api.LogNotFoundHandler(l.logf) | |||
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlook | router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(l.logf) | |||
upd.logf) | ||||
s := &httpServer{ | s := &httpServer{ | |||
ctx: ctx, | nsqlookupd: l, | |||
router: router, | router: router, | |||
} | } | |||
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_ api.PlainText)) | router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_ api.PlainText)) | |||
router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V 1)) | router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V 1)) | |||
// v1 negotiate | // v1 negotiate | |||
router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api .V1)) | router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api .V1)) | |||
router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_a pi.V1)) | router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_a pi.V1)) | |||
router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_a pi.V1)) | router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_a pi.V1)) | |||
router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, ht tp_api.V1)) | router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, ht tp_api.V1)) | |||
skipping to change at line 81 | skipping to change at line 81 | |||
func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou ter.Params) (interface{}, error) { | func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou ter.Params) (interface{}, error) { | |||
return struct { | return struct { | |||
Version string `json:"version"` | Version string `json:"version"` | |||
}{ | }{ | |||
Version: version.Binary, | Version: version.Binary, | |||
}, nil | }, nil | |||
} | } | |||
func (s *httpServer) doTopics(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) { | func (s *httpServer) doTopics(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) { | |||
topics := s.ctx.nsqlookupd.DB.FindRegistrations("topic", "*", "").Keys() | topics := s.nsqlookupd.DB.FindRegistrations("topic", "*", "").Keys() | |||
return map[string]interface{}{ | return map[string]interface{}{ | |||
"topics": topics, | "topics": topics, | |||
}, nil | }, nil | |||
} | } | |||
func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps htt prouter.Params) (interface{}, error) { | func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps htt prouter.Params) (interface{}, error) { | |||
reqParams, err := http_api.NewReqParams(req) | reqParams, err := http_api.NewReqParams(req) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "INVALID_REQUEST"} | return nil, http_api.Err{400, "INVALID_REQUEST"} | |||
} | } | |||
topicName, err := reqParams.Get("topic") | topicName, err := reqParams.Get("topic") | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | |||
} | } | |||
channels := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, " *").SubKeys() | channels := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*"). SubKeys() | |||
return map[string]interface{}{ | return map[string]interface{}{ | |||
"channels": channels, | "channels": channels, | |||
}, nil | }, nil | |||
} | } | |||
func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) { | func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) { | |||
reqParams, err := http_api.NewReqParams(req) | reqParams, err := http_api.NewReqParams(req) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "INVALID_REQUEST"} | return nil, http_api.Err{400, "INVALID_REQUEST"} | |||
} | } | |||
topicName, err := reqParams.Get("topic") | topicName, err := reqParams.Get("topic") | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | |||
} | } | |||
registration := s.ctx.nsqlookupd.DB.FindRegistrations("topic", topicName, "") | registration := s.nsqlookupd.DB.FindRegistrations("topic", topicName, "") | |||
if len(registration) == 0 { | if len(registration) == 0 { | |||
return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} | return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} | |||
} | } | |||
channels := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, " | channels := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*"). | |||
*").SubKeys() | SubKeys() | |||
producers := s.ctx.nsqlookupd.DB.FindProducers("topic", topicName, "") | producers := s.nsqlookupd.DB.FindProducers("topic", topicName, "") | |||
producers = producers.FilterByActive(s.ctx.nsqlookupd.opts.InactiveProduc | producers = producers.FilterByActive(s.nsqlookupd.opts.InactiveProducerTi | |||
erTimeout, | meout, | |||
s.ctx.nsqlookupd.opts.TombstoneLifetime) | s.nsqlookupd.opts.TombstoneLifetime) | |||
return map[string]interface{}{ | return map[string]interface{}{ | |||
"channels": channels, | "channels": channels, | |||
"producers": producers.PeerInfo(), | "producers": producers.PeerInfo(), | |||
}, nil | }, nil | |||
} | } | |||
func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { | func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { | |||
reqParams, err := http_api.NewReqParams(req) | reqParams, err := http_api.NewReqParams(req) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "INVALID_REQUEST"} | return nil, http_api.Err{400, "INVALID_REQUEST"} | |||
skipping to change at line 145 | skipping to change at line 145 | |||
topicName, err := reqParams.Get("topic") | topicName, err := reqParams.Get("topic") | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | |||
} | } | |||
if !protocol.IsValidTopicName(topicName) { | if !protocol.IsValidTopicName(topicName) { | |||
return nil, http_api.Err{400, "INVALID_ARG_TOPIC"} | return nil, http_api.Err{400, "INVALID_ARG_TOPIC"} | |||
} | } | |||
s.ctx.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName) | s.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName) | |||
key := Registration{"topic", topicName, ""} | key := Registration{"topic", topicName, ""} | |||
s.ctx.nsqlookupd.DB.AddRegistration(key) | s.nsqlookupd.DB.AddRegistration(key) | |||
return nil, nil | return nil, nil | |||
} | } | |||
func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { | func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { | |||
reqParams, err := http_api.NewReqParams(req) | reqParams, err := http_api.NewReqParams(req) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "INVALID_REQUEST"} | return nil, http_api.Err{400, "INVALID_REQUEST"} | |||
} | } | |||
topicName, err := reqParams.Get("topic") | topicName, err := reqParams.Get("topic") | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | |||
} | } | |||
registrations := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicNa me, "*") | registrations := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*") | |||
for _, registration := range registrations { | for _, registration := range registrations { | |||
s.ctx.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from to | s.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic( | |||
pic(%s)", registration.SubKey, topicName) | %s)", registration.SubKey, topicName) | |||
s.ctx.nsqlookupd.DB.RemoveRegistration(registration) | s.nsqlookupd.DB.RemoveRegistration(registration) | |||
} | } | |||
registrations = s.ctx.nsqlookupd.DB.FindRegistrations("topic", topicName, "") | registrations = s.nsqlookupd.DB.FindRegistrations("topic", topicName, "") | |||
for _, registration := range registrations { | for _, registration := range registrations { | |||
s.ctx.nsqlookupd.logf(LOG_INFO, "DB: removing topic(%s)", topicNa | s.nsqlookupd.logf(LOG_INFO, "DB: removing topic(%s)", topicName) | |||
me) | s.nsqlookupd.DB.RemoveRegistration(registration) | |||
s.ctx.nsqlookupd.DB.RemoveRegistration(registration) | ||||
} | } | |||
return nil, nil | return nil, nil | |||
} | } | |||
func (s *httpServer) doTombstoneTopicProducer(w http.ResponseWriter, req *http.R equest, ps httprouter.Params) (interface{}, error) { | func (s *httpServer) doTombstoneTopicProducer(w http.ResponseWriter, req *http.R equest, ps httprouter.Params) (interface{}, error) { | |||
reqParams, err := http_api.NewReqParams(req) | reqParams, err := http_api.NewReqParams(req) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "INVALID_REQUEST"} | return nil, http_api.Err{400, "INVALID_REQUEST"} | |||
} | } | |||
skipping to change at line 194 | skipping to change at line 194 | |||
topicName, err := reqParams.Get("topic") | topicName, err := reqParams.Get("topic") | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} | |||
} | } | |||
node, err := reqParams.Get("node") | node, err := reqParams.Get("node") | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "MISSING_ARG_NODE"} | return nil, http_api.Err{400, "MISSING_ARG_NODE"} | |||
} | } | |||
s.ctx.nsqlookupd.logf(LOG_INFO, "DB: setting tombstone for producer@%s of | s.nsqlookupd.logf(LOG_INFO, "DB: setting tombstone for producer@%s of top | |||
topic(%s)", node, topicName) | ic(%s)", node, topicName) | |||
producers := s.ctx.nsqlookupd.DB.FindProducers("topic", topicName, "") | producers := s.nsqlookupd.DB.FindProducers("topic", topicName, "") | |||
for _, p := range producers { | for _, p := range producers { | |||
thisNode := fmt.Sprintf("%s:%d", p.peerInfo.BroadcastAddress, p.p eerInfo.HTTPPort) | thisNode := fmt.Sprintf("%s:%d", p.peerInfo.BroadcastAddress, p.p eerInfo.HTTPPort) | |||
if thisNode == node { | if thisNode == node { | |||
p.Tombstone() | p.Tombstone() | |||
} | } | |||
} | } | |||
return nil, nil | return nil, nil | |||
} | } | |||
skipping to change at line 217 | skipping to change at line 217 | |||
reqParams, err := http_api.NewReqParams(req) | reqParams, err := http_api.NewReqParams(req) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "INVALID_REQUEST"} | return nil, http_api.Err{400, "INVALID_REQUEST"} | |||
} | } | |||
topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) | topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, err.Error()} | return nil, http_api.Err{400, err.Error()} | |||
} | } | |||
s.ctx.nsqlookupd.logf(LOG_INFO, "DB: adding channel(%s) in topic(%s)", ch annelName, topicName) | s.nsqlookupd.logf(LOG_INFO, "DB: adding channel(%s) in topic(%s)", channe lName, topicName) | |||
key := Registration{"channel", topicName, channelName} | key := Registration{"channel", topicName, channelName} | |||
s.ctx.nsqlookupd.DB.AddRegistration(key) | s.nsqlookupd.DB.AddRegistration(key) | |||
s.ctx.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName) | s.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName) | |||
key = Registration{"topic", topicName, ""} | key = Registration{"topic", topicName, ""} | |||
s.ctx.nsqlookupd.DB.AddRegistration(key) | s.nsqlookupd.DB.AddRegistration(key) | |||
return nil, nil | return nil, nil | |||
} | } | |||
func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, p s httprouter.Params) (interface{}, error) { | func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, p s httprouter.Params) (interface{}, error) { | |||
reqParams, err := http_api.NewReqParams(req) | reqParams, err := http_api.NewReqParams(req) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, "INVALID_REQUEST"} | return nil, http_api.Err{400, "INVALID_REQUEST"} | |||
} | } | |||
topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) | topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) | |||
if err != nil { | if err != nil { | |||
return nil, http_api.Err{400, err.Error()} | return nil, http_api.Err{400, err.Error()} | |||
} | } | |||
registrations := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicNa me, channelName) | registrations := s.nsqlookupd.DB.FindRegistrations("channel", topicName, channelName) | |||
if len(registrations) == 0 { | if len(registrations) == 0 { | |||
return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"} | return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"} | |||
} | } | |||
s.ctx.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)" , channelName, topicName) | s.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)", ch annelName, topicName) | |||
for _, registration := range registrations { | for _, registration := range registrations { | |||
s.ctx.nsqlookupd.DB.RemoveRegistration(registration) | s.nsqlookupd.DB.RemoveRegistration(registration) | |||
} | } | |||
return nil, nil | return nil, nil | |||
} | } | |||
type node struct { | type node struct { | |||
RemoteAddress string `json:"remote_address"` | RemoteAddress string `json:"remote_address"` | |||
Hostname string `json:"hostname"` | Hostname string `json:"hostname"` | |||
BroadcastAddress string `json:"broadcast_address"` | BroadcastAddress string `json:"broadcast_address"` | |||
TCPPort int `json:"tcp_port"` | TCPPort int `json:"tcp_port"` | |||
HTTPPort int `json:"http_port"` | HTTPPort int `json:"http_port"` | |||
Version string `json:"version"` | Version string `json:"version"` | |||
Tombstones []bool `json:"tombstones"` | Tombstones []bool `json:"tombstones"` | |||
Topics []string `json:"topics"` | Topics []string `json:"topics"` | |||
} | } | |||
func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httpro uter.Params) (interface{}, error) { | func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httpro uter.Params) (interface{}, error) { | |||
// dont filter out tombstoned nodes | // dont filter out tombstoned nodes | |||
producers := s.ctx.nsqlookupd.DB.FindProducers("client", "", "").FilterBy | producers := s.nsqlookupd.DB.FindProducers("client", "", "").FilterByActi | |||
Active( | ve( | |||
s.ctx.nsqlookupd.opts.InactiveProducerTimeout, 0) | s.nsqlookupd.opts.InactiveProducerTimeout, 0) | |||
nodes := make([]*node, len(producers)) | nodes := make([]*node, len(producers)) | |||
topicProducersMap := make(map[string]Producers) | topicProducersMap := make(map[string]Producers) | |||
for i, p := range producers { | for i, p := range producers { | |||
topics := s.ctx.nsqlookupd.DB.LookupRegistrations(p.peerInfo.id). Filter("topic", "*", "").Keys() | topics := s.nsqlookupd.DB.LookupRegistrations(p.peerInfo.id).Filt er("topic", "*", "").Keys() | |||
// for each topic find the producer that matches this peer | // for each topic find the producer that matches this peer | |||
// to add tombstone information | // to add tombstone information | |||
tombstones := make([]bool, len(topics)) | tombstones := make([]bool, len(topics)) | |||
for j, t := range topics { | for j, t := range topics { | |||
if _, exists := topicProducersMap[t]; !exists { | if _, exists := topicProducersMap[t]; !exists { | |||
topicProducersMap[t] = s.ctx.nsqlookupd.DB.FindPr oducers("topic", t, "") | topicProducersMap[t] = s.nsqlookupd.DB.FindProduc ers("topic", t, "") | |||
} | } | |||
topicProducers := topicProducersMap[t] | topicProducers := topicProducersMap[t] | |||
for _, tp := range topicProducers { | for _, tp := range topicProducers { | |||
if tp.peerInfo == p.peerInfo { | if tp.peerInfo == p.peerInfo { | |||
tombstones[j] = tp.IsTombstoned(s.ctx.nsq lookupd.opts.TombstoneLifetime) | tombstones[j] = tp.IsTombstoned(s.nsqlook upd.opts.TombstoneLifetime) | |||
break | break | |||
} | } | |||
} | } | |||
} | } | |||
nodes[i] = &node{ | nodes[i] = &node{ | |||
RemoteAddress: p.peerInfo.RemoteAddress, | RemoteAddress: p.peerInfo.RemoteAddress, | |||
Hostname: p.peerInfo.Hostname, | Hostname: p.peerInfo.Hostname, | |||
BroadcastAddress: p.peerInfo.BroadcastAddress, | BroadcastAddress: p.peerInfo.BroadcastAddress, | |||
TCPPort: p.peerInfo.TCPPort, | TCPPort: p.peerInfo.TCPPort, | |||
skipping to change at line 307 | skipping to change at line 307 | |||
Topics: topics, | Topics: topics, | |||
} | } | |||
} | } | |||
return map[string]interface{}{ | return map[string]interface{}{ | |||
"producers": nodes, | "producers": nodes, | |||
}, nil | }, nil | |||
} | } | |||
func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httpro uter.Params) (interface{}, error) { | func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httpro uter.Params) (interface{}, error) { | |||
s.ctx.nsqlookupd.DB.RLock() | s.nsqlookupd.DB.RLock() | |||
defer s.ctx.nsqlookupd.DB.RUnlock() | defer s.nsqlookupd.DB.RUnlock() | |||
data := make(map[string][]map[string]interface{}) | data := make(map[string][]map[string]interface{}) | |||
for r, producers := range s.ctx.nsqlookupd.DB.registrationMap { | for r, producers := range s.nsqlookupd.DB.registrationMap { | |||
key := r.Category + ":" + r.Key + ":" + r.SubKey | key := r.Category + ":" + r.Key + ":" + r.SubKey | |||
for _, p := range producers { | for _, p := range producers { | |||
m := map[string]interface{}{ | m := map[string]interface{}{ | |||
"id": p.peerInfo.id, | "id": p.peerInfo.id, | |||
"hostname": p.peerInfo.Hostname, | "hostname": p.peerInfo.Hostname, | |||
"broadcast_address": p.peerInfo.BroadcastAddress, | "broadcast_address": p.peerInfo.BroadcastAddress, | |||
"tcp_port": p.peerInfo.TCPPort, | "tcp_port": p.peerInfo.TCPPort, | |||
"http_port": p.peerInfo.HTTPPort, | "http_port": p.peerInfo.HTTPPort, | |||
"version": p.peerInfo.Version, | "version": p.peerInfo.Version, | |||
"last_update": atomic.LoadInt64(&p.peerInfo .lastUpdate), | "last_update": atomic.LoadInt64(&p.peerInfo .lastUpdate), | |||
End of changes. 28 change blocks. | ||||
48 lines changed or deleted | 46 lines changed or added |