"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "nsqlookupd/http.go" between
nsq-1.2.0.tar.gz and nsq-1.2.1.tar.gz

About: nsq is a realtime distributed and and decentralized messaging platform.

http.go  (nsq-1.2.0):http.go  (nsq-1.2.1)
skipping to change at line 16 skipping to change at line 16
"net/http/pprof" "net/http/pprof"
"sync/atomic" "sync/atomic"
"github.com/julienschmidt/httprouter" "github.com/julienschmidt/httprouter"
"github.com/nsqio/nsq/internal/http_api" "github.com/nsqio/nsq/internal/http_api"
"github.com/nsqio/nsq/internal/protocol" "github.com/nsqio/nsq/internal/protocol"
"github.com/nsqio/nsq/internal/version" "github.com/nsqio/nsq/internal/version"
) )
type httpServer struct { type httpServer struct {
ctx *Context nsqlookupd *NSQLookupd
router http.Handler router http.Handler
} }
func newHTTPServer(ctx *Context) *httpServer { func newHTTPServer(l *NSQLookupd) *httpServer {
log := http_api.Log(ctx.nsqlookupd.logf) log := http_api.Log(l.logf)
router := httprouter.New() router := httprouter.New()
router.HandleMethodNotAllowed = true router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf) router.PanicHandler = http_api.LogPanicHandler(l.logf)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf) router.NotFound = http_api.LogNotFoundHandler(l.logf)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlook router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(l.logf)
upd.logf)
s := &httpServer{ s := &httpServer{
ctx: ctx, nsqlookupd: l,
router: router, router: router,
} }
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_ api.PlainText)) router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_ api.PlainText))
router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V 1)) router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V 1))
// v1 negotiate // v1 negotiate
router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api .V1)) router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api .V1))
router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_a pi.V1)) router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_a pi.V1))
router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_a pi.V1)) router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_a pi.V1))
router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, ht tp_api.V1)) router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, ht tp_api.V1))
skipping to change at line 81 skipping to change at line 81
func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou ter.Params) (interface{}, error) { func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou ter.Params) (interface{}, error) {
return struct { return struct {
Version string `json:"version"` Version string `json:"version"`
}{ }{
Version: version.Binary, Version: version.Binary,
}, nil }, nil
} }
func (s *httpServer) doTopics(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) { func (s *httpServer) doTopics(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) {
topics := s.ctx.nsqlookupd.DB.FindRegistrations("topic", "*", "").Keys() topics := s.nsqlookupd.DB.FindRegistrations("topic", "*", "").Keys()
return map[string]interface{}{ return map[string]interface{}{
"topics": topics, "topics": topics,
}, nil }, nil
} }
func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps htt prouter.Params) (interface{}, error) { func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps htt prouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req) reqParams, err := http_api.NewReqParams(req)
if err != nil { if err != nil {
return nil, http_api.Err{400, "INVALID_REQUEST"} return nil, http_api.Err{400, "INVALID_REQUEST"}
} }
topicName, err := reqParams.Get("topic") topicName, err := reqParams.Get("topic")
if err != nil { if err != nil {
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
} }
channels := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, " *").SubKeys() channels := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*"). SubKeys()
return map[string]interface{}{ return map[string]interface{}{
"channels": channels, "channels": channels,
}, nil }, nil
} }
func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) { func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httpr outer.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req) reqParams, err := http_api.NewReqParams(req)
if err != nil { if err != nil {
return nil, http_api.Err{400, "INVALID_REQUEST"} return nil, http_api.Err{400, "INVALID_REQUEST"}
} }
topicName, err := reqParams.Get("topic") topicName, err := reqParams.Get("topic")
if err != nil { if err != nil {
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
} }
registration := s.ctx.nsqlookupd.DB.FindRegistrations("topic", topicName, "") registration := s.nsqlookupd.DB.FindRegistrations("topic", topicName, "")
if len(registration) == 0 { if len(registration) == 0 {
return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
} }
channels := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, " channels := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").
*").SubKeys() SubKeys()
producers := s.ctx.nsqlookupd.DB.FindProducers("topic", topicName, "") producers := s.nsqlookupd.DB.FindProducers("topic", topicName, "")
producers = producers.FilterByActive(s.ctx.nsqlookupd.opts.InactiveProduc producers = producers.FilterByActive(s.nsqlookupd.opts.InactiveProducerTi
erTimeout, meout,
s.ctx.nsqlookupd.opts.TombstoneLifetime) s.nsqlookupd.opts.TombstoneLifetime)
return map[string]interface{}{ return map[string]interface{}{
"channels": channels, "channels": channels,
"producers": producers.PeerInfo(), "producers": producers.PeerInfo(),
}, nil }, nil
} }
func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req) reqParams, err := http_api.NewReqParams(req)
if err != nil { if err != nil {
return nil, http_api.Err{400, "INVALID_REQUEST"} return nil, http_api.Err{400, "INVALID_REQUEST"}
skipping to change at line 145 skipping to change at line 145
topicName, err := reqParams.Get("topic") topicName, err := reqParams.Get("topic")
if err != nil { if err != nil {
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
} }
if !protocol.IsValidTopicName(topicName) { if !protocol.IsValidTopicName(topicName) {
return nil, http_api.Err{400, "INVALID_ARG_TOPIC"} return nil, http_api.Err{400, "INVALID_ARG_TOPIC"}
} }
s.ctx.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName) s.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName)
key := Registration{"topic", topicName, ""} key := Registration{"topic", topicName, ""}
s.ctx.nsqlookupd.DB.AddRegistration(key) s.nsqlookupd.DB.AddRegistration(key)
return nil, nil return nil, nil
} }
func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req) reqParams, err := http_api.NewReqParams(req)
if err != nil { if err != nil {
return nil, http_api.Err{400, "INVALID_REQUEST"} return nil, http_api.Err{400, "INVALID_REQUEST"}
} }
topicName, err := reqParams.Get("topic") topicName, err := reqParams.Get("topic")
if err != nil { if err != nil {
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
} }
registrations := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicNa me, "*") registrations := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*")
for _, registration := range registrations { for _, registration := range registrations {
s.ctx.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from to s.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(
pic(%s)", registration.SubKey, topicName) %s)", registration.SubKey, topicName)
s.ctx.nsqlookupd.DB.RemoveRegistration(registration) s.nsqlookupd.DB.RemoveRegistration(registration)
} }
registrations = s.ctx.nsqlookupd.DB.FindRegistrations("topic", topicName, "") registrations = s.nsqlookupd.DB.FindRegistrations("topic", topicName, "")
for _, registration := range registrations { for _, registration := range registrations {
s.ctx.nsqlookupd.logf(LOG_INFO, "DB: removing topic(%s)", topicNa s.nsqlookupd.logf(LOG_INFO, "DB: removing topic(%s)", topicName)
me) s.nsqlookupd.DB.RemoveRegistration(registration)
s.ctx.nsqlookupd.DB.RemoveRegistration(registration)
} }
return nil, nil return nil, nil
} }
func (s *httpServer) doTombstoneTopicProducer(w http.ResponseWriter, req *http.R equest, ps httprouter.Params) (interface{}, error) { func (s *httpServer) doTombstoneTopicProducer(w http.ResponseWriter, req *http.R equest, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req) reqParams, err := http_api.NewReqParams(req)
if err != nil { if err != nil {
return nil, http_api.Err{400, "INVALID_REQUEST"} return nil, http_api.Err{400, "INVALID_REQUEST"}
} }
skipping to change at line 194 skipping to change at line 194
topicName, err := reqParams.Get("topic") topicName, err := reqParams.Get("topic")
if err != nil { if err != nil {
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
} }
node, err := reqParams.Get("node") node, err := reqParams.Get("node")
if err != nil { if err != nil {
return nil, http_api.Err{400, "MISSING_ARG_NODE"} return nil, http_api.Err{400, "MISSING_ARG_NODE"}
} }
s.ctx.nsqlookupd.logf(LOG_INFO, "DB: setting tombstone for producer@%s of s.nsqlookupd.logf(LOG_INFO, "DB: setting tombstone for producer@%s of top
topic(%s)", node, topicName) ic(%s)", node, topicName)
producers := s.ctx.nsqlookupd.DB.FindProducers("topic", topicName, "") producers := s.nsqlookupd.DB.FindProducers("topic", topicName, "")
for _, p := range producers { for _, p := range producers {
thisNode := fmt.Sprintf("%s:%d", p.peerInfo.BroadcastAddress, p.p eerInfo.HTTPPort) thisNode := fmt.Sprintf("%s:%d", p.peerInfo.BroadcastAddress, p.p eerInfo.HTTPPort)
if thisNode == node { if thisNode == node {
p.Tombstone() p.Tombstone()
} }
} }
return nil, nil return nil, nil
} }
skipping to change at line 217 skipping to change at line 217
reqParams, err := http_api.NewReqParams(req) reqParams, err := http_api.NewReqParams(req)
if err != nil { if err != nil {
return nil, http_api.Err{400, "INVALID_REQUEST"} return nil, http_api.Err{400, "INVALID_REQUEST"}
} }
topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
if err != nil { if err != nil {
return nil, http_api.Err{400, err.Error()} return nil, http_api.Err{400, err.Error()}
} }
s.ctx.nsqlookupd.logf(LOG_INFO, "DB: adding channel(%s) in topic(%s)", ch annelName, topicName) s.nsqlookupd.logf(LOG_INFO, "DB: adding channel(%s) in topic(%s)", channe lName, topicName)
key := Registration{"channel", topicName, channelName} key := Registration{"channel", topicName, channelName}
s.ctx.nsqlookupd.DB.AddRegistration(key) s.nsqlookupd.DB.AddRegistration(key)
s.ctx.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName) s.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName)
key = Registration{"topic", topicName, ""} key = Registration{"topic", topicName, ""}
s.ctx.nsqlookupd.DB.AddRegistration(key) s.nsqlookupd.DB.AddRegistration(key)
return nil, nil return nil, nil
} }
func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, p s httprouter.Params) (interface{}, error) { func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, p s httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req) reqParams, err := http_api.NewReqParams(req)
if err != nil { if err != nil {
return nil, http_api.Err{400, "INVALID_REQUEST"} return nil, http_api.Err{400, "INVALID_REQUEST"}
} }
topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
if err != nil { if err != nil {
return nil, http_api.Err{400, err.Error()} return nil, http_api.Err{400, err.Error()}
} }
registrations := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicNa me, channelName) registrations := s.nsqlookupd.DB.FindRegistrations("channel", topicName, channelName)
if len(registrations) == 0 { if len(registrations) == 0 {
return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"} return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
} }
s.ctx.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)" , channelName, topicName) s.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)", ch annelName, topicName)
for _, registration := range registrations { for _, registration := range registrations {
s.ctx.nsqlookupd.DB.RemoveRegistration(registration) s.nsqlookupd.DB.RemoveRegistration(registration)
} }
return nil, nil return nil, nil
} }
type node struct { type node struct {
RemoteAddress string `json:"remote_address"` RemoteAddress string `json:"remote_address"`
Hostname string `json:"hostname"` Hostname string `json:"hostname"`
BroadcastAddress string `json:"broadcast_address"` BroadcastAddress string `json:"broadcast_address"`
TCPPort int `json:"tcp_port"` TCPPort int `json:"tcp_port"`
HTTPPort int `json:"http_port"` HTTPPort int `json:"http_port"`
Version string `json:"version"` Version string `json:"version"`
Tombstones []bool `json:"tombstones"` Tombstones []bool `json:"tombstones"`
Topics []string `json:"topics"` Topics []string `json:"topics"`
} }
func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httpro uter.Params) (interface{}, error) { func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httpro uter.Params) (interface{}, error) {
// dont filter out tombstoned nodes // dont filter out tombstoned nodes
producers := s.ctx.nsqlookupd.DB.FindProducers("client", "", "").FilterBy producers := s.nsqlookupd.DB.FindProducers("client", "", "").FilterByActi
Active( ve(
s.ctx.nsqlookupd.opts.InactiveProducerTimeout, 0) s.nsqlookupd.opts.InactiveProducerTimeout, 0)
nodes := make([]*node, len(producers)) nodes := make([]*node, len(producers))
topicProducersMap := make(map[string]Producers) topicProducersMap := make(map[string]Producers)
for i, p := range producers { for i, p := range producers {
topics := s.ctx.nsqlookupd.DB.LookupRegistrations(p.peerInfo.id). Filter("topic", "*", "").Keys() topics := s.nsqlookupd.DB.LookupRegistrations(p.peerInfo.id).Filt er("topic", "*", "").Keys()
// for each topic find the producer that matches this peer // for each topic find the producer that matches this peer
// to add tombstone information // to add tombstone information
tombstones := make([]bool, len(topics)) tombstones := make([]bool, len(topics))
for j, t := range topics { for j, t := range topics {
if _, exists := topicProducersMap[t]; !exists { if _, exists := topicProducersMap[t]; !exists {
topicProducersMap[t] = s.ctx.nsqlookupd.DB.FindPr oducers("topic", t, "") topicProducersMap[t] = s.nsqlookupd.DB.FindProduc ers("topic", t, "")
} }
topicProducers := topicProducersMap[t] topicProducers := topicProducersMap[t]
for _, tp := range topicProducers { for _, tp := range topicProducers {
if tp.peerInfo == p.peerInfo { if tp.peerInfo == p.peerInfo {
tombstones[j] = tp.IsTombstoned(s.ctx.nsq lookupd.opts.TombstoneLifetime) tombstones[j] = tp.IsTombstoned(s.nsqlook upd.opts.TombstoneLifetime)
break break
} }
} }
} }
nodes[i] = &node{ nodes[i] = &node{
RemoteAddress: p.peerInfo.RemoteAddress, RemoteAddress: p.peerInfo.RemoteAddress,
Hostname: p.peerInfo.Hostname, Hostname: p.peerInfo.Hostname,
BroadcastAddress: p.peerInfo.BroadcastAddress, BroadcastAddress: p.peerInfo.BroadcastAddress,
TCPPort: p.peerInfo.TCPPort, TCPPort: p.peerInfo.TCPPort,
skipping to change at line 307 skipping to change at line 307
Topics: topics, Topics: topics,
} }
} }
return map[string]interface{}{ return map[string]interface{}{
"producers": nodes, "producers": nodes,
}, nil }, nil
} }
func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httpro uter.Params) (interface{}, error) { func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httpro uter.Params) (interface{}, error) {
s.ctx.nsqlookupd.DB.RLock() s.nsqlookupd.DB.RLock()
defer s.ctx.nsqlookupd.DB.RUnlock() defer s.nsqlookupd.DB.RUnlock()
data := make(map[string][]map[string]interface{}) data := make(map[string][]map[string]interface{})
for r, producers := range s.ctx.nsqlookupd.DB.registrationMap { for r, producers := range s.nsqlookupd.DB.registrationMap {
key := r.Category + ":" + r.Key + ":" + r.SubKey key := r.Category + ":" + r.Key + ":" + r.SubKey
for _, p := range producers { for _, p := range producers {
m := map[string]interface{}{ m := map[string]interface{}{
"id": p.peerInfo.id, "id": p.peerInfo.id,
"hostname": p.peerInfo.Hostname, "hostname": p.peerInfo.Hostname,
"broadcast_address": p.peerInfo.BroadcastAddress, "broadcast_address": p.peerInfo.BroadcastAddress,
"tcp_port": p.peerInfo.TCPPort, "tcp_port": p.peerInfo.TCPPort,
"http_port": p.peerInfo.HTTPPort, "http_port": p.peerInfo.HTTPPort,
"version": p.peerInfo.Version, "version": p.peerInfo.Version,
"last_update": atomic.LoadInt64(&p.peerInfo .lastUpdate), "last_update": atomic.LoadInt64(&p.peerInfo .lastUpdate),
 End of changes. 28 change blocks. 
48 lines changed or deleted 46 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)