"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "nsqd/nsqd.go" between
nsq-1.2.0.tar.gz and nsq-1.2.1.tar.gz

About: nsq is a realtime distributed and and decentralized messaging platform.

nsqd.go  (nsq-1.2.0):nsqd.go  (nsq-1.2.1)
package nsqd package nsqd
import ( import (
"context"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"math/rand" "math/rand"
"net" "net"
"os" "os"
skipping to change at line 39 skipping to change at line 40
const ( const (
TLSNotRequired = iota TLSNotRequired = iota
TLSRequiredExceptHTTP TLSRequiredExceptHTTP
TLSRequired TLSRequired
) )
type errStore struct { type errStore struct {
err error err error
} }
type Client interface {
Stats() ClientStats
IsProducer() bool
}
type NSQD struct { type NSQD struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platf orms // 64bit atomic vars need to be first for proper alignment on 32bit platf orms
clientIDSequence int64 clientIDSequence int64
sync.RWMutex sync.RWMutex
ctx context.Context
// ctxCancel cancels a context that main() is waiting on
ctxCancel context.CancelFunc
opts atomic.Value opts atomic.Value
dl *dirlock.DirLock dl *dirlock.DirLock
isLoading int32 isLoading int32
isExiting int32
errValue atomic.Value errValue atomic.Value
startTime time.Time startTime time.Time
topicMap map[string]*Topic topicMap map[string]*Topic
clientLock sync.RWMutex
clients map[int64]Client
lookupPeers atomic.Value lookupPeers atomic.Value
tcpServer *tcpServer
tcpListener net.Listener tcpListener net.Listener
httpListener net.Listener httpListener net.Listener
httpsListener net.Listener httpsListener net.Listener
tlsConfig *tls.Config tlsConfig *tls.Config
poolSize int poolSize int
notifyChan chan interface{} notifyChan chan interface{}
optsNotificationChan chan struct{} optsNotificationChan chan struct{}
exitChan chan int exitChan chan int
skipping to change at line 94 skipping to change at line 92
cwd, _ := os.Getwd() cwd, _ := os.Getwd()
dataPath = cwd dataPath = cwd
} }
if opts.Logger == nil { if opts.Logger == nil {
opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Lt ime|log.Lmicroseconds) opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Lt ime|log.Lmicroseconds)
} }
n := &NSQD{ n := &NSQD{
startTime: time.Now(), startTime: time.Now(),
topicMap: make(map[string]*Topic), topicMap: make(map[string]*Topic),
clients: make(map[int64]Client),
exitChan: make(chan int), exitChan: make(chan int),
notifyChan: make(chan interface{}), notifyChan: make(chan interface{}),
optsNotificationChan: make(chan struct{}, 1), optsNotificationChan: make(chan struct{}, 1),
dl: dirlock.New(dataPath), dl: dirlock.New(dataPath),
} }
n.ctx, n.ctxCancel = context.WithCancel(context.Background())
httpcli := http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HT TPClientRequestTimeout) httpcli := http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HT TPClientRequestTimeout)
n.ci = clusterinfo.New(n.logf, httpcli) n.ci = clusterinfo.New(n.logf, httpcli)
n.lookupPeers.Store([]*lookupPeer{}) n.lookupPeers.Store([]*lookupPeer{})
n.swapOpts(opts) n.swapOpts(opts)
n.errValue.Store(errStore{}) n.errValue.Store(errStore{})
err = n.dl.Lock() err = n.dl.Lock()
if err != nil { if err != nil {
return nil, fmt.Errorf("--data-path=%s in use (possibly by anothe r instance of nsqd)", dataPath) return nil, fmt.Errorf("failed to lock data-path: %v", err)
} }
if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 { if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 {
return nil, errors.New("--max-deflate-level must be [1,9]") return nil, errors.New("--max-deflate-level must be [1,9]")
} }
if opts.ID < 0 || opts.ID >= 1024 { if opts.ID < 0 || opts.ID >= 1024 {
return nil, errors.New("--node-id must be [0,1024)") return nil, errors.New("--node-id must be [0,1024)")
} }
if opts.StatsdPrefix != "" {
var port string
_, port, err = net.SplitHostPort(opts.HTTPAddress)
if err != nil {
return nil, fmt.Errorf("failed to parse HTTP address (%s)
- %s", opts.HTTPAddress, err)
}
statsdHostKey := statsd.HostKey(net.JoinHostPort(opts.BroadcastAd
dress, port))
prefixWithHost := strings.Replace(opts.StatsdPrefix, "%s", statsd
HostKey, -1)
if prefixWithHost[len(prefixWithHost)-1] != '.' {
prefixWithHost += "."
}
opts.StatsdPrefix = prefixWithHost
}
if opts.TLSClientAuthPolicy != "" && opts.TLSRequired == TLSNotRequired { if opts.TLSClientAuthPolicy != "" && opts.TLSRequired == TLSNotRequired {
opts.TLSRequired = TLSRequired opts.TLSRequired = TLSRequired
} }
tlsConfig, err := buildTLSConfig(opts) tlsConfig, err := buildTLSConfig(opts)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to build TLS config - %s", err) return nil, fmt.Errorf("failed to build TLS config - %s", err)
} }
if tlsConfig == nil && opts.TLSRequired != TLSNotRequired { if tlsConfig == nil && opts.TLSRequired != TLSNotRequired {
return nil, errors.New("cannot require TLS client connections wit hout TLS key and cert") return nil, errors.New("cannot require TLS client connections wit hout TLS key and cert")
skipping to change at line 157 skipping to change at line 141
for _, v := range opts.E2EProcessingLatencyPercentiles { for _, v := range opts.E2EProcessingLatencyPercentiles {
if v <= 0 || v > 1 { if v <= 0 || v > 1 {
return nil, fmt.Errorf("invalid E2E processing latency pe rcentile: %v", v) return nil, fmt.Errorf("invalid E2E processing latency pe rcentile: %v", v)
} }
} }
n.logf(LOG_INFO, version.String("nsqd")) n.logf(LOG_INFO, version.String("nsqd"))
n.logf(LOG_INFO, "ID: %d", opts.ID) n.logf(LOG_INFO, "ID: %d", opts.ID)
n.tcpServer = &tcpServer{nsqd: n}
n.tcpListener, err = net.Listen("tcp", opts.TCPAddress) n.tcpListener, err = net.Listen("tcp", opts.TCPAddress)
if err != nil { if err != nil {
return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress , err) return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress , err)
} }
n.httpListener, err = net.Listen("tcp", opts.HTTPAddress) n.httpListener, err = net.Listen("tcp", opts.HTTPAddress)
if err != nil { if err != nil {
return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPAddres s, err) return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPAddres s, err)
} }
if n.tlsConfig != nil && opts.HTTPSAddress != "" { if n.tlsConfig != nil && opts.HTTPSAddress != "" {
n.httpsListener, err = tls.Listen("tcp", opts.HTTPSAddress, n.tls Config) n.httpsListener, err = tls.Listen("tcp", opts.HTTPSAddress, n.tls Config)
if err != nil { if err != nil {
return nil, fmt.Errorf("listen (%s) failed - %s", opts.HT TPSAddress, err) return nil, fmt.Errorf("listen (%s) failed - %s", opts.HT TPSAddress, err)
} }
} }
if opts.BroadcastHTTPPort == 0 {
opts.BroadcastHTTPPort = n.RealHTTPAddr().Port
}
if opts.BroadcastTCPPort == 0 {
opts.BroadcastTCPPort = n.RealTCPAddr().Port
}
if opts.StatsdPrefix != "" {
var port string = fmt.Sprint(opts.BroadcastHTTPPort)
statsdHostKey := statsd.HostKey(net.JoinHostPort(opts.BroadcastAd
dress, port))
prefixWithHost := strings.Replace(opts.StatsdPrefix, "%s", statsd
HostKey, -1)
if prefixWithHost[len(prefixWithHost)-1] != '.' {
prefixWithHost += "."
}
opts.StatsdPrefix = prefixWithHost
}
return n, nil return n, nil
} }
func (n *NSQD) getOpts() *Options { func (n *NSQD) getOpts() *Options {
return n.opts.Load().(*Options) return n.opts.Load().(*Options)
} }
func (n *NSQD) swapOpts(opts *Options) { func (n *NSQD) swapOpts(opts *Options) {
n.opts.Store(opts) n.opts.Store(opts)
} }
skipping to change at line 227 skipping to change at line 230
if err != nil { if err != nil {
return fmt.Sprintf("NOK - %s", err) return fmt.Sprintf("NOK - %s", err)
} }
return "OK" return "OK"
} }
func (n *NSQD) GetStartTime() time.Time { func (n *NSQD) GetStartTime() time.Time {
return n.startTime return n.startTime
} }
func (n *NSQD) AddClient(clientID int64, client Client) {
n.clientLock.Lock()
n.clients[clientID] = client
n.clientLock.Unlock()
}
func (n *NSQD) RemoveClient(clientID int64) {
n.clientLock.Lock()
_, ok := n.clients[clientID]
if !ok {
n.clientLock.Unlock()
return
}
delete(n.clients, clientID)
n.clientLock.Unlock()
}
func (n *NSQD) Main() error { func (n *NSQD) Main() error {
ctx := &context{n}
exitCh := make(chan error) exitCh := make(chan error)
var once sync.Once var once sync.Once
exitFunc := func(err error) { exitFunc := func(err error) {
once.Do(func() { once.Do(func() {
if err != nil { if err != nil {
n.logf(LOG_FATAL, "%s", err) n.logf(LOG_FATAL, "%s", err)
} }
exitCh <- err exitCh <- err
}) })
} }
tcpServer := &tcpServer{ctx: ctx}
n.waitGroup.Wrap(func() { n.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf)) exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
}) })
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSReq
uired) httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequi
red)
n.waitGroup.Wrap(func() { n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.log f)) exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.log f))
}) })
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsServer := newHTTPServer(ctx, true, true) httpsServer := newHTTPServer(n, true, true)
n.waitGroup.Wrap(func() { n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HT TPS", n.logf)) exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HT TPS", n.logf))
}) })
} }
n.waitGroup.Wrap(n.queueScanLoop) n.waitGroup.Wrap(n.queueScanLoop)
n.waitGroup.Wrap(n.lookupLoop) n.waitGroup.Wrap(n.lookupLoop)
if n.getOpts().StatsdAddress != "" { if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(n.statsdLoop) n.waitGroup.Wrap(n.statsdLoop)
} }
skipping to change at line 384 skipping to change at line 368
for _, topic := range n.topicMap { for _, topic := range n.topicMap {
if topic.ephemeral { if topic.ephemeral {
continue continue
} }
topicData := make(map[string]interface{}) topicData := make(map[string]interface{})
topicData["name"] = topic.name topicData["name"] = topic.name
topicData["paused"] = topic.IsPaused() topicData["paused"] = topic.IsPaused()
channels := []interface{}{} channels := []interface{}{}
topic.Lock() topic.Lock()
for _, channel := range topic.channelMap { for _, channel := range topic.channelMap {
channel.Lock()
if channel.ephemeral { if channel.ephemeral {
channel.Unlock()
continue continue
} }
channel.Lock()
channelData := make(map[string]interface{}) channelData := make(map[string]interface{})
channelData["name"] = channel.name channelData["name"] = channel.name
channelData["paused"] = channel.IsPaused() channelData["paused"] = channel.IsPaused()
channels = append(channels, channelData)
channel.Unlock() channel.Unlock()
channels = append(channels, channelData)
} }
topic.Unlock() topic.Unlock()
topicData["channels"] = channels topicData["channels"] = channels
topics = append(topics, topicData) topics = append(topics, topicData)
} }
js["version"] = version.Binary js["version"] = version.Binary
js["topics"] = topics js["topics"] = topics
data, err := json.Marshal(&js) data, err := json.Marshal(&js)
if err != nil { if err != nil {
skipping to change at line 423 skipping to change at line 406
err = os.Rename(tmpFileName, fileName) err = os.Rename(tmpFileName, fileName)
if err != nil { if err != nil {
return err return err
} }
// technically should fsync DataPath here // technically should fsync DataPath here
return nil return nil
} }
func (n *NSQD) Exit() { func (n *NSQD) Exit() {
if !atomic.CompareAndSwapInt32(&n.isExiting, 0, 1) {
// avoid double call
return
}
if n.tcpListener != nil { if n.tcpListener != nil {
n.tcpListener.Close() n.tcpListener.Close()
} }
if n.tcpServer != nil {
n.tcpServer.Close()
}
if n.httpListener != nil { if n.httpListener != nil {
n.httpListener.Close() n.httpListener.Close()
} }
if n.httpsListener != nil { if n.httpsListener != nil {
n.httpsListener.Close() n.httpsListener.Close()
} }
n.Lock() n.Lock()
err := n.PersistMetadata() err := n.PersistMetadata()
skipping to change at line 451 skipping to change at line 442
for _, topic := range n.topicMap { for _, topic := range n.topicMap {
topic.Close() topic.Close()
} }
n.Unlock() n.Unlock()
n.logf(LOG_INFO, "NSQ: stopping subsystems") n.logf(LOG_INFO, "NSQ: stopping subsystems")
close(n.exitChan) close(n.exitChan)
n.waitGroup.Wait() n.waitGroup.Wait()
n.dl.Unlock() n.dl.Unlock()
n.logf(LOG_INFO, "NSQ: bye") n.logf(LOG_INFO, "NSQ: bye")
n.ctxCancel()
} }
// GetTopic performs a thread safe operation // GetTopic performs a thread safe operation
// to return a pointer to a Topic object (potentially new) // to return a pointer to a Topic object (potentially new)
func (n *NSQD) GetTopic(topicName string) *Topic { func (n *NSQD) GetTopic(topicName string) *Topic {
// most likely, we already have this topic, so try read lock first. // most likely we already have this topic, so try read lock first
n.RLock() n.RLock()
t, ok := n.topicMap[topicName] t, ok := n.topicMap[topicName]
n.RUnlock() n.RUnlock()
if ok { if ok {
return t return t
} }
n.Lock() n.Lock()
t, ok = n.topicMap[topicName] t, ok = n.topicMap[topicName]
if ok { if ok {
n.Unlock() n.Unlock()
return t return t
} }
deleteCallback := func(t *Topic) { deleteCallback := func(t *Topic) {
n.DeleteExistingTopic(t.name) n.DeleteExistingTopic(t.name)
} }
t = NewTopic(topicName, &context{n}, deleteCallback) t = NewTopic(topicName, n, deleteCallback)
n.topicMap[topicName] = t n.topicMap[topicName] = t
n.Unlock() n.Unlock()
n.logf(LOG_INFO, "TOPIC(%s): created", t.name) n.logf(LOG_INFO, "TOPIC(%s): created", t.name)
// topic is created but messagePump not yet started // topic is created but messagePump not yet started
// if loading metadata at startup, no lookupd connections yet, topic star // if this topic was created while loading metadata at startup don't do a
ted after load ny further initialization
// (topic will be "started" after loading completes)
if atomic.LoadInt32(&n.isLoading) == 1 { if atomic.LoadInt32(&n.isLoading) == 1 {
return t return t
} }
// if using lookupd, make a blocking call to get the topics, and immediat // if using lookupd, make a blocking call to get channels and immediately
ely create them. create them
// this makes sure that any message received is buffered to the right cha // to ensure that all channels receive published messages
nnels
lookupdHTTPAddrs := n.lookupdHTTPAddrs() lookupdHTTPAddrs := n.lookupdHTTPAddrs()
if len(lookupdHTTPAddrs) > 0 { if len(lookupdHTTPAddrs) > 0 {
channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupd HTTPAddrs) channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupd HTTPAddrs)
if err != nil { if err != nil {
n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err) n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err)
} }
for _, channelName := range channelNames { for _, channelName := range channelNames {
if strings.HasSuffix(channelName, "#ephemeral") { if strings.HasSuffix(channelName, "#ephemeral") {
continue // do not create ephemeral channel with no consumer client continue // do not create ephemeral channel with no consumer client
} }
skipping to change at line 546 skipping to change at line 539
// to enforce ordering // to enforce ordering
topic.Delete() topic.Delete()
n.Lock() n.Lock()
delete(n.topicMap, topicName) delete(n.topicMap, topicName)
n.Unlock() n.Unlock()
return nil return nil
} }
func (n *NSQD) Notify(v interface{}) { func (n *NSQD) Notify(v interface{}, persist bool) {
// since the in-memory metadata is incomplete, // since the in-memory metadata is incomplete,
// should not persist metadata while loading it. // should not persist metadata while loading it.
// nsqd will call `PersistMetadata` it after loading // nsqd will call `PersistMetadata` it after loading
persist := atomic.LoadInt32(&n.isLoading) == 0 loading := atomic.LoadInt32(&n.isLoading) == 1
n.waitGroup.Wrap(func() { n.waitGroup.Wrap(func() {
// by selecting on exitChan we guarantee that // by selecting on exitChan we guarantee that
// we do not block exit, see issue #123 // we do not block exit, see issue #123
select { select {
case <-n.exitChan: case <-n.exitChan:
case n.notifyChan <- v: case n.notifyChan <- v:
if !persist { if loading || !persist {
return return
} }
n.Lock() n.Lock()
err := n.PersistMetadata() err := n.PersistMetadata()
if err != nil { if err != nil {
n.logf(LOG_ERROR, "failed to persist metadata - % s", err) n.logf(LOG_ERROR, "failed to persist metadata - % s", err)
} }
n.Unlock() n.Unlock()
} }
}) })
skipping to change at line 750 skipping to change at line 743
} }
tlsConfig.BuildNameToCertificate() tlsConfig.BuildNameToCertificate()
return tlsConfig, nil return tlsConfig, nil
} }
func (n *NSQD) IsAuthEnabled() bool { func (n *NSQD) IsAuthEnabled() bool {
return len(n.getOpts().AuthHTTPAddresses) != 0 return len(n.getOpts().AuthHTTPAddresses) != 0
} }
// Context returns a context that will be canceled when nsqd initiates the shutd
own
func (n *NSQD) Context() context.Context {
return n.ctx
}
 End of changes. 34 change blocks. 
65 lines changed or deleted 56 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)