tcp.go (nsq-1.2.0) | : | tcp.go (nsq-1.2.1) | ||
---|---|---|---|---|
package nsqd | package nsqd | |||
import ( | import ( | |||
"io" | "io" | |||
"net" | "net" | |||
"sync" | ||||
"github.com/nsqio/nsq/internal/protocol" | "github.com/nsqio/nsq/internal/protocol" | |||
) | ) | |||
const ( | ||||
typeConsumer = iota | ||||
typeProducer | ||||
) | ||||
type Client interface { | ||||
Type() int | ||||
Stats(string) ClientStats | ||||
} | ||||
type tcpServer struct { | type tcpServer struct { | |||
ctx *context | nsqd *NSQD | |||
conns sync.Map | ||||
} | } | |||
func (p *tcpServer) Handle(clientConn net.Conn) { | func (p *tcpServer) Handle(conn net.Conn) { | |||
p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr()) | p.nsqd.logf(LOG_INFO, "TCP: new client(%s)", conn.RemoteAddr()) | |||
// The client should initialize itself by sending a 4 byte sequence indic ating | // The client should initialize itself by sending a 4 byte sequence indic ating | |||
// the version of the protocol that it intends to communicate, this will allow us | // the version of the protocol that it intends to communicate, this will allow us | |||
// to gracefully upgrade the protocol away from text/line oriented to wha tever... | // to gracefully upgrade the protocol away from text/line oriented to wha tever... | |||
buf := make([]byte, 4) | buf := make([]byte, 4) | |||
_, err := io.ReadFull(clientConn, buf) | _, err := io.ReadFull(conn, buf) | |||
if err != nil { | if err != nil { | |||
p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s" | p.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", er | |||
, err) | r) | |||
clientConn.Close() | conn.Close() | |||
return | return | |||
} | } | |||
protocolMagic := string(buf) | protocolMagic := string(buf) | |||
p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'", | p.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'", | |||
clientConn.RemoteAddr(), protocolMagic) | conn.RemoteAddr(), protocolMagic) | |||
var prot protocol.Protocol | var prot protocol.Protocol | |||
switch protocolMagic { | switch protocolMagic { | |||
case " V2": | case " V2": | |||
prot = &protocolV2{ctx: p.ctx} | prot = &protocolV2{nsqd: p.nsqd} | |||
default: | default: | |||
protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E | protocol.SendFramedResponse(conn, frameTypeError, []byte("E_BAD_P | |||
_BAD_PROTOCOL")) | ROTOCOL")) | |||
clientConn.Close() | conn.Close() | |||
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", | p.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", | |||
clientConn.RemoteAddr(), protocolMagic) | conn.RemoteAddr(), protocolMagic) | |||
return | return | |||
} | } | |||
err = prot.IOLoop(clientConn) | client := prot.NewClient(conn) | |||
p.conns.Store(conn.RemoteAddr(), client) | ||||
err = prot.IOLoop(client) | ||||
if err != nil { | if err != nil { | |||
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAd | p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err) | |||
dr(), err) | ||||
return | ||||
} | } | |||
p.conns.Delete(conn.RemoteAddr()) | ||||
client.Close() | ||||
} | ||||
func (p *tcpServer) Close() { | ||||
p.conns.Range(func(k, v interface{}) bool { | ||||
v.(protocol.Client).Close() | ||||
return true | ||||
}) | ||||
} | } | |||
End of changes. 12 change blocks. | ||||
19 lines changed or deleted | 42 lines changed or added |