topic.go (nsq-1.2.0) | topic.go (nsq-1.2.1)
---|---
package nsqd | package nsqd
import ( | import (
"bytes" |
"errors" | "errors"
"strings" | "strings"
"sync" | "sync"
"sync/atomic" | "sync/atomic"
"time" | "time"
"github.com/nsqio/go-diskqueue" | "github.com/nsqio/go-diskqueue"
"github.com/nsqio/nsq/internal/lg" | "github.com/nsqio/nsq/internal/lg"
"github.com/nsqio/nsq/internal/quantile" | "github.com/nsqio/nsq/internal/quantile"
"github.com/nsqio/nsq/internal/util" | "github.com/nsqio/nsq/internal/util"
skipping to change at line 42 | skipping to change at line 41
exitFlag int32 | exitFlag int32
idFactory *guidFactory | idFactory *guidFactory
ephemeral bool | ephemeral bool
deleteCallback func(*Topic) | deleteCallback func(*Topic)
deleter sync.Once | deleter sync.Once
paused int32 | paused int32
pauseChan chan int | pauseChan chan int
ctx *context | nsqd *NSQD
} | }
// Topic constructor | // Topic constructor
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic { | func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic {
t := &Topic{ | t := &Topic{
name: topicName, | name: topicName,
channelMap: make(map[string]*Channel), | channelMap: make(map[string]*Channel),
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize), | memoryMsgChan: nil,
startChan: make(chan int, 1), | startChan: make(chan int, 1),
exitChan: make(chan int), | exitChan: make(chan int),
channelUpdateChan: make(chan int), | channelUpdateChan: make(chan int),
ctx: ctx, | nsqd: nsqd,
paused: 0, | paused: 0,
pauseChan: make(chan int), | pauseChan: make(chan int),
deleteCallback: deleteCallback, | deleteCallback: deleteCallback,
idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID), | idFactory: NewGUIDFactory(nsqd.getOpts().ID),
| }
| // create mem-queue only if size > 0 (do not use unbuffered chan)
| if nsqd.getOpts().MemQueueSize > 0 {
| t.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
} | }
if strings.HasSuffix(topicName, "#ephemeral") { | if strings.HasSuffix(topicName, "#ephemeral") {
t.ephemeral = true | t.ephemeral = true
t.backend = newDummyBackendQueue() | t.backend = newDummyBackendQueue()
} else { | } else {
dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) { | dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
opts := ctx.nsqd.getOpts() | opts := nsqd.getOpts()
lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...) | lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
} | }
t.backend = diskqueue.New( | t.backend = diskqueue.New(
topicName, | topicName,
ctx.nsqd.getOpts().DataPath, | nsqd.getOpts().DataPath,
ctx.nsqd.getOpts().MaxBytesPerFile, | nsqd.getOpts().MaxBytesPerFile,
int32(minValidMsgLength), | int32(minValidMsgLength),
int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength, | int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
ctx.nsqd.getOpts().SyncEvery, | nsqd.getOpts().SyncEvery,
ctx.nsqd.getOpts().SyncTimeout, | nsqd.getOpts().SyncTimeout,
dqLogf, | dqLogf,
) | )
} | }
t.waitGroup.Wrap(t.messagePump) | t.waitGroup.Wrap(t.messagePump)
t.ctx.nsqd.Notify(t) | t.nsqd.Notify(t, !t.ephemeral)
return t | return t
} | }
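The new guard above leaves memoryMsgChan nil when MemQueueSize is 0 instead of creating an unbuffered channel. A minimal, self-contained sketch (illustrative only, not nsqd code) of why that matters for the non-blocking publish path: a send on a nil channel can never be selected, so the default (backend) branch is always taken, whereas an unbuffered channel would only be selected when a receiver happened to be waiting.

package main

import "fmt"

func main() {
	// memoryMsgChan is nil when MemQueueSize == 0 (name mirrors the diff).
	var memoryMsgChan chan string

	select {
	case memoryMsgChan <- "msg":
		fmt.Println("queued in memory") // never taken: a send on a nil channel blocks forever
	default:
		fmt.Println("written to backend") // always taken when the channel is nil
	}
}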
func (t *Topic) Start() { | func (t *Topic) Start() {
select { | select {
case t.startChan <- 1: | case t.startChan <- 1:
default: | default:
} | }
} | }
skipping to change at line 126 | skipping to change at line 128
return channel | return channel
} | }
// this expects the caller to handle locking | // this expects the caller to handle locking
func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) { | func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) {
channel, ok := t.channelMap[channelName] | channel, ok := t.channelMap[channelName]
if !ok { | if !ok {
deleteCallback := func(c *Channel) { | deleteCallback := func(c *Channel) {
t.DeleteExistingChannel(c.name) | t.DeleteExistingChannel(c.name)
} | }
channel = NewChannel(t.name, channelName, t.ctx, deleteCallback) | channel = NewChannel(t.name, channelName, t.nsqd, deleteCallback)
t.channelMap[channelName] = channel | t.channelMap[channelName] = channel
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)", t.name, channel.name) | t.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)", t.name, channel.name)
return channel, true | return channel, true
} | }
return channel, false | return channel, false
} | }
func (t *Topic) GetExistingChannel(channelName string) (*Channel, error) { | func (t *Topic) GetExistingChannel(channelName string) (*Channel, error) {
t.RLock() | t.RLock()
defer t.RUnlock() | defer t.RUnlock()
channel, ok := t.channelMap[channelName] | channel, ok := t.channelMap[channelName]
if !ok { | if !ok {
return nil, errors.New("channel does not exist") | return nil, errors.New("channel does not exist")
} | }
return channel, nil | return channel, nil
} | }
// DeleteExistingChannel removes a channel from the topic only if it exists | // DeleteExistingChannel removes a channel from the topic only if it exists
func (t *Topic) DeleteExistingChannel(channelName string) error { | func (t *Topic) DeleteExistingChannel(channelName string) error {
t.Lock() | t.RLock()
channel, ok := t.channelMap[channelName] | channel, ok := t.channelMap[channelName]
| t.RUnlock()
if !ok { | if !ok {
t.Unlock() |
return errors.New("channel does not exist") | return errors.New("channel does not exist")
} | }
delete(t.channelMap, channelName) |
// not defered so that we can continue while the channel async closes |
numChannels := len(t.channelMap) |
t.Unlock() |
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s", t.name, channel.name) | t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s", t.name, channel.name)
// delete empties the channel before closing | // delete empties the channel before closing
// (so that we dont leave any messages around) | // (so that we dont leave any messages around)
| //
| // we do this before removing the channel from map below (with no lock)
| // so that any incoming subs will error and not create a new channel
| // to enforce ordering
channel.Delete() | channel.Delete()
| t.Lock()
| delete(t.channelMap, channelName)
| numChannels := len(t.channelMap)
| t.Unlock()
// update messagePump state | // update messagePump state
select { | select {
case t.channelUpdateChan <- 1: | case t.channelUpdateChan <- 1:
case <-t.exitChan: | case <-t.exitChan:
} | }
if numChannels == 0 && t.ephemeral == true { | if numChannels == 0 && t.ephemeral == true {
go t.deleter.Do(func() { t.deleteCallback(t) }) | go t.deleter.Do(func() { t.deleteCallback(t) })
} | }
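The 1.2.1 version reorders the work: look the channel up under a read lock, run the slow channel.Delete() with no topic lock held, then remove the entry under the write lock, so that concurrent subscribers hit the closing channel and get an error instead of racing to recreate it (per the new comments above). A generic, self-contained sketch of that locking pattern, with illustrative names rather than the nsqd types:

package main

import (
	"errors"
	"sync"
)

type channelStub struct{}

func (c *channelStub) Delete() {} // stands in for the slow drain-and-close

type topicStub struct {
	sync.RWMutex
	channels map[string]*channelStub
}

func (t *topicStub) deleteExisting(name string) error {
	t.RLock()
	c, ok := t.channels[name]
	t.RUnlock()
	if !ok {
		return errors.New("channel does not exist")
	}

	c.Delete() // slow teardown runs with no topic lock held

	t.Lock()
	delete(t.channels, name)
	t.Unlock()
	return nil
}

func main() {
	t := &topicStub{channels: map[string]*channelStub{"ch": {}}}
	_ = t.deleteExisting("ch")
}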
skipping to change at line 221 | skipping to change at line 228
atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes)) | atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
atomic.AddUint64(&t.messageCount, uint64(len(msgs))) | atomic.AddUint64(&t.messageCount, uint64(len(msgs)))
return nil | return nil
} | }
func (t *Topic) put(m *Message) error { | func (t *Topic) put(m *Message) error {
select { | select {
case t.memoryMsgChan <- m: | case t.memoryMsgChan <- m:
default: | default:
b := bufferPoolGet() | err := writeMessageToBackend(m, t.backend)
err := writeMessageToBackend(b, m, t.backend) | t.nsqd.SetHealth(err)
bufferPoolPut(b) |
t.ctx.nsqd.SetHealth(err) |
if err != nil { | if err != nil {
t.ctx.nsqd.logf(LOG_ERROR, | t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s", | "TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err) | t.name, err)
return err | return err
} | }
} | }
return nil | return nil
} | }
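The call sites above drop the *bytes.Buffer argument, which is why bufferPoolGet/bufferPoolPut and the "bytes" import disappear from this file; presumably the buffer pooling moved inside the helper. A hedged sketch of what a two-argument writeMessageToBackend could look like (only the call signature is confirmed by this diff; treat the body as an assumption):

// Hypothetical two-argument helper; the buffer handling shown here is a guess
// at where the pooling went, not code taken from this diff.
func writeMessageToBackend(m *Message, bq BackendQueue) error {
	buf := bufferPoolGet()
	defer bufferPoolPut(buf)
	if _, err := m.WriteTo(buf); err != nil {
		return err
	}
	return bq.Put(buf.Bytes())
}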
func (t *Topic) Depth() int64 { | func (t *Topic) Depth() int64 {
return int64(len(t.memoryMsgChan)) + t.backend.Depth() | return int64(len(t.memoryMsgChan)) + t.backend.Depth()
} | }
// messagePump selects over the in-memory and backend queue and | // messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic | // writes messages to every channel for this topic
func (t *Topic) messagePump() { | func (t *Topic) messagePump() {
var msg *Message | var msg *Message
var buf []byte | var buf []byte
var err error | var err error
var chans []*Channel | var chans []*Channel
var memoryMsgChan chan *Message | var memoryMsgChan chan *Message
var backendChan chan []byte | var backendChan <-chan []byte
// do not pass messages before Start(), but avoid blocking Pause() or GetChannel() | // do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
for { | for {
select { | select {
case <-t.channelUpdateChan: | case <-t.channelUpdateChan:
continue | continue
case <-t.pauseChan: | case <-t.pauseChan:
continue | continue
case <-t.exitChan: | case <-t.exitChan:
goto exit | goto exit
skipping to change at line 279 | skipping to change at line 284
backendChan = t.backend.ReadChan() | backendChan = t.backend.ReadChan()
} | }
// main message loop | // main message loop
for { | for {
select { | select {
case msg = <-memoryMsgChan: | case msg = <-memoryMsgChan:
case buf = <-backendChan: | case buf = <-backendChan:
msg, err = decodeMessage(buf) | msg, err = decodeMessage(buf)
if err != nil { | if err != nil {
t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) | t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue | continue
} | }
case <-t.channelUpdateChan: | case <-t.channelUpdateChan:
chans = chans[:0] | chans = chans[:0]
t.RLock() | t.RLock()
for _, c := range t.channelMap { | for _, c := range t.channelMap {
chans = append(chans, c) | chans = append(chans, c)
} | }
t.RUnlock() | t.RUnlock()
if len(chans) == 0 || t.IsPaused() { | if len(chans) == 0 || t.IsPaused() {
skipping to change at line 327 | skipping to change at line 332
chanMsg = NewMessage(msg.ID, msg.Body) | chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp | chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred | chanMsg.deferred = msg.deferred
} | }
if chanMsg.deferred != 0 { | if chanMsg.deferred != 0 {
channel.PutMessageDeferred(chanMsg, chanMsg.deferred) | channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue | continue
} | }
err := channel.PutMessage(chanMsg) | err := channel.PutMessage(chanMsg)
if err != nil { | if err != nil {
t.ctx.nsqd.logf(LOG_ERROR, | t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", | "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
t.name, msg.ID, channel.name, err) | t.name, msg.ID, channel.name, err)
} | }
} | }
} | }
exit: | exit:
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name) | t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
} | }
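The only non-mechanical change in messagePump is backendChan becoming <-chan []byte, most likely because go-diskqueue's ReadChan now returns a receive-only channel. A self-contained reminder of the direction rule this relies on: a bidirectional channel converts implicitly to a receive-only variable, so the tighter declaration documents that the pump only ever reads from the backend.

package main

import "fmt"

func main() {
	full := make(chan []byte, 1)          // bidirectional channel
	var backendChan <-chan []byte = full  // implicit conversion to receive-only

	full <- []byte("payload")
	fmt.Println(string(<-backendChan)) // receives still work; a send on backendChan would not compile
}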
// Delete empties the topic and all its channels and closes | // Delete empties the topic and all its channels and closes
func (t *Topic) Delete() error { | func (t *Topic) Delete() error {
return t.exit(true) | return t.exit(true)
} | }
// Close persists all outstanding topic data and closes all its channels | // Close persists all outstanding topic data and closes all its channels
func (t *Topic) Close() error { | func (t *Topic) Close() error {
return t.exit(false) | return t.exit(false)
} | }
func (t *Topic) exit(deleted bool) error { | func (t *Topic) exit(deleted bool) error {
if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) { | if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) {
return errors.New("exiting") | return errors.New("exiting")
} | }
if deleted { | if deleted {
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name) | t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name)
// since we are explicitly deleting a topic (not just at system exit time) | // since we are explicitly deleting a topic (not just at system exit time)
// de-register this from the lookupd | // de-register this from the lookupd
t.ctx.nsqd.Notify(t) | t.nsqd.Notify(t, !t.ephemeral)
} else { | } else {
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name) | t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name)
} | }
close(t.exitChan) | close(t.exitChan)
// synchronize the close of messagePump() | // synchronize the close of messagePump()
t.waitGroup.Wait() | t.waitGroup.Wait()
if deleted { | if deleted {
t.Lock() | t.Lock()
for _, channel := range t.channelMap { | for _, channel := range t.channelMap {
skipping to change at line 382 | skipping to change at line 387
channel.Delete() | channel.Delete()
} | }
t.Unlock() | t.Unlock()
// empty the queue (deletes the backend files, too) | // empty the queue (deletes the backend files, too)
t.Empty() | t.Empty()
return t.backend.Delete() | return t.backend.Delete()
} | }
// close all the channels | // close all the channels
| t.RLock()
for _, channel := range t.channelMap { | for _, channel := range t.channelMap {
err := channel.Close() | err := channel.Close()
if err != nil { | if err != nil {
// we need to continue regardless of error to close all the channels | // we need to continue regardless of error to close all the channels
t.ctx.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel.name, err) | t.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel.name, err)
} | }
} | }
| t.RUnlock()
// write anything leftover to disk | // write anything leftover to disk
t.flush() | t.flush()
return t.backend.Close() | return t.backend.Close()
} | }
func (t *Topic) Empty() error { | func (t *Topic) Empty() error {
for { | for {
select { | select {
case <-t.memoryMsgChan: | case <-t.memoryMsgChan:
default: | default:
goto finish | goto finish
} | }
} | }
finish: | finish:
return t.backend.Empty() | return t.backend.Empty()
} | }
func (t *Topic) flush() error { | func (t *Topic) flush() error {
var msgBuf bytes.Buffer |
if len(t.memoryMsgChan) > 0 { | if len(t.memoryMsgChan) > 0 {
t.ctx.nsqd.logf(LOG_INFO, | t.nsqd.logf(LOG_INFO,
"TOPIC(%s): flushing %d memory messages to backend", | "TOPIC(%s): flushing %d memory messages to backend",
t.name, len(t.memoryMsgChan)) | t.name, len(t.memoryMsgChan))
} | }
for { | for {
select { | select {
case msg := <-t.memoryMsgChan: | case msg := <-t.memoryMsgChan:
err := writeMessageToBackend(&msgBuf, msg, t.backend) | err := writeMessageToBackend(msg, t.backend)
if err != nil { | if err != nil {
t.ctx.nsqd.logf(LOG_ERROR, | t.nsqd.logf(LOG_ERROR,
"ERROR: failed to write message to backend - %s", err) | "ERROR: failed to write message to backend - %s", err)
} | }
default: | default:
goto finish | goto finish
} | }
} | }
finish: | finish:
return nil | return nil
} | }
skipping to change at line 448 | skipping to change at line 453
for _, c := range t.channelMap { | for _, c := range t.channelMap {
realChannels = append(realChannels, c) | realChannels = append(realChannels, c)
} | }
t.RUnlock() | t.RUnlock()
for _, c := range realChannels { | for _, c := range realChannels {
if c.e2eProcessingLatencyStream == nil { | if c.e2eProcessingLatencyStream == nil {
continue | continue
} | }
if latencyStream == nil { | if latencyStream == nil {
latencyStream = quantile.New( | latencyStream = quantile.New(
t.ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime, | t.nsqd.getOpts().E2EProcessingLatencyWindowTime,
t.ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) | t.nsqd.getOpts().E2EProcessingLatencyPercentiles)
} | }
latencyStream.Merge(c.e2eProcessingLatencyStream) | latencyStream.Merge(c.e2eProcessingLatencyStream)
} | }
return latencyStream | return latencyStream
} | }
func (t *Topic) Pause() error { | func (t *Topic) Pause() error {
return t.doPause(true) | return t.doPause(true)
} | }
skipping to change at line 484 | skipping to change at line 489
} | }
return nil | return nil
} | }
func (t *Topic) IsPaused() bool { | func (t *Topic) IsPaused() bool {
return atomic.LoadInt32(&t.paused) == 1 | return atomic.LoadInt32(&t.paused) == 1
} | }
func (t *Topic) GenerateID() MessageID { | func (t *Topic) GenerateID() MessageID {
retry: | var i int64 = 0
id, err := t.idFactory.NewGUID() | for {
if err != nil { | id, err := t.idFactory.NewGUID()
| if err == nil {
| return id.Hex()
| }
| if i%10000 == 0 {
| t.nsqd.logf(LOG_ERROR, "TOPIC(%s): failed to create guid - %s", t.name, err)
| }
time.Sleep(time.Millisecond) | time.Sleep(time.Millisecond)
goto retry | i++
} | }
return id.Hex() |
} | }
End of changes. 40 change blocks.
50 lines changed or deleted | 60 lines changed or added