"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "nsqlookupd/nsqlookupd.go" between
nsq-1.2.0.tar.gz and nsq-1.2.1.tar.gz

About: nsq is a realtime distributed and and decentralized messaging platform.

nsqlookupd.go  (nsq-1.2.0):nsqlookupd.go  (nsq-1.2.1)
skipping to change at line 21 skipping to change at line 21
"github.com/nsqio/nsq/internal/protocol" "github.com/nsqio/nsq/internal/protocol"
"github.com/nsqio/nsq/internal/util" "github.com/nsqio/nsq/internal/util"
"github.com/nsqio/nsq/internal/version" "github.com/nsqio/nsq/internal/version"
) )
type NSQLookupd struct { type NSQLookupd struct {
sync.RWMutex sync.RWMutex
opts *Options opts *Options
tcpListener net.Listener tcpListener net.Listener
httpListener net.Listener httpListener net.Listener
tcpServer *tcpServer
waitGroup util.WaitGroupWrapper waitGroup util.WaitGroupWrapper
DB *RegistrationDB DB *RegistrationDB
} }
func New(opts *Options) (*NSQLookupd, error) { func New(opts *Options) (*NSQLookupd, error) {
var err error var err error
if opts.Logger == nil { if opts.Logger == nil {
opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Lt ime|log.Lmicroseconds) opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Lt ime|log.Lmicroseconds)
} }
l := &NSQLookupd{ l := &NSQLookupd{
opts: opts, opts: opts,
DB: NewRegistrationDB(), DB: NewRegistrationDB(),
} }
l.logf(LOG_INFO, version.String("nsqlookupd")) l.logf(LOG_INFO, version.String("nsqlookupd"))
l.tcpServer = &tcpServer{nsqlookupd: l}
l.tcpListener, err = net.Listen("tcp", opts.TCPAddress) l.tcpListener, err = net.Listen("tcp", opts.TCPAddress)
if err != nil { if err != nil {
return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress , err) return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress , err)
} }
l.httpListener, err = net.Listen("tcp", opts.HTTPAddress) l.httpListener, err = net.Listen("tcp", opts.HTTPAddress)
if err != nil { if err != nil {
return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress , err) return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPAddres s, err)
} }
return l, nil return l, nil
} }
// Main starts an instance of nsqlookupd and returns an // Main starts an instance of nsqlookupd and returns an
// error if there was a problem starting up. // error if there was a problem starting up.
func (l *NSQLookupd) Main() error { func (l *NSQLookupd) Main() error {
ctx := &Context{l}
exitCh := make(chan error) exitCh := make(chan error)
var once sync.Once var once sync.Once
exitFunc := func(err error) { exitFunc := func(err error) {
once.Do(func() { once.Do(func() {
if err != nil { if err != nil {
l.logf(LOG_FATAL, "%s", err) l.logf(LOG_FATAL, "%s", err)
} }
exitCh <- err exitCh <- err
}) })
} }
tcpServer := &tcpServer{ctx: ctx}
l.waitGroup.Wrap(func() { l.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(l.tcpListener, tcpServer, l.logf)) exitFunc(protocol.TCPServer(l.tcpListener, l.tcpServer, l.logf))
}) })
httpServer := newHTTPServer(ctx) httpServer := newHTTPServer(l)
l.waitGroup.Wrap(func() { l.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.log f)) exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.log f))
}) })
err := <-exitCh err := <-exitCh
return err return err
} }
func (l *NSQLookupd) RealTCPAddr() *net.TCPAddr { func (l *NSQLookupd) RealTCPAddr() *net.TCPAddr {
return l.tcpListener.Addr().(*net.TCPAddr) return l.tcpListener.Addr().(*net.TCPAddr)
skipping to change at line 92 skipping to change at line 91
func (l *NSQLookupd) RealHTTPAddr() *net.TCPAddr { func (l *NSQLookupd) RealHTTPAddr() *net.TCPAddr {
return l.httpListener.Addr().(*net.TCPAddr) return l.httpListener.Addr().(*net.TCPAddr)
} }
func (l *NSQLookupd) Exit() { func (l *NSQLookupd) Exit() {
if l.tcpListener != nil { if l.tcpListener != nil {
l.tcpListener.Close() l.tcpListener.Close()
} }
if l.tcpServer != nil {
l.tcpServer.Close()
}
if l.httpListener != nil { if l.httpListener != nil {
l.httpListener.Close() l.httpListener.Close()
} }
l.waitGroup.Wait() l.waitGroup.Wait()
} }
 End of changes. 8 change blocks. 
6 lines changed or deleted 9 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)