nsqlookupd.go (nsq-1.2.0) | : | nsqlookupd.go (nsq-1.2.1) | ||
---|---|---|---|---|
skipping to change at line 21 | skipping to change at line 21 | |||
"github.com/nsqio/nsq/internal/protocol" | "github.com/nsqio/nsq/internal/protocol" | |||
"github.com/nsqio/nsq/internal/util" | "github.com/nsqio/nsq/internal/util" | |||
"github.com/nsqio/nsq/internal/version" | "github.com/nsqio/nsq/internal/version" | |||
) | ) | |||
type NSQLookupd struct { | type NSQLookupd struct { | |||
sync.RWMutex | sync.RWMutex | |||
opts *Options | opts *Options | |||
tcpListener net.Listener | tcpListener net.Listener | |||
httpListener net.Listener | httpListener net.Listener | |||
tcpServer *tcpServer | ||||
waitGroup util.WaitGroupWrapper | waitGroup util.WaitGroupWrapper | |||
DB *RegistrationDB | DB *RegistrationDB | |||
} | } | |||
func New(opts *Options) (*NSQLookupd, error) { | func New(opts *Options) (*NSQLookupd, error) { | |||
var err error | var err error | |||
if opts.Logger == nil { | if opts.Logger == nil { | |||
opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Lt ime|log.Lmicroseconds) | opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Lt ime|log.Lmicroseconds) | |||
} | } | |||
l := &NSQLookupd{ | l := &NSQLookupd{ | |||
opts: opts, | opts: opts, | |||
DB: NewRegistrationDB(), | DB: NewRegistrationDB(), | |||
} | } | |||
l.logf(LOG_INFO, version.String("nsqlookupd")) | l.logf(LOG_INFO, version.String("nsqlookupd")) | |||
l.tcpServer = &tcpServer{nsqlookupd: l} | ||||
l.tcpListener, err = net.Listen("tcp", opts.TCPAddress) | l.tcpListener, err = net.Listen("tcp", opts.TCPAddress) | |||
if err != nil { | if err != nil { | |||
return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress , err) | return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress , err) | |||
} | } | |||
l.httpListener, err = net.Listen("tcp", opts.HTTPAddress) | l.httpListener, err = net.Listen("tcp", opts.HTTPAddress) | |||
if err != nil { | if err != nil { | |||
return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress , err) | return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPAddres s, err) | |||
} | } | |||
return l, nil | return l, nil | |||
} | } | |||
// Main starts an instance of nsqlookupd and returns an | // Main starts an instance of nsqlookupd and returns an | |||
// error if there was a problem starting up. | // error if there was a problem starting up. | |||
func (l *NSQLookupd) Main() error { | func (l *NSQLookupd) Main() error { | |||
ctx := &Context{l} | ||||
exitCh := make(chan error) | exitCh := make(chan error) | |||
var once sync.Once | var once sync.Once | |||
exitFunc := func(err error) { | exitFunc := func(err error) { | |||
once.Do(func() { | once.Do(func() { | |||
if err != nil { | if err != nil { | |||
l.logf(LOG_FATAL, "%s", err) | l.logf(LOG_FATAL, "%s", err) | |||
} | } | |||
exitCh <- err | exitCh <- err | |||
}) | }) | |||
} | } | |||
tcpServer := &tcpServer{ctx: ctx} | ||||
l.waitGroup.Wrap(func() { | l.waitGroup.Wrap(func() { | |||
exitFunc(protocol.TCPServer(l.tcpListener, tcpServer, l.logf)) | exitFunc(protocol.TCPServer(l.tcpListener, l.tcpServer, l.logf)) | |||
}) | }) | |||
httpServer := newHTTPServer(ctx) | httpServer := newHTTPServer(l) | |||
l.waitGroup.Wrap(func() { | l.waitGroup.Wrap(func() { | |||
exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.log f)) | exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.log f)) | |||
}) | }) | |||
err := <-exitCh | err := <-exitCh | |||
return err | return err | |||
} | } | |||
func (l *NSQLookupd) RealTCPAddr() *net.TCPAddr { | func (l *NSQLookupd) RealTCPAddr() *net.TCPAddr { | |||
return l.tcpListener.Addr().(*net.TCPAddr) | return l.tcpListener.Addr().(*net.TCPAddr) | |||
skipping to change at line 92 | skipping to change at line 91 | |||
func (l *NSQLookupd) RealHTTPAddr() *net.TCPAddr { | func (l *NSQLookupd) RealHTTPAddr() *net.TCPAddr { | |||
return l.httpListener.Addr().(*net.TCPAddr) | return l.httpListener.Addr().(*net.TCPAddr) | |||
} | } | |||
func (l *NSQLookupd) Exit() { | func (l *NSQLookupd) Exit() { | |||
if l.tcpListener != nil { | if l.tcpListener != nil { | |||
l.tcpListener.Close() | l.tcpListener.Close() | |||
} | } | |||
if l.tcpServer != nil { | ||||
l.tcpServer.Close() | ||||
} | ||||
if l.httpListener != nil { | if l.httpListener != nil { | |||
l.httpListener.Close() | l.httpListener.Close() | |||
} | } | |||
l.waitGroup.Wait() | l.waitGroup.Wait() | |||
} | } | |||
End of changes. 8 change blocks. | ||||
6 lines changed or deleted | 9 lines changed or added |