"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "nsqlookupd/tcp.go" between
nsq-1.2.0.tar.gz and nsq-1.2.1.tar.gz

About: nsq is a realtime distributed and and decentralized messaging platform.

tcp.go  (nsq-1.2.0):tcp.go  (nsq-1.2.1)
package nsqlookupd package nsqlookupd
import ( import (
"io" "io"
"net" "net"
"sync"
"github.com/nsqio/nsq/internal/protocol" "github.com/nsqio/nsq/internal/protocol"
) )
type tcpServer struct { type tcpServer struct {
ctx *Context nsqlookupd *NSQLookupd
conns sync.Map
} }
func (p *tcpServer) Handle(clientConn net.Conn) { func (p *tcpServer) Handle(conn net.Conn) {
p.ctx.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteA p.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", conn.RemoteAddr())
ddr())
// The client should initialize itself by sending a 4 byte sequence indic ating // The client should initialize itself by sending a 4 byte sequence indic ating
// the version of the protocol that it intends to communicate, this will allow us // the version of the protocol that it intends to communicate, this will allow us
// to gracefully upgrade the protocol away from text/line oriented to wha tever... // to gracefully upgrade the protocol away from text/line oriented to wha tever...
buf := make([]byte, 4) buf := make([]byte, 4)
_, err := io.ReadFull(clientConn, buf) _, err := io.ReadFull(conn, buf)
if err != nil { if err != nil {
p.ctx.nsqlookupd.logf(LOG_ERROR, "failed to read protocol version p.nsqlookupd.logf(LOG_ERROR, "failed to read protocol version - %
- %s", err) s", err)
clientConn.Close() conn.Close()
return return
} }
protocolMagic := string(buf) protocolMagic := string(buf)
p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'" p.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
, conn.RemoteAddr(), protocolMagic)
clientConn.RemoteAddr(), protocolMagic)
var prot protocol.Protocol var prot protocol.Protocol
switch protocolMagic { switch protocolMagic {
case " V1": case " V1":
prot = &LookupProtocolV1{ctx: p.ctx} prot = &LookupProtocolV1{nsqlookupd: p.nsqlookupd}
default: default:
protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL")) protocol.SendResponse(conn, []byte("E_BAD_PROTOCOL"))
clientConn.Close() conn.Close()
p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic ' p.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'"
%s'", ,
clientConn.RemoteAddr(), protocolMagic) conn.RemoteAddr(), protocolMagic)
return return
} }
err = prot.IOLoop(clientConn) client := prot.NewClient(conn)
p.conns.Store(conn.RemoteAddr(), client)
err = prot.IOLoop(client)
if err != nil { if err != nil {
p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clientConn.Re p.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr()
moteAddr(), err) , err)
return
} }
p.conns.Delete(conn.RemoteAddr())
client.Close()
}
func (p *tcpServer) Close() {
p.conns.Range(func(k, v interface{}) bool {
v.(protocol.Client).Close()
return true
})
} }
 End of changes. 11 change blocks. 
21 lines changed or deleted 33 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)