tcp.go (nsq-1.2.0) | : | tcp.go (nsq-1.2.1) | ||
---|---|---|---|---|
package nsqlookupd | package nsqlookupd | |||
import ( | import ( | |||
"io" | "io" | |||
"net" | "net" | |||
"sync" | ||||
"github.com/nsqio/nsq/internal/protocol" | "github.com/nsqio/nsq/internal/protocol" | |||
) | ) | |||
type tcpServer struct { | type tcpServer struct { | |||
ctx *Context | nsqlookupd *NSQLookupd | |||
conns sync.Map | ||||
} | } | |||
func (p *tcpServer) Handle(clientConn net.Conn) { | func (p *tcpServer) Handle(conn net.Conn) { | |||
p.ctx.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteA | p.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", conn.RemoteAddr()) | |||
ddr()) | ||||
// The client should initialize itself by sending a 4 byte sequence indic ating | // The client should initialize itself by sending a 4 byte sequence indic ating | |||
// the version of the protocol that it intends to communicate, this will allow us | // the version of the protocol that it intends to communicate, this will allow us | |||
// to gracefully upgrade the protocol away from text/line oriented to wha tever... | // to gracefully upgrade the protocol away from text/line oriented to wha tever... | |||
buf := make([]byte, 4) | buf := make([]byte, 4) | |||
_, err := io.ReadFull(clientConn, buf) | _, err := io.ReadFull(conn, buf) | |||
if err != nil { | if err != nil { | |||
p.ctx.nsqlookupd.logf(LOG_ERROR, "failed to read protocol version | p.nsqlookupd.logf(LOG_ERROR, "failed to read protocol version - % | |||
- %s", err) | s", err) | |||
clientConn.Close() | conn.Close() | |||
return | return | |||
} | } | |||
protocolMagic := string(buf) | protocolMagic := string(buf) | |||
p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'" | p.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'", | |||
, | conn.RemoteAddr(), protocolMagic) | |||
clientConn.RemoteAddr(), protocolMagic) | ||||
var prot protocol.Protocol | var prot protocol.Protocol | |||
switch protocolMagic { | switch protocolMagic { | |||
case " V1": | case " V1": | |||
prot = &LookupProtocolV1{ctx: p.ctx} | prot = &LookupProtocolV1{nsqlookupd: p.nsqlookupd} | |||
default: | default: | |||
protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL")) | protocol.SendResponse(conn, []byte("E_BAD_PROTOCOL")) | |||
clientConn.Close() | conn.Close() | |||
p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic ' | p.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'" | |||
%s'", | , | |||
clientConn.RemoteAddr(), protocolMagic) | conn.RemoteAddr(), protocolMagic) | |||
return | return | |||
} | } | |||
err = prot.IOLoop(clientConn) | client := prot.NewClient(conn) | |||
p.conns.Store(conn.RemoteAddr(), client) | ||||
err = prot.IOLoop(client) | ||||
if err != nil { | if err != nil { | |||
p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clientConn.Re | p.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr() | |||
moteAddr(), err) | , err) | |||
return | ||||
} | } | |||
p.conns.Delete(conn.RemoteAddr()) | ||||
client.Close() | ||||
} | ||||
func (p *tcpServer) Close() { | ||||
p.conns.Range(func(k, v interface{}) bool { | ||||
v.(protocol.Client).Close() | ||||
return true | ||||
}) | ||||
} | } | |||
End of changes. 11 change blocks. | ||||
21 lines changed or deleted | 33 lines changed or added |