"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "nsqd/tcp.go" between
nsq-1.2.0.tar.gz and nsq-1.2.1.tar.gz

About: nsq is a realtime distributed and and decentralized messaging platform.

tcp.go  (nsq-1.2.0):tcp.go  (nsq-1.2.1)
package nsqd package nsqd
import ( import (
"io" "io"
"net" "net"
"sync"
"github.com/nsqio/nsq/internal/protocol" "github.com/nsqio/nsq/internal/protocol"
) )
const (
typeConsumer = iota
typeProducer
)
type Client interface {
Type() int
Stats(string) ClientStats
}
type tcpServer struct { type tcpServer struct {
ctx *context nsqd *NSQD
conns sync.Map
} }
func (p *tcpServer) Handle(clientConn net.Conn) { func (p *tcpServer) Handle(conn net.Conn) {
p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr()) p.nsqd.logf(LOG_INFO, "TCP: new client(%s)", conn.RemoteAddr())
// The client should initialize itself by sending a 4 byte sequence indic ating // The client should initialize itself by sending a 4 byte sequence indic ating
// the version of the protocol that it intends to communicate, this will allow us // the version of the protocol that it intends to communicate, this will allow us
// to gracefully upgrade the protocol away from text/line oriented to wha tever... // to gracefully upgrade the protocol away from text/line oriented to wha tever...
buf := make([]byte, 4) buf := make([]byte, 4)
_, err := io.ReadFull(clientConn, buf) _, err := io.ReadFull(conn, buf)
if err != nil { if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s" p.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", er
, err) r)
clientConn.Close() conn.Close()
return return
} }
protocolMagic := string(buf) protocolMagic := string(buf)
p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'", p.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic) conn.RemoteAddr(), protocolMagic)
var prot protocol.Protocol var prot protocol.Protocol
switch protocolMagic { switch protocolMagic {
case " V2": case " V2":
prot = &protocolV2{ctx: p.ctx} prot = &protocolV2{nsqd: p.nsqd}
default: default:
protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E protocol.SendFramedResponse(conn, frameTypeError, []byte("E_BAD_P
_BAD_PROTOCOL")) ROTOCOL"))
clientConn.Close() conn.Close()
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", p.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic) conn.RemoteAddr(), protocolMagic)
return return
} }
err = prot.IOLoop(clientConn) client := prot.NewClient(conn)
p.conns.Store(conn.RemoteAddr(), client)
err = prot.IOLoop(client)
if err != nil { if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAd p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err)
dr(), err)
return
} }
p.conns.Delete(conn.RemoteAddr())
client.Close()
}
func (p *tcpServer) Close() {
p.conns.Range(func(k, v interface{}) bool {
v.(protocol.Client).Close()
return true
})
} }
 End of changes. 12 change blocks. 
19 lines changed or deleted 42 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)