tcp_server.go (nsq-1.2.0) | : | tcp_server.go (nsq-1.2.1) | ||
---|---|---|---|---|
package protocol | package protocol | |||
import ( | import ( | |||
"fmt" | "fmt" | |||
"net" | "net" | |||
"runtime" | "runtime" | |||
"strings" | "strings" | |||
"sync" | ||||
"github.com/nsqio/nsq/internal/lg" | "github.com/nsqio/nsq/internal/lg" | |||
) | ) | |||
type TCPHandler interface { | type TCPHandler interface { | |||
Handle(net.Conn) | Handle(net.Conn) | |||
} | } | |||
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) er ror { | func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) er ror { | |||
logf(lg.INFO, "TCP: listening on %s", listener.Addr()) | logf(lg.INFO, "TCP: listening on %s", listener.Addr()) | |||
var wg sync.WaitGroup | ||||
for { | for { | |||
clientConn, err := listener.Accept() | clientConn, err := listener.Accept() | |||
if err != nil { | if err != nil { | |||
if nerr, ok := err.(net.Error); ok && nerr.Temporary() { | if nerr, ok := err.(net.Error); ok && nerr.Temporary() { | |||
logf(lg.WARN, "temporary Accept() failure - %s", err) | logf(lg.WARN, "temporary Accept() failure - %s", err) | |||
runtime.Gosched() | runtime.Gosched() | |||
continue | continue | |||
} | } | |||
// theres no direct way to detect this error because it i s not exposed | // theres no direct way to detect this error because it i s not exposed | |||
if !strings.Contains(err.Error(), "use of closed network connection") { | if !strings.Contains(err.Error(), "use of closed network connection") { | |||
return fmt.Errorf("listener.Accept() error - %s", err) | return fmt.Errorf("listener.Accept() error - %s", err) | |||
} | } | |||
break | break | |||
} | } | |||
go handler.Handle(clientConn) | ||||
wg.Add(1) | ||||
go func() { | ||||
handler.Handle(clientConn) | ||||
wg.Done() | ||||
}() | ||||
} | } | |||
// wait to return until all handler goroutines complete | ||||
wg.Wait() | ||||
logf(lg.INFO, "TCP: closing %s", listener.Addr()) | logf(lg.INFO, "TCP: closing %s", listener.Addr()) | |||
return nil | return nil | |||
} | } | |||
End of changes. 4 change blocks. | ||||
1 lines changed or deleted | 12 lines changed or added |