nsqd.go (nsq-1.2.0) | : | nsqd.go (nsq-1.2.1) | ||
---|---|---|---|---|
package nsqd | package nsqd | |||
import ( | import ( | |||
"context" | ||||
"crypto/tls" | "crypto/tls" | |||
"crypto/x509" | "crypto/x509" | |||
"encoding/json" | "encoding/json" | |||
"errors" | "errors" | |||
"fmt" | "fmt" | |||
"io/ioutil" | "io/ioutil" | |||
"log" | "log" | |||
"math/rand" | "math/rand" | |||
"net" | "net" | |||
"os" | "os" | |||
skipping to change at line 39 | skipping to change at line 40 | |||
const ( | const ( | |||
TLSNotRequired = iota | TLSNotRequired = iota | |||
TLSRequiredExceptHTTP | TLSRequiredExceptHTTP | |||
TLSRequired | TLSRequired | |||
) | ) | |||
type errStore struct { | type errStore struct { | |||
err error | err error | |||
} | } | |||
type Client interface { | ||||
Stats() ClientStats | ||||
IsProducer() bool | ||||
} | ||||
type NSQD struct { | type NSQD struct { | |||
// 64bit atomic vars need to be first for proper alignment on 32bit platf orms | // 64bit atomic vars need to be first for proper alignment on 32bit platf orms | |||
clientIDSequence int64 | clientIDSequence int64 | |||
sync.RWMutex | sync.RWMutex | |||
ctx context.Context | ||||
// ctxCancel cancels a context that main() is waiting on | ||||
ctxCancel context.CancelFunc | ||||
opts atomic.Value | opts atomic.Value | |||
dl *dirlock.DirLock | dl *dirlock.DirLock | |||
isLoading int32 | isLoading int32 | |||
isExiting int32 | ||||
errValue atomic.Value | errValue atomic.Value | |||
startTime time.Time | startTime time.Time | |||
topicMap map[string]*Topic | topicMap map[string]*Topic | |||
clientLock sync.RWMutex | ||||
clients map[int64]Client | ||||
lookupPeers atomic.Value | lookupPeers atomic.Value | |||
tcpServer *tcpServer | ||||
tcpListener net.Listener | tcpListener net.Listener | |||
httpListener net.Listener | httpListener net.Listener | |||
httpsListener net.Listener | httpsListener net.Listener | |||
tlsConfig *tls.Config | tlsConfig *tls.Config | |||
poolSize int | poolSize int | |||
notifyChan chan interface{} | notifyChan chan interface{} | |||
optsNotificationChan chan struct{} | optsNotificationChan chan struct{} | |||
exitChan chan int | exitChan chan int | |||
skipping to change at line 94 | skipping to change at line 92 | |||
cwd, _ := os.Getwd() | cwd, _ := os.Getwd() | |||
dataPath = cwd | dataPath = cwd | |||
} | } | |||
if opts.Logger == nil { | if opts.Logger == nil { | |||
opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Lt ime|log.Lmicroseconds) | opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Lt ime|log.Lmicroseconds) | |||
} | } | |||
n := &NSQD{ | n := &NSQD{ | |||
startTime: time.Now(), | startTime: time.Now(), | |||
topicMap: make(map[string]*Topic), | topicMap: make(map[string]*Topic), | |||
clients: make(map[int64]Client), | ||||
exitChan: make(chan int), | exitChan: make(chan int), | |||
notifyChan: make(chan interface{}), | notifyChan: make(chan interface{}), | |||
optsNotificationChan: make(chan struct{}, 1), | optsNotificationChan: make(chan struct{}, 1), | |||
dl: dirlock.New(dataPath), | dl: dirlock.New(dataPath), | |||
} | } | |||
n.ctx, n.ctxCancel = context.WithCancel(context.Background()) | ||||
httpcli := http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HT TPClientRequestTimeout) | httpcli := http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HT TPClientRequestTimeout) | |||
n.ci = clusterinfo.New(n.logf, httpcli) | n.ci = clusterinfo.New(n.logf, httpcli) | |||
n.lookupPeers.Store([]*lookupPeer{}) | n.lookupPeers.Store([]*lookupPeer{}) | |||
n.swapOpts(opts) | n.swapOpts(opts) | |||
n.errValue.Store(errStore{}) | n.errValue.Store(errStore{}) | |||
err = n.dl.Lock() | err = n.dl.Lock() | |||
if err != nil { | if err != nil { | |||
return nil, fmt.Errorf("--data-path=%s in use (possibly by anothe r instance of nsqd)", dataPath) | return nil, fmt.Errorf("failed to lock data-path: %v", err) | |||
} | } | |||
if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 { | if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 { | |||
return nil, errors.New("--max-deflate-level must be [1,9]") | return nil, errors.New("--max-deflate-level must be [1,9]") | |||
} | } | |||
if opts.ID < 0 || opts.ID >= 1024 { | if opts.ID < 0 || opts.ID >= 1024 { | |||
return nil, errors.New("--node-id must be [0,1024)") | return nil, errors.New("--node-id must be [0,1024)") | |||
} | } | |||
if opts.StatsdPrefix != "" { | ||||
var port string | ||||
_, port, err = net.SplitHostPort(opts.HTTPAddress) | ||||
if err != nil { | ||||
return nil, fmt.Errorf("failed to parse HTTP address (%s) | ||||
- %s", opts.HTTPAddress, err) | ||||
} | ||||
statsdHostKey := statsd.HostKey(net.JoinHostPort(opts.BroadcastAd | ||||
dress, port)) | ||||
prefixWithHost := strings.Replace(opts.StatsdPrefix, "%s", statsd | ||||
HostKey, -1) | ||||
if prefixWithHost[len(prefixWithHost)-1] != '.' { | ||||
prefixWithHost += "." | ||||
} | ||||
opts.StatsdPrefix = prefixWithHost | ||||
} | ||||
if opts.TLSClientAuthPolicy != "" && opts.TLSRequired == TLSNotRequired { | if opts.TLSClientAuthPolicy != "" && opts.TLSRequired == TLSNotRequired { | |||
opts.TLSRequired = TLSRequired | opts.TLSRequired = TLSRequired | |||
} | } | |||
tlsConfig, err := buildTLSConfig(opts) | tlsConfig, err := buildTLSConfig(opts) | |||
if err != nil { | if err != nil { | |||
return nil, fmt.Errorf("failed to build TLS config - %s", err) | return nil, fmt.Errorf("failed to build TLS config - %s", err) | |||
} | } | |||
if tlsConfig == nil && opts.TLSRequired != TLSNotRequired { | if tlsConfig == nil && opts.TLSRequired != TLSNotRequired { | |||
return nil, errors.New("cannot require TLS client connections wit hout TLS key and cert") | return nil, errors.New("cannot require TLS client connections wit hout TLS key and cert") | |||
skipping to change at line 157 | skipping to change at line 141 | |||
for _, v := range opts.E2EProcessingLatencyPercentiles { | for _, v := range opts.E2EProcessingLatencyPercentiles { | |||
if v <= 0 || v > 1 { | if v <= 0 || v > 1 { | |||
return nil, fmt.Errorf("invalid E2E processing latency pe rcentile: %v", v) | return nil, fmt.Errorf("invalid E2E processing latency pe rcentile: %v", v) | |||
} | } | |||
} | } | |||
n.logf(LOG_INFO, version.String("nsqd")) | n.logf(LOG_INFO, version.String("nsqd")) | |||
n.logf(LOG_INFO, "ID: %d", opts.ID) | n.logf(LOG_INFO, "ID: %d", opts.ID) | |||
n.tcpServer = &tcpServer{nsqd: n} | ||||
n.tcpListener, err = net.Listen("tcp", opts.TCPAddress) | n.tcpListener, err = net.Listen("tcp", opts.TCPAddress) | |||
if err != nil { | if err != nil { | |||
return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress , err) | return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress , err) | |||
} | } | |||
n.httpListener, err = net.Listen("tcp", opts.HTTPAddress) | n.httpListener, err = net.Listen("tcp", opts.HTTPAddress) | |||
if err != nil { | if err != nil { | |||
return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPAddres s, err) | return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPAddres s, err) | |||
} | } | |||
if n.tlsConfig != nil && opts.HTTPSAddress != "" { | if n.tlsConfig != nil && opts.HTTPSAddress != "" { | |||
n.httpsListener, err = tls.Listen("tcp", opts.HTTPSAddress, n.tls Config) | n.httpsListener, err = tls.Listen("tcp", opts.HTTPSAddress, n.tls Config) | |||
if err != nil { | if err != nil { | |||
return nil, fmt.Errorf("listen (%s) failed - %s", opts.HT TPSAddress, err) | return nil, fmt.Errorf("listen (%s) failed - %s", opts.HT TPSAddress, err) | |||
} | } | |||
} | } | |||
if opts.BroadcastHTTPPort == 0 { | ||||
opts.BroadcastHTTPPort = n.RealHTTPAddr().Port | ||||
} | ||||
if opts.BroadcastTCPPort == 0 { | ||||
opts.BroadcastTCPPort = n.RealTCPAddr().Port | ||||
} | ||||
if opts.StatsdPrefix != "" { | ||||
var port string = fmt.Sprint(opts.BroadcastHTTPPort) | ||||
statsdHostKey := statsd.HostKey(net.JoinHostPort(opts.BroadcastAd | ||||
dress, port)) | ||||
prefixWithHost := strings.Replace(opts.StatsdPrefix, "%s", statsd | ||||
HostKey, -1) | ||||
if prefixWithHost[len(prefixWithHost)-1] != '.' { | ||||
prefixWithHost += "." | ||||
} | ||||
opts.StatsdPrefix = prefixWithHost | ||||
} | ||||
return n, nil | return n, nil | |||
} | } | |||
func (n *NSQD) getOpts() *Options { | func (n *NSQD) getOpts() *Options { | |||
return n.opts.Load().(*Options) | return n.opts.Load().(*Options) | |||
} | } | |||
func (n *NSQD) swapOpts(opts *Options) { | func (n *NSQD) swapOpts(opts *Options) { | |||
n.opts.Store(opts) | n.opts.Store(opts) | |||
} | } | |||
skipping to change at line 227 | skipping to change at line 230 | |||
if err != nil { | if err != nil { | |||
return fmt.Sprintf("NOK - %s", err) | return fmt.Sprintf("NOK - %s", err) | |||
} | } | |||
return "OK" | return "OK" | |||
} | } | |||
func (n *NSQD) GetStartTime() time.Time { | func (n *NSQD) GetStartTime() time.Time { | |||
return n.startTime | return n.startTime | |||
} | } | |||
func (n *NSQD) AddClient(clientID int64, client Client) { | ||||
n.clientLock.Lock() | ||||
n.clients[clientID] = client | ||||
n.clientLock.Unlock() | ||||
} | ||||
func (n *NSQD) RemoveClient(clientID int64) { | ||||
n.clientLock.Lock() | ||||
_, ok := n.clients[clientID] | ||||
if !ok { | ||||
n.clientLock.Unlock() | ||||
return | ||||
} | ||||
delete(n.clients, clientID) | ||||
n.clientLock.Unlock() | ||||
} | ||||
func (n *NSQD) Main() error { | func (n *NSQD) Main() error { | |||
ctx := &context{n} | ||||
exitCh := make(chan error) | exitCh := make(chan error) | |||
var once sync.Once | var once sync.Once | |||
exitFunc := func(err error) { | exitFunc := func(err error) { | |||
once.Do(func() { | once.Do(func() { | |||
if err != nil { | if err != nil { | |||
n.logf(LOG_FATAL, "%s", err) | n.logf(LOG_FATAL, "%s", err) | |||
} | } | |||
exitCh <- err | exitCh <- err | |||
}) | }) | |||
} | } | |||
tcpServer := &tcpServer{ctx: ctx} | ||||
n.waitGroup.Wrap(func() { | n.waitGroup.Wrap(func() { | |||
exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf)) | exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf)) | |||
}) | }) | |||
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSReq | ||||
uired) | httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequi | |||
red) | ||||
n.waitGroup.Wrap(func() { | n.waitGroup.Wrap(func() { | |||
exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.log f)) | exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.log f)) | |||
}) | }) | |||
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { | if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { | |||
httpsServer := newHTTPServer(ctx, true, true) | httpsServer := newHTTPServer(n, true, true) | |||
n.waitGroup.Wrap(func() { | n.waitGroup.Wrap(func() { | |||
exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HT TPS", n.logf)) | exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HT TPS", n.logf)) | |||
}) | }) | |||
} | } | |||
n.waitGroup.Wrap(n.queueScanLoop) | n.waitGroup.Wrap(n.queueScanLoop) | |||
n.waitGroup.Wrap(n.lookupLoop) | n.waitGroup.Wrap(n.lookupLoop) | |||
if n.getOpts().StatsdAddress != "" { | if n.getOpts().StatsdAddress != "" { | |||
n.waitGroup.Wrap(n.statsdLoop) | n.waitGroup.Wrap(n.statsdLoop) | |||
} | } | |||
skipping to change at line 384 | skipping to change at line 368 | |||
for _, topic := range n.topicMap { | for _, topic := range n.topicMap { | |||
if topic.ephemeral { | if topic.ephemeral { | |||
continue | continue | |||
} | } | |||
topicData := make(map[string]interface{}) | topicData := make(map[string]interface{}) | |||
topicData["name"] = topic.name | topicData["name"] = topic.name | |||
topicData["paused"] = topic.IsPaused() | topicData["paused"] = topic.IsPaused() | |||
channels := []interface{}{} | channels := []interface{}{} | |||
topic.Lock() | topic.Lock() | |||
for _, channel := range topic.channelMap { | for _, channel := range topic.channelMap { | |||
channel.Lock() | ||||
if channel.ephemeral { | if channel.ephemeral { | |||
channel.Unlock() | ||||
continue | continue | |||
} | } | |||
channel.Lock() | ||||
channelData := make(map[string]interface{}) | channelData := make(map[string]interface{}) | |||
channelData["name"] = channel.name | channelData["name"] = channel.name | |||
channelData["paused"] = channel.IsPaused() | channelData["paused"] = channel.IsPaused() | |||
channels = append(channels, channelData) | ||||
channel.Unlock() | channel.Unlock() | |||
channels = append(channels, channelData) | ||||
} | } | |||
topic.Unlock() | topic.Unlock() | |||
topicData["channels"] = channels | topicData["channels"] = channels | |||
topics = append(topics, topicData) | topics = append(topics, topicData) | |||
} | } | |||
js["version"] = version.Binary | js["version"] = version.Binary | |||
js["topics"] = topics | js["topics"] = topics | |||
data, err := json.Marshal(&js) | data, err := json.Marshal(&js) | |||
if err != nil { | if err != nil { | |||
skipping to change at line 423 | skipping to change at line 406 | |||
err = os.Rename(tmpFileName, fileName) | err = os.Rename(tmpFileName, fileName) | |||
if err != nil { | if err != nil { | |||
return err | return err | |||
} | } | |||
// technically should fsync DataPath here | // technically should fsync DataPath here | |||
return nil | return nil | |||
} | } | |||
func (n *NSQD) Exit() { | func (n *NSQD) Exit() { | |||
if !atomic.CompareAndSwapInt32(&n.isExiting, 0, 1) { | ||||
// avoid double call | ||||
return | ||||
} | ||||
if n.tcpListener != nil { | if n.tcpListener != nil { | |||
n.tcpListener.Close() | n.tcpListener.Close() | |||
} | } | |||
if n.tcpServer != nil { | ||||
n.tcpServer.Close() | ||||
} | ||||
if n.httpListener != nil { | if n.httpListener != nil { | |||
n.httpListener.Close() | n.httpListener.Close() | |||
} | } | |||
if n.httpsListener != nil { | if n.httpsListener != nil { | |||
n.httpsListener.Close() | n.httpsListener.Close() | |||
} | } | |||
n.Lock() | n.Lock() | |||
err := n.PersistMetadata() | err := n.PersistMetadata() | |||
skipping to change at line 451 | skipping to change at line 442 | |||
for _, topic := range n.topicMap { | for _, topic := range n.topicMap { | |||
topic.Close() | topic.Close() | |||
} | } | |||
n.Unlock() | n.Unlock() | |||
n.logf(LOG_INFO, "NSQ: stopping subsystems") | n.logf(LOG_INFO, "NSQ: stopping subsystems") | |||
close(n.exitChan) | close(n.exitChan) | |||
n.waitGroup.Wait() | n.waitGroup.Wait() | |||
n.dl.Unlock() | n.dl.Unlock() | |||
n.logf(LOG_INFO, "NSQ: bye") | n.logf(LOG_INFO, "NSQ: bye") | |||
n.ctxCancel() | ||||
} | } | |||
// GetTopic performs a thread safe operation | // GetTopic performs a thread safe operation | |||
// to return a pointer to a Topic object (potentially new) | // to return a pointer to a Topic object (potentially new) | |||
func (n *NSQD) GetTopic(topicName string) *Topic { | func (n *NSQD) GetTopic(topicName string) *Topic { | |||
// most likely, we already have this topic, so try read lock first. | // most likely we already have this topic, so try read lock first | |||
n.RLock() | n.RLock() | |||
t, ok := n.topicMap[topicName] | t, ok := n.topicMap[topicName] | |||
n.RUnlock() | n.RUnlock() | |||
if ok { | if ok { | |||
return t | return t | |||
} | } | |||
n.Lock() | n.Lock() | |||
t, ok = n.topicMap[topicName] | t, ok = n.topicMap[topicName] | |||
if ok { | if ok { | |||
n.Unlock() | n.Unlock() | |||
return t | return t | |||
} | } | |||
deleteCallback := func(t *Topic) { | deleteCallback := func(t *Topic) { | |||
n.DeleteExistingTopic(t.name) | n.DeleteExistingTopic(t.name) | |||
} | } | |||
t = NewTopic(topicName, &context{n}, deleteCallback) | t = NewTopic(topicName, n, deleteCallback) | |||
n.topicMap[topicName] = t | n.topicMap[topicName] = t | |||
n.Unlock() | n.Unlock() | |||
n.logf(LOG_INFO, "TOPIC(%s): created", t.name) | n.logf(LOG_INFO, "TOPIC(%s): created", t.name) | |||
// topic is created but messagePump not yet started | // topic is created but messagePump not yet started | |||
// if loading metadata at startup, no lookupd connections yet, topic star | // if this topic was created while loading metadata at startup don't do a | |||
ted after load | ny further initialization | |||
// (topic will be "started" after loading completes) | ||||
if atomic.LoadInt32(&n.isLoading) == 1 { | if atomic.LoadInt32(&n.isLoading) == 1 { | |||
return t | return t | |||
} | } | |||
// if using lookupd, make a blocking call to get the topics, and immediat | // if using lookupd, make a blocking call to get channels and immediately | |||
ely create them. | create them | |||
// this makes sure that any message received is buffered to the right cha | // to ensure that all channels receive published messages | |||
nnels | ||||
lookupdHTTPAddrs := n.lookupdHTTPAddrs() | lookupdHTTPAddrs := n.lookupdHTTPAddrs() | |||
if len(lookupdHTTPAddrs) > 0 { | if len(lookupdHTTPAddrs) > 0 { | |||
channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupd HTTPAddrs) | channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupd HTTPAddrs) | |||
if err != nil { | if err != nil { | |||
n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err) | n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err) | |||
} | } | |||
for _, channelName := range channelNames { | for _, channelName := range channelNames { | |||
if strings.HasSuffix(channelName, "#ephemeral") { | if strings.HasSuffix(channelName, "#ephemeral") { | |||
continue // do not create ephemeral channel with no consumer client | continue // do not create ephemeral channel with no consumer client | |||
} | } | |||
skipping to change at line 546 | skipping to change at line 539 | |||
// to enforce ordering | // to enforce ordering | |||
topic.Delete() | topic.Delete() | |||
n.Lock() | n.Lock() | |||
delete(n.topicMap, topicName) | delete(n.topicMap, topicName) | |||
n.Unlock() | n.Unlock() | |||
return nil | return nil | |||
} | } | |||
func (n *NSQD) Notify(v interface{}) { | func (n *NSQD) Notify(v interface{}, persist bool) { | |||
// since the in-memory metadata is incomplete, | // since the in-memory metadata is incomplete, | |||
// should not persist metadata while loading it. | // should not persist metadata while loading it. | |||
// nsqd will call `PersistMetadata` it after loading | // nsqd will call `PersistMetadata` it after loading | |||
persist := atomic.LoadInt32(&n.isLoading) == 0 | loading := atomic.LoadInt32(&n.isLoading) == 1 | |||
n.waitGroup.Wrap(func() { | n.waitGroup.Wrap(func() { | |||
// by selecting on exitChan we guarantee that | // by selecting on exitChan we guarantee that | |||
// we do not block exit, see issue #123 | // we do not block exit, see issue #123 | |||
select { | select { | |||
case <-n.exitChan: | case <-n.exitChan: | |||
case n.notifyChan <- v: | case n.notifyChan <- v: | |||
if !persist { | if loading || !persist { | |||
return | return | |||
} | } | |||
n.Lock() | n.Lock() | |||
err := n.PersistMetadata() | err := n.PersistMetadata() | |||
if err != nil { | if err != nil { | |||
n.logf(LOG_ERROR, "failed to persist metadata - % s", err) | n.logf(LOG_ERROR, "failed to persist metadata - % s", err) | |||
} | } | |||
n.Unlock() | n.Unlock() | |||
} | } | |||
}) | }) | |||
skipping to change at line 750 | skipping to change at line 743 | |||
} | } | |||
tlsConfig.BuildNameToCertificate() | tlsConfig.BuildNameToCertificate() | |||
return tlsConfig, nil | return tlsConfig, nil | |||
} | } | |||
func (n *NSQD) IsAuthEnabled() bool { | func (n *NSQD) IsAuthEnabled() bool { | |||
return len(n.getOpts().AuthHTTPAddresses) != 0 | return len(n.getOpts().AuthHTTPAddresses) != 0 | |||
} | } | |||
// Context returns a context that will be canceled when nsqd initiates the shutd | ||||
own | ||||
func (n *NSQD) Context() context.Context { | ||||
return n.ctx | ||||
} | ||||
End of changes. 34 change blocks. | ||||
65 lines changed or deleted | 56 lines changed or added |