lookup_protocol_v1.go (nsq-1.2.0) | : | lookup_protocol_v1.go (nsq-1.2.1) | ||
---|---|---|---|---|
skipping to change at line 21 | skipping to change at line 21 | |||
"os" | "os" | |||
"strings" | "strings" | |||
"sync/atomic" | "sync/atomic" | |||
"time" | "time" | |||
"github.com/nsqio/nsq/internal/protocol" | "github.com/nsqio/nsq/internal/protocol" | |||
"github.com/nsqio/nsq/internal/version" | "github.com/nsqio/nsq/internal/version" | |||
) | ) | |||
type LookupProtocolV1 struct { | type LookupProtocolV1 struct { | |||
ctx *Context | nsqlookupd *NSQLookupd | |||
} | } | |||
func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { | func (p *LookupProtocolV1) NewClient(conn net.Conn) protocol.Client { | |||
return NewClientV1(conn) | ||||
} | ||||
func (p *LookupProtocolV1) IOLoop(c protocol.Client) error { | ||||
var err error | var err error | |||
var line string | var line string | |||
client := NewClientV1(conn) | client := c.(*ClientV1) | |||
reader := bufio.NewReader(client) | reader := bufio.NewReader(client) | |||
for { | for { | |||
line, err = reader.ReadString('\n') | line, err = reader.ReadString('\n') | |||
if err != nil { | if err != nil { | |||
break | break | |||
} | } | |||
line = strings.TrimSpace(line) | line = strings.TrimSpace(line) | |||
params := strings.Split(line, " ") | params := strings.Split(line, " ") | |||
var response []byte | var response []byte | |||
response, err = p.Exec(client, reader, params) | response, err = p.Exec(client, reader, params) | |||
if err != nil { | if err != nil { | |||
ctx := "" | ctx := "" | |||
if parentErr := err.(protocol.ChildErr).Parent(); parentE rr != nil { | if parentErr := err.(protocol.ChildErr).Parent(); parentE rr != nil { | |||
ctx = " - " + parentErr.Error() | ctx = " - " + parentErr.Error() | |||
} | } | |||
p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, e rr, ctx) | p.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx) | |||
_, sendErr := protocol.SendResponse(client, []byte(err.Er ror())) | _, sendErr := protocol.SendResponse(client, []byte(err.Er ror())) | |||
if sendErr != nil { | if sendErr != nil { | |||
p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", c lient, sendErr, ctx) | p.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", clien t, sendErr, ctx) | |||
break | break | |||
} | } | |||
// errors of type FatalClientErr should forceably close t he connection | // errors of type FatalClientErr should forceably close t he connection | |||
if _, ok := err.(*protocol.FatalClientErr); ok { | if _, ok := err.(*protocol.FatalClientErr); ok { | |||
break | break | |||
} | } | |||
continue | continue | |||
} | } | |||
if response != nil { | if response != nil { | |||
_, err = protocol.SendResponse(client, response) | _, err = protocol.SendResponse(client, response) | |||
if err != nil { | if err != nil { | |||
break | break | |||
} | } | |||
} | } | |||
} | } | |||
conn.Close() | p.nsqlookupd.logf(LOG_INFO, "PROTOCOL(V1): [%s] exiting ioloop", client) | |||
p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): closing", client) | ||||
if client.peerInfo != nil { | if client.peerInfo != nil { | |||
registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.p eerInfo.id) | registrations := p.nsqlookupd.DB.LookupRegistrations(client.peerI nfo.id) | |||
for _, r := range registrations { | for _, r := range registrations { | |||
if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, cl | if removed, _ := p.nsqlookupd.DB.RemoveProducer(r, client | |||
ient.peerInfo.id); removed { | .peerInfo.id); removed { | |||
p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) U | p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREG | |||
NREGISTER category:%s key:%s subkey:%s", | ISTER category:%s key:%s subkey:%s", | |||
client, r.Category, r.Key, r.SubKey) | client, r.Category, r.Key, r.SubKey) | |||
} | } | |||
} | } | |||
} | } | |||
return err | return err | |||
} | } | |||
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params [ ]string) ([]byte, error) { | func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params [ ]string) ([]byte, error) { | |||
switch params[0] { | switch params[0] { | |||
case "PING": | case "PING": | |||
return p.PING(client, params) | return p.PING(client, params) | |||
case "IDENTIFY": | case "IDENTIFY": | |||
return p.IDENTIFY(client, reader, params[1:]) | return p.IDENTIFY(client, reader, params[1:]) | |||
case "REGISTER": | case "REGISTER": | |||
skipping to change at line 131 | skipping to change at line 137 | |||
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") | return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") | |||
} | } | |||
topic, channel, err := getTopicChan("REGISTER", params) | topic, channel, err := getTopicChan("REGISTER", params) | |||
if err != nil { | if err != nil { | |||
return nil, err | return nil, err | |||
} | } | |||
if channel != "" { | if channel != "" { | |||
key := Registration{"channel", topic, channel} | key := Registration{"channel", topic, channel} | |||
if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: clien | if p.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.pe | |||
t.peerInfo}) { | erInfo}) { | |||
p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER | p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER cate | |||
category:%s key:%s subkey:%s", | gory:%s key:%s subkey:%s", | |||
client, "channel", topic, channel) | client, "channel", topic, channel) | |||
} | } | |||
} | } | |||
key := Registration{"topic", topic, ""} | key := Registration{"topic", topic, ""} | |||
if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerIn | if p.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) | |||
fo}) { | { | |||
p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category | p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s | |||
:%s key:%s subkey:%s", | key:%s subkey:%s", | |||
client, "topic", topic, "") | client, "topic", topic, "") | |||
} | } | |||
return []byte("OK"), nil | return []byte("OK"), nil | |||
} | } | |||
func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, pa rams []string) ([]byte, error) { | func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, pa rams []string) ([]byte, error) { | |||
if client.peerInfo == nil { | if client.peerInfo == nil { | |||
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") | return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") | |||
} | } | |||
topic, channel, err := getTopicChan("UNREGISTER", params) | topic, channel, err := getTopicChan("UNREGISTER", params) | |||
if err != nil { | if err != nil { | |||
return nil, err | return nil, err | |||
} | } | |||
if channel != "" { | if channel != "" { | |||
key := Registration{"channel", topic, channel} | key := Registration{"channel", topic, channel} | |||
removed, left := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.p eerInfo.id) | removed, left := p.nsqlookupd.DB.RemoveProducer(key, client.peerI nfo.id) | |||
if removed { | if removed { | |||
p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTE R category:%s key:%s subkey:%s", | p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER ca tegory:%s key:%s subkey:%s", | |||
client, "channel", topic, channel) | client, "channel", topic, channel) | |||
} | } | |||
// for ephemeral channels, remove the channel as well if it has n o producers | // for ephemeral channels, remove the channel as well if it has n o producers | |||
if left == 0 && strings.HasSuffix(channel, "#ephemeral") { | if left == 0 && strings.HasSuffix(channel, "#ephemeral") { | |||
p.ctx.nsqlookupd.DB.RemoveRegistration(key) | p.nsqlookupd.DB.RemoveRegistration(key) | |||
} | } | |||
} else { | } else { | |||
// no channel was specified so this is a topic unregistration | // no channel was specified so this is a topic unregistration | |||
// remove all of the channel registrations... | // remove all of the channel registrations... | |||
// normally this shouldn't happen which is why we print a warning message | // normally this shouldn't happen which is why we print a warning message | |||
// if anything is actually removed | // if anything is actually removed | |||
registrations := p.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*") | registrations := p.nsqlookupd.DB.FindRegistrations("channel", top ic, "*") | |||
for _, r := range registrations { | for _, r := range registrations { | |||
removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, clien t.peerInfo.id) | removed, _ := p.nsqlookupd.DB.RemoveProducer(r, client.pe erInfo.id) | |||
if removed { | if removed { | |||
p.ctx.nsqlookupd.logf(LOG_WARN, "client(%s) unexp ected UNREGISTER category:%s key:%s subkey:%s", | p.nsqlookupd.logf(LOG_WARN, "client(%s) unexpecte d UNREGISTER category:%s key:%s subkey:%s", | |||
client, "channel", topic, r.SubKey) | client, "channel", topic, r.SubKey) | |||
} | } | |||
} | } | |||
key := Registration{"topic", topic, ""} | key := Registration{"topic", topic, ""} | |||
removed, left := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.p eerInfo.id) | removed, left := p.nsqlookupd.DB.RemoveProducer(key, client.peerI nfo.id) | |||
if removed { | if removed { | |||
p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTE R category:%s key:%s subkey:%s", | p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER ca tegory:%s key:%s subkey:%s", | |||
client, "topic", topic, "") | client, "topic", topic, "") | |||
} | } | |||
if left == 0 && strings.HasSuffix(topic, "#ephemeral") { | if left == 0 && strings.HasSuffix(topic, "#ephemeral") { | |||
p.ctx.nsqlookupd.DB.RemoveRegistration(key) | p.nsqlookupd.DB.RemoveRegistration(key) | |||
} | } | |||
} | } | |||
return []byte("OK"), nil | return []byte("OK"), nil | |||
} | } | |||
func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, para ms []string) ([]byte, error) { | func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, para ms []string) ([]byte, error) { | |||
var err error | var err error | |||
if client.peerInfo != nil { | if client.peerInfo != nil { | |||
skipping to change at line 229 | skipping to change at line 235 | |||
peerInfo.RemoteAddress = client.RemoteAddr().String() | peerInfo.RemoteAddress = client.RemoteAddr().String() | |||
// require all fields | // require all fields | |||
if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.H TTPPort == 0 || peerInfo.Version == "" { | if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.H TTPPort == 0 || peerInfo.Version == "" { | |||
return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTI FY missing fields") | return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTI FY missing fields") | |||
} | } | |||
atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano()) | atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano()) | |||
p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): IDENTIFY Address:%s TCP:%d H TTP:%d Version:%s", | p.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP: %d Version:%s", | |||
client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTT PPort, peerInfo.Version) | client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTT PPort, peerInfo.Version) | |||
client.peerInfo = &peerInfo | client.peerInfo = &peerInfo | |||
if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Produ | if p.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{ | |||
cer{peerInfo: client.peerInfo}) { | peerInfo: client.peerInfo}) { | |||
p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category | p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s | |||
:%s key:%s subkey:%s", client, "client", "", "") | key:%s subkey:%s", client, "client", "", "") | |||
} | } | |||
// build a response | // build a response | |||
data := make(map[string]interface{}) | data := make(map[string]interface{}) | |||
data["tcp_port"] = p.ctx.nsqlookupd.RealTCPAddr().Port | data["tcp_port"] = p.nsqlookupd.RealTCPAddr().Port | |||
data["http_port"] = p.ctx.nsqlookupd.RealHTTPAddr().Port | data["http_port"] = p.nsqlookupd.RealHTTPAddr().Port | |||
data["version"] = version.Binary | data["version"] = version.Binary | |||
hostname, err := os.Hostname() | hostname, err := os.Hostname() | |||
if err != nil { | if err != nil { | |||
log.Fatalf("ERROR: unable to get hostname %s", err) | log.Fatalf("ERROR: unable to get hostname %s", err) | |||
} | } | |||
data["broadcast_address"] = p.ctx.nsqlookupd.opts.BroadcastAddress | data["broadcast_address"] = p.nsqlookupd.opts.BroadcastAddress | |||
data["hostname"] = hostname | data["hostname"] = hostname | |||
response, err := json.Marshal(data) | response, err := json.Marshal(data) | |||
if err != nil { | if err != nil { | |||
p.ctx.nsqlookupd.logf(LOG_ERROR, "marshaling %v", data) | p.nsqlookupd.logf(LOG_ERROR, "marshaling %v", data) | |||
return []byte("OK"), nil | return []byte("OK"), nil | |||
} | } | |||
return response, nil | return response, nil | |||
} | } | |||
func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, erro r) { | func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, erro r) { | |||
if client.peerInfo != nil { | if client.peerInfo != nil { | |||
// we could get a PING before other commands on the same client c onnection | // we could get a PING before other commands on the same client c onnection | |||
cur := time.Unix(0, atomic.LoadInt64(&client.peerInfo.lastUpdate) ) | cur := time.Unix(0, atomic.LoadInt64(&client.peerInfo.lastUpdate) ) | |||
now := time.Now() | now := time.Now() | |||
p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): pinged (last ping %s )", client.peerInfo.id, | p.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): pinged (last ping %s)", client.peerInfo.id, | |||
now.Sub(cur)) | now.Sub(cur)) | |||
atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano()) | atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano()) | |||
} | } | |||
return []byte("OK"), nil | return []byte("OK"), nil | |||
} | } | |||
End of changes. 26 change blocks. | ||||
39 lines changed or deleted | 45 lines changed or added |