"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "nsqlookupd/lookup_protocol_v1.go" between
nsq-1.2.0.tar.gz and nsq-1.2.1.tar.gz

About: nsq is a realtime distributed and and decentralized messaging platform.

lookup_protocol_v1.go  (nsq-1.2.0):lookup_protocol_v1.go  (nsq-1.2.1)
skipping to change at line 21 skipping to change at line 21
"os" "os"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/nsqio/nsq/internal/protocol" "github.com/nsqio/nsq/internal/protocol"
"github.com/nsqio/nsq/internal/version" "github.com/nsqio/nsq/internal/version"
) )
type LookupProtocolV1 struct { type LookupProtocolV1 struct {
ctx *Context nsqlookupd *NSQLookupd
} }
func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { func (p *LookupProtocolV1) NewClient(conn net.Conn) protocol.Client {
return NewClientV1(conn)
}
func (p *LookupProtocolV1) IOLoop(c protocol.Client) error {
var err error var err error
var line string var line string
client := NewClientV1(conn) client := c.(*ClientV1)
reader := bufio.NewReader(client) reader := bufio.NewReader(client)
for { for {
line, err = reader.ReadString('\n') line, err = reader.ReadString('\n')
if err != nil { if err != nil {
break break
} }
line = strings.TrimSpace(line) line = strings.TrimSpace(line)
params := strings.Split(line, " ") params := strings.Split(line, " ")
var response []byte var response []byte
response, err = p.Exec(client, reader, params) response, err = p.Exec(client, reader, params)
if err != nil { if err != nil {
ctx := "" ctx := ""
if parentErr := err.(protocol.ChildErr).Parent(); parentE rr != nil { if parentErr := err.(protocol.ChildErr).Parent(); parentE rr != nil {
ctx = " - " + parentErr.Error() ctx = " - " + parentErr.Error()
} }
p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, e rr, ctx) p.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)
_, sendErr := protocol.SendResponse(client, []byte(err.Er ror())) _, sendErr := protocol.SendResponse(client, []byte(err.Er ror()))
if sendErr != nil { if sendErr != nil {
p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", c lient, sendErr, ctx) p.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", clien t, sendErr, ctx)
break break
} }
// errors of type FatalClientErr should forceably close t he connection // errors of type FatalClientErr should forceably close t he connection
if _, ok := err.(*protocol.FatalClientErr); ok { if _, ok := err.(*protocol.FatalClientErr); ok {
break break
} }
continue continue
} }
if response != nil { if response != nil {
_, err = protocol.SendResponse(client, response) _, err = protocol.SendResponse(client, response)
if err != nil { if err != nil {
break break
} }
} }
} }
conn.Close() p.nsqlookupd.logf(LOG_INFO, "PROTOCOL(V1): [%s] exiting ioloop", client)
p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): closing", client)
if client.peerInfo != nil { if client.peerInfo != nil {
registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.p eerInfo.id) registrations := p.nsqlookupd.DB.LookupRegistrations(client.peerI nfo.id)
for _, r := range registrations { for _, r := range registrations {
if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, cl if removed, _ := p.nsqlookupd.DB.RemoveProducer(r, client
ient.peerInfo.id); removed { .peerInfo.id); removed {
p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) U p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREG
NREGISTER category:%s key:%s subkey:%s", ISTER category:%s key:%s subkey:%s",
client, r.Category, r.Key, r.SubKey) client, r.Category, r.Key, r.SubKey)
} }
} }
} }
return err return err
} }
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params [ ]string) ([]byte, error) { func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params [ ]string) ([]byte, error) {
switch params[0] { switch params[0] {
case "PING": case "PING":
return p.PING(client, params) return p.PING(client, params)
case "IDENTIFY": case "IDENTIFY":
return p.IDENTIFY(client, reader, params[1:]) return p.IDENTIFY(client, reader, params[1:])
case "REGISTER": case "REGISTER":
skipping to change at line 131 skipping to change at line 137
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
} }
topic, channel, err := getTopicChan("REGISTER", params) topic, channel, err := getTopicChan("REGISTER", params)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if channel != "" { if channel != "" {
key := Registration{"channel", topic, channel} key := Registration{"channel", topic, channel}
if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: clien if p.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.pe
t.peerInfo}) { erInfo}) {
p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER cate
category:%s key:%s subkey:%s", gory:%s key:%s subkey:%s",
client, "channel", topic, channel) client, "channel", topic, channel)
} }
} }
key := Registration{"topic", topic, ""} key := Registration{"topic", topic, ""}
if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerIn if p.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo})
fo}) { {
p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s
:%s key:%s subkey:%s", key:%s subkey:%s",
client, "topic", topic, "") client, "topic", topic, "")
} }
return []byte("OK"), nil return []byte("OK"), nil
} }
func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, pa rams []string) ([]byte, error) { func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, pa rams []string) ([]byte, error) {
if client.peerInfo == nil { if client.peerInfo == nil {
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
} }
topic, channel, err := getTopicChan("UNREGISTER", params) topic, channel, err := getTopicChan("UNREGISTER", params)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if channel != "" { if channel != "" {
key := Registration{"channel", topic, channel} key := Registration{"channel", topic, channel}
removed, left := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.p eerInfo.id) removed, left := p.nsqlookupd.DB.RemoveProducer(key, client.peerI nfo.id)
if removed { if removed {
p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTE R category:%s key:%s subkey:%s", p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER ca tegory:%s key:%s subkey:%s",
client, "channel", topic, channel) client, "channel", topic, channel)
} }
// for ephemeral channels, remove the channel as well if it has n o producers // for ephemeral channels, remove the channel as well if it has n o producers
if left == 0 && strings.HasSuffix(channel, "#ephemeral") { if left == 0 && strings.HasSuffix(channel, "#ephemeral") {
p.ctx.nsqlookupd.DB.RemoveRegistration(key) p.nsqlookupd.DB.RemoveRegistration(key)
} }
} else { } else {
// no channel was specified so this is a topic unregistration // no channel was specified so this is a topic unregistration
// remove all of the channel registrations... // remove all of the channel registrations...
// normally this shouldn't happen which is why we print a warning message // normally this shouldn't happen which is why we print a warning message
// if anything is actually removed // if anything is actually removed
registrations := p.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*") registrations := p.nsqlookupd.DB.FindRegistrations("channel", top ic, "*")
for _, r := range registrations { for _, r := range registrations {
removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, clien t.peerInfo.id) removed, _ := p.nsqlookupd.DB.RemoveProducer(r, client.pe erInfo.id)
if removed { if removed {
p.ctx.nsqlookupd.logf(LOG_WARN, "client(%s) unexp ected UNREGISTER category:%s key:%s subkey:%s", p.nsqlookupd.logf(LOG_WARN, "client(%s) unexpecte d UNREGISTER category:%s key:%s subkey:%s",
client, "channel", topic, r.SubKey) client, "channel", topic, r.SubKey)
} }
} }
key := Registration{"topic", topic, ""} key := Registration{"topic", topic, ""}
removed, left := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.p eerInfo.id) removed, left := p.nsqlookupd.DB.RemoveProducer(key, client.peerI nfo.id)
if removed { if removed {
p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTE R category:%s key:%s subkey:%s", p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER ca tegory:%s key:%s subkey:%s",
client, "topic", topic, "") client, "topic", topic, "")
} }
if left == 0 && strings.HasSuffix(topic, "#ephemeral") { if left == 0 && strings.HasSuffix(topic, "#ephemeral") {
p.ctx.nsqlookupd.DB.RemoveRegistration(key) p.nsqlookupd.DB.RemoveRegistration(key)
} }
} }
return []byte("OK"), nil return []byte("OK"), nil
} }
func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, para ms []string) ([]byte, error) { func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, para ms []string) ([]byte, error) {
var err error var err error
if client.peerInfo != nil { if client.peerInfo != nil {
skipping to change at line 229 skipping to change at line 235
peerInfo.RemoteAddress = client.RemoteAddr().String() peerInfo.RemoteAddress = client.RemoteAddr().String()
// require all fields // require all fields
if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.H TTPPort == 0 || peerInfo.Version == "" { if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.H TTPPort == 0 || peerInfo.Version == "" {
return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTI FY missing fields") return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTI FY missing fields")
} }
atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano()) atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano())
p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): IDENTIFY Address:%s TCP:%d H TTP:%d Version:%s", p.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP: %d Version:%s",
client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTT PPort, peerInfo.Version) client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTT PPort, peerInfo.Version)
client.peerInfo = &peerInfo client.peerInfo = &peerInfo
if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Produ if p.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{
cer{peerInfo: client.peerInfo}) { peerInfo: client.peerInfo}) {
p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s
:%s key:%s subkey:%s", client, "client", "", "") key:%s subkey:%s", client, "client", "", "")
} }
// build a response // build a response
data := make(map[string]interface{}) data := make(map[string]interface{})
data["tcp_port"] = p.ctx.nsqlookupd.RealTCPAddr().Port data["tcp_port"] = p.nsqlookupd.RealTCPAddr().Port
data["http_port"] = p.ctx.nsqlookupd.RealHTTPAddr().Port data["http_port"] = p.nsqlookupd.RealHTTPAddr().Port
data["version"] = version.Binary data["version"] = version.Binary
hostname, err := os.Hostname() hostname, err := os.Hostname()
if err != nil { if err != nil {
log.Fatalf("ERROR: unable to get hostname %s", err) log.Fatalf("ERROR: unable to get hostname %s", err)
} }
data["broadcast_address"] = p.ctx.nsqlookupd.opts.BroadcastAddress data["broadcast_address"] = p.nsqlookupd.opts.BroadcastAddress
data["hostname"] = hostname data["hostname"] = hostname
response, err := json.Marshal(data) response, err := json.Marshal(data)
if err != nil { if err != nil {
p.ctx.nsqlookupd.logf(LOG_ERROR, "marshaling %v", data) p.nsqlookupd.logf(LOG_ERROR, "marshaling %v", data)
return []byte("OK"), nil return []byte("OK"), nil
} }
return response, nil return response, nil
} }
func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, erro r) { func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, erro r) {
if client.peerInfo != nil { if client.peerInfo != nil {
// we could get a PING before other commands on the same client c onnection // we could get a PING before other commands on the same client c onnection
cur := time.Unix(0, atomic.LoadInt64(&client.peerInfo.lastUpdate) ) cur := time.Unix(0, atomic.LoadInt64(&client.peerInfo.lastUpdate) )
now := time.Now() now := time.Now()
p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): pinged (last ping %s )", client.peerInfo.id, p.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): pinged (last ping %s)", client.peerInfo.id,
now.Sub(cur)) now.Sub(cur))
atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano()) atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano())
} }
return []byte("OK"), nil return []byte("OK"), nil
} }
 End of changes. 26 change blocks. 
39 lines changed or deleted 45 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)