"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "nsqd/topic.go" between
nsq-1.2.0.tar.gz and nsq-1.2.1.tar.gz

About: nsq is a realtime distributed and and decentralized messaging platform.

topic.go  (nsq-1.2.0):topic.go  (nsq-1.2.1)
package nsqd package nsqd
import ( import (
"bytes"
"errors" "errors"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/nsqio/go-diskqueue" "github.com/nsqio/go-diskqueue"
"github.com/nsqio/nsq/internal/lg" "github.com/nsqio/nsq/internal/lg"
"github.com/nsqio/nsq/internal/quantile" "github.com/nsqio/nsq/internal/quantile"
"github.com/nsqio/nsq/internal/util" "github.com/nsqio/nsq/internal/util"
skipping to change at line 42 skipping to change at line 41
exitFlag int32 exitFlag int32
idFactory *guidFactory idFactory *guidFactory
ephemeral bool ephemeral bool
deleteCallback func(*Topic) deleteCallback func(*Topic)
deleter sync.Once deleter sync.Once
paused int32 paused int32
pauseChan chan int pauseChan chan int
ctx *context nsqd *NSQD
} }
// Topic constructor // Topic constructor
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi c { func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic {
t := &Topic{ t := &Topic{
name: topicName, name: topicName,
channelMap: make(map[string]*Channel), channelMap: make(map[string]*Channel),
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueu eSize), memoryMsgChan: nil,
startChan: make(chan int, 1), startChan: make(chan int, 1),
exitChan: make(chan int), exitChan: make(chan int),
channelUpdateChan: make(chan int), channelUpdateChan: make(chan int),
ctx: ctx, nsqd: nsqd,
paused: 0, paused: 0,
pauseChan: make(chan int), pauseChan: make(chan int),
deleteCallback: deleteCallback, deleteCallback: deleteCallback,
idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID), idFactory: NewGUIDFactory(nsqd.getOpts().ID),
}
// create mem-queue only if size > 0 (do not use unbuffered chan)
if nsqd.getOpts().MemQueueSize > 0 {
t.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize
)
} }
if strings.HasSuffix(topicName, "#ephemeral") { if strings.HasSuffix(topicName, "#ephemeral") {
t.ephemeral = true t.ephemeral = true
t.backend = newDummyBackendQueue() t.backend = newDummyBackendQueue()
} else { } else {
dqLogf := func(level diskqueue.LogLevel, f string, args ...interf ace{}) { dqLogf := func(level diskqueue.LogLevel, f string, args ...interf ace{}) {
opts := ctx.nsqd.getOpts() opts := nsqd.getOpts()
lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f , args...) lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f , args...)
} }
t.backend = diskqueue.New( t.backend = diskqueue.New(
topicName, topicName,
ctx.nsqd.getOpts().DataPath, nsqd.getOpts().DataPath,
ctx.nsqd.getOpts().MaxBytesPerFile, nsqd.getOpts().MaxBytesPerFile,
int32(minValidMsgLength), int32(minValidMsgLength),
int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength, int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
ctx.nsqd.getOpts().SyncEvery, nsqd.getOpts().SyncEvery,
ctx.nsqd.getOpts().SyncTimeout, nsqd.getOpts().SyncTimeout,
dqLogf, dqLogf,
) )
} }
t.waitGroup.Wrap(t.messagePump) t.waitGroup.Wrap(t.messagePump)
t.ctx.nsqd.Notify(t) t.nsqd.Notify(t, !t.ephemeral)
return t return t
} }
func (t *Topic) Start() { func (t *Topic) Start() {
select { select {
case t.startChan <- 1: case t.startChan <- 1:
default: default:
} }
} }
skipping to change at line 126 skipping to change at line 128
return channel return channel
} }
// this expects the caller to handle locking // this expects the caller to handle locking
func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) { func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) {
channel, ok := t.channelMap[channelName] channel, ok := t.channelMap[channelName]
if !ok { if !ok {
deleteCallback := func(c *Channel) { deleteCallback := func(c *Channel) {
t.DeleteExistingChannel(c.name) t.DeleteExistingChannel(c.name)
} }
channel = NewChannel(t.name, channelName, t.ctx, deleteCallback) channel = NewChannel(t.name, channelName, t.nsqd, deleteCallback)
t.channelMap[channelName] = channel t.channelMap[channelName] = channel
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)", t.name, c hannel.name) t.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)", t.name, chann el.name)
return channel, true return channel, true
} }
return channel, false return channel, false
} }
func (t *Topic) GetExistingChannel(channelName string) (*Channel, error) { func (t *Topic) GetExistingChannel(channelName string) (*Channel, error) {
t.RLock() t.RLock()
defer t.RUnlock() defer t.RUnlock()
channel, ok := t.channelMap[channelName] channel, ok := t.channelMap[channelName]
if !ok { if !ok {
return nil, errors.New("channel does not exist") return nil, errors.New("channel does not exist")
} }
return channel, nil return channel, nil
} }
// DeleteExistingChannel removes a channel from the topic only if it exists // DeleteExistingChannel removes a channel from the topic only if it exists
func (t *Topic) DeleteExistingChannel(channelName string) error { func (t *Topic) DeleteExistingChannel(channelName string) error {
t.Lock() t.RLock()
channel, ok := t.channelMap[channelName] channel, ok := t.channelMap[channelName]
t.RUnlock()
if !ok { if !ok {
t.Unlock()
return errors.New("channel does not exist") return errors.New("channel does not exist")
} }
delete(t.channelMap, channelName)
// not defered so that we can continue while the channel async closes
numChannels := len(t.channelMap)
t.Unlock()
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s", t.name, chann el.name) t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s", t.name, channel.n ame)
// delete empties the channel before closing // delete empties the channel before closing
// (so that we dont leave any messages around) // (so that we dont leave any messages around)
//
// we do this before removing the channel from map below (with no lock)
// so that any incoming subs will error and not create a new channel
// to enforce ordering
channel.Delete() channel.Delete()
t.Lock()
delete(t.channelMap, channelName)
numChannels := len(t.channelMap)
t.Unlock()
// update messagePump state // update messagePump state
select { select {
case t.channelUpdateChan <- 1: case t.channelUpdateChan <- 1:
case <-t.exitChan: case <-t.exitChan:
} }
if numChannels == 0 && t.ephemeral == true { if numChannels == 0 && t.ephemeral == true {
go t.deleter.Do(func() { t.deleteCallback(t) }) go t.deleter.Do(func() { t.deleteCallback(t) })
} }
skipping to change at line 221 skipping to change at line 228
atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes)) atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
atomic.AddUint64(&t.messageCount, uint64(len(msgs))) atomic.AddUint64(&t.messageCount, uint64(len(msgs)))
return nil return nil
} }
func (t *Topic) put(m *Message) error { func (t *Topic) put(m *Message) error {
select { select {
case t.memoryMsgChan <- m: case t.memoryMsgChan <- m:
default: default:
b := bufferPoolGet() err := writeMessageToBackend(m, t.backend)
err := writeMessageToBackend(b, m, t.backend) t.nsqd.SetHealth(err)
bufferPoolPut(b)
t.ctx.nsqd.SetHealth(err)
if err != nil { if err != nil {
t.ctx.nsqd.logf(LOG_ERROR, t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to back end - %s", "TOPIC(%s) ERROR: failed to write message to back end - %s",
t.name, err) t.name, err)
return err return err
} }
} }
return nil return nil
} }
func (t *Topic) Depth() int64 { func (t *Topic) Depth() int64 {
return int64(len(t.memoryMsgChan)) + t.backend.Depth() return int64(len(t.memoryMsgChan)) + t.backend.Depth()
} }
// messagePump selects over the in-memory and backend queue and // messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic // writes messages to every channel for this topic
func (t *Topic) messagePump() { func (t *Topic) messagePump() {
var msg *Message var msg *Message
var buf []byte var buf []byte
var err error var err error
var chans []*Channel var chans []*Channel
var memoryMsgChan chan *Message var memoryMsgChan chan *Message
var backendChan chan []byte var backendChan <-chan []byte
// do not pass messages before Start(), but avoid blocking Pause() or Get Channel() // do not pass messages before Start(), but avoid blocking Pause() or Get Channel()
for { for {
select { select {
case <-t.channelUpdateChan: case <-t.channelUpdateChan:
continue continue
case <-t.pauseChan: case <-t.pauseChan:
continue continue
case <-t.exitChan: case <-t.exitChan:
goto exit goto exit
skipping to change at line 279 skipping to change at line 284
backendChan = t.backend.ReadChan() backendChan = t.backend.ReadChan()
} }
// main message loop // main message loop
for { for {
select { select {
case msg = <-memoryMsgChan: case msg = <-memoryMsgChan:
case buf = <-backendChan: case buf = <-backendChan:
msg, err = decodeMessage(buf) msg, err = decodeMessage(buf)
if err != nil { if err != nil {
t.ctx.nsqd.logf(LOG_ERROR, "failed to decode mess age - %s", err) t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue continue
} }
case <-t.channelUpdateChan: case <-t.channelUpdateChan:
chans = chans[:0] chans = chans[:0]
t.RLock() t.RLock()
for _, c := range t.channelMap { for _, c := range t.channelMap {
chans = append(chans, c) chans = append(chans, c)
} }
t.RUnlock() t.RUnlock()
if len(chans) == 0 || t.IsPaused() { if len(chans) == 0 || t.IsPaused() {
skipping to change at line 327 skipping to change at line 332
chanMsg = NewMessage(msg.ID, msg.Body) chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred chanMsg.deferred = msg.deferred
} }
if chanMsg.deferred != 0 { if chanMsg.deferred != 0 {
channel.PutMessageDeferred(chanMsg, chanMsg.defer red) channel.PutMessageDeferred(chanMsg, chanMsg.defer red)
continue continue
} }
err := channel.PutMessage(chanMsg) err := channel.PutMessage(chanMsg)
if err != nil { if err != nil {
t.ctx.nsqd.logf(LOG_ERROR, t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to put msg(%s) t o channel(%s) - %s", "TOPIC(%s) ERROR: failed to put msg(%s) t o channel(%s) - %s",
t.name, msg.ID, channel.name, err) t.name, msg.ID, channel.name, err)
} }
} }
} }
exit: exit:
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name) t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
} }
// Delete empties the topic and all its channels and closes // Delete empties the topic and all its channels and closes
func (t *Topic) Delete() error { func (t *Topic) Delete() error {
return t.exit(true) return t.exit(true)
} }
// Close persists all outstanding topic data and closes all its channels // Close persists all outstanding topic data and closes all its channels
func (t *Topic) Close() error { func (t *Topic) Close() error {
return t.exit(false) return t.exit(false)
} }
func (t *Topic) exit(deleted bool) error { func (t *Topic) exit(deleted bool) error {
if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) { if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) {
return errors.New("exiting") return errors.New("exiting")
} }
if deleted { if deleted {
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name) t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name)
// since we are explicitly deleting a topic (not just at system e xit time) // since we are explicitly deleting a topic (not just at system e xit time)
// de-register this from the lookupd // de-register this from the lookupd
t.ctx.nsqd.Notify(t) t.nsqd.Notify(t, !t.ephemeral)
} else { } else {
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name) t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name)
} }
close(t.exitChan) close(t.exitChan)
// synchronize the close of messagePump() // synchronize the close of messagePump()
t.waitGroup.Wait() t.waitGroup.Wait()
if deleted { if deleted {
t.Lock() t.Lock()
for _, channel := range t.channelMap { for _, channel := range t.channelMap {
skipping to change at line 382 skipping to change at line 387
channel.Delete() channel.Delete()
} }
t.Unlock() t.Unlock()
// empty the queue (deletes the backend files, too) // empty the queue (deletes the backend files, too)
t.Empty() t.Empty()
return t.backend.Delete() return t.backend.Delete()
} }
// close all the channels // close all the channels
t.RLock()
for _, channel := range t.channelMap { for _, channel := range t.channelMap {
err := channel.Close() err := channel.Close()
if err != nil { if err != nil {
// we need to continue regardless of error to close all t he channels // we need to continue regardless of error to close all t he channels
t.ctx.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", chan nel.name, err) t.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel. name, err)
} }
} }
t.RUnlock()
// write anything leftover to disk // write anything leftover to disk
t.flush() t.flush()
return t.backend.Close() return t.backend.Close()
} }
func (t *Topic) Empty() error { func (t *Topic) Empty() error {
for { for {
select { select {
case <-t.memoryMsgChan: case <-t.memoryMsgChan:
default: default:
goto finish goto finish
} }
} }
finish: finish:
return t.backend.Empty() return t.backend.Empty()
} }
func (t *Topic) flush() error { func (t *Topic) flush() error {
var msgBuf bytes.Buffer
if len(t.memoryMsgChan) > 0 { if len(t.memoryMsgChan) > 0 {
t.ctx.nsqd.logf(LOG_INFO, t.nsqd.logf(LOG_INFO,
"TOPIC(%s): flushing %d memory messages to backend", "TOPIC(%s): flushing %d memory messages to backend",
t.name, len(t.memoryMsgChan)) t.name, len(t.memoryMsgChan))
} }
for { for {
select { select {
case msg := <-t.memoryMsgChan: case msg := <-t.memoryMsgChan:
err := writeMessageToBackend(&msgBuf, msg, t.backend) err := writeMessageToBackend(msg, t.backend)
if err != nil { if err != nil {
t.ctx.nsqd.logf(LOG_ERROR, t.nsqd.logf(LOG_ERROR,
"ERROR: failed to write message to backen d - %s", err) "ERROR: failed to write message to backen d - %s", err)
} }
default: default:
goto finish goto finish
} }
} }
finish: finish:
return nil return nil
} }
skipping to change at line 448 skipping to change at line 453
for _, c := range t.channelMap { for _, c := range t.channelMap {
realChannels = append(realChannels, c) realChannels = append(realChannels, c)
} }
t.RUnlock() t.RUnlock()
for _, c := range realChannels { for _, c := range realChannels {
if c.e2eProcessingLatencyStream == nil { if c.e2eProcessingLatencyStream == nil {
continue continue
} }
if latencyStream == nil { if latencyStream == nil {
latencyStream = quantile.New( latencyStream = quantile.New(
t.ctx.nsqd.getOpts().E2EProcessingLatencyWindowTi t.nsqd.getOpts().E2EProcessingLatencyWindowTime,
me, t.nsqd.getOpts().E2EProcessingLatencyPercentiles)
t.ctx.nsqd.getOpts().E2EProcessingLatencyPercenti
les)
} }
latencyStream.Merge(c.e2eProcessingLatencyStream) latencyStream.Merge(c.e2eProcessingLatencyStream)
} }
return latencyStream return latencyStream
} }
func (t *Topic) Pause() error { func (t *Topic) Pause() error {
return t.doPause(true) return t.doPause(true)
} }
skipping to change at line 484 skipping to change at line 489
} }
return nil return nil
} }
func (t *Topic) IsPaused() bool { func (t *Topic) IsPaused() bool {
return atomic.LoadInt32(&t.paused) == 1 return atomic.LoadInt32(&t.paused) == 1
} }
func (t *Topic) GenerateID() MessageID { func (t *Topic) GenerateID() MessageID {
retry: var i int64 = 0
id, err := t.idFactory.NewGUID() for {
if err != nil { id, err := t.idFactory.NewGUID()
if err == nil {
return id.Hex()
}
if i%10000 == 0 {
t.nsqd.logf(LOG_ERROR, "TOPIC(%s): failed to create guid
- %s", t.name, err)
}
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
goto retry i++
} }
return id.Hex()
} }
 End of changes. 40 change blocks. 
50 lines changed or deleted 60 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)