channel.go (nsq-1.2.0) | : | channel.go (nsq-1.2.1) | ||
---|---|---|---|---|
package nsqd | package nsqd | |||
import ( | import ( | |||
"bytes" | ||||
"container/heap" | "container/heap" | |||
"errors" | "errors" | |||
"fmt" | ||||
"math" | "math" | |||
"strings" | "strings" | |||
"sync" | "sync" | |||
"sync/atomic" | "sync/atomic" | |||
"time" | "time" | |||
"github.com/nsqio/go-diskqueue" | "github.com/nsqio/go-diskqueue" | |||
"github.com/nsqio/nsq/internal/lg" | "github.com/nsqio/nsq/internal/lg" | |||
"github.com/nsqio/nsq/internal/pqueue" | "github.com/nsqio/nsq/internal/pqueue" | |||
"github.com/nsqio/nsq/internal/quantile" | "github.com/nsqio/nsq/internal/quantile" | |||
) | ) | |||
type Consumer interface { | type Consumer interface { | |||
UnPause() | UnPause() | |||
Pause() | Pause() | |||
Close() error | Close() error | |||
TimedOutMessage() | TimedOutMessage() | |||
Stats() ClientStats | Stats(string) ClientStats | |||
Empty() | Empty() | |||
} | } | |||
// Channel represents the concrete type for a NSQ channel (and also | // Channel represents the concrete type for a NSQ channel (and also | |||
// implements the Queue interface) | // implements the Queue interface) | |||
// | // | |||
// There can be multiple channels per topic, each with there own unique set | // There can be multiple channels per topic, each with there own unique set | |||
// of subscribers (clients). | // of subscribers (clients). | |||
// | // | |||
// Channels maintain all client and message metadata, orchestrating in-flight | // Channels maintain all client and message metadata, orchestrating in-flight | |||
skipping to change at line 46 | skipping to change at line 47 | |||
type Channel struct { | type Channel struct { | |||
// 64bit atomic vars need to be first for proper alignment on 32bit platf orms | // 64bit atomic vars need to be first for proper alignment on 32bit platf orms | |||
requeueCount uint64 | requeueCount uint64 | |||
messageCount uint64 | messageCount uint64 | |||
timeoutCount uint64 | timeoutCount uint64 | |||
sync.RWMutex | sync.RWMutex | |||
topicName string | topicName string | |||
name string | name string | |||
ctx *context | nsqd *NSQD | |||
backend BackendQueue | backend BackendQueue | |||
memoryMsgChan chan *Message | memoryMsgChan chan *Message | |||
exitFlag int32 | exitFlag int32 | |||
exitMutex sync.RWMutex | exitMutex sync.RWMutex | |||
// state tracking | // state tracking | |||
clients map[int64]Consumer | clients map[int64]Consumer | |||
paused int32 | paused int32 | |||
skipping to change at line 74 | skipping to change at line 75 | |||
// TODO: these can be DRYd up | // TODO: these can be DRYd up | |||
deferredMessages map[MessageID]*pqueue.Item | deferredMessages map[MessageID]*pqueue.Item | |||
deferredPQ pqueue.PriorityQueue | deferredPQ pqueue.PriorityQueue | |||
deferredMutex sync.Mutex | deferredMutex sync.Mutex | |||
inFlightMessages map[MessageID]*Message | inFlightMessages map[MessageID]*Message | |||
inFlightPQ inFlightPqueue | inFlightPQ inFlightPqueue | |||
inFlightMutex sync.Mutex | inFlightMutex sync.Mutex | |||
} | } | |||
// NewChannel creates a new instance of the Channel type and returns a pointer | // NewChannel creates a new instance of the Channel type and returns a pointer | |||
func NewChannel(topicName string, channelName string, ctx *context, | func NewChannel(topicName string, channelName string, nsqd *NSQD, | |||
deleteCallback func(*Channel)) *Channel { | deleteCallback func(*Channel)) *Channel { | |||
c := &Channel{ | c := &Channel{ | |||
topicName: topicName, | topicName: topicName, | |||
name: channelName, | name: channelName, | |||
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSi ze), | memoryMsgChan: nil, | |||
clients: make(map[int64]Consumer), | clients: make(map[int64]Consumer), | |||
deleteCallback: deleteCallback, | deleteCallback: deleteCallback, | |||
ctx: ctx, | nsqd: nsqd, | |||
} | ||||
// create mem-queue only if size > 0 (do not use unbuffered chan) | ||||
if nsqd.getOpts().MemQueueSize > 0 { | ||||
c.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize | ||||
) | ||||
} | } | |||
if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 { | if len(nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 { | |||
c.e2eProcessingLatencyStream = quantile.New( | c.e2eProcessingLatencyStream = quantile.New( | |||
ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime, | nsqd.getOpts().E2EProcessingLatencyWindowTime, | |||
ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles, | nsqd.getOpts().E2EProcessingLatencyPercentiles, | |||
) | ) | |||
} | } | |||
c.initPQ() | c.initPQ() | |||
if strings.HasSuffix(channelName, "#ephemeral") { | if strings.HasSuffix(channelName, "#ephemeral") { | |||
c.ephemeral = true | c.ephemeral = true | |||
c.backend = newDummyBackendQueue() | c.backend = newDummyBackendQueue() | |||
} else { | } else { | |||
dqLogf := func(level diskqueue.LogLevel, f string, args ...interf ace{}) { | dqLogf := func(level diskqueue.LogLevel, f string, args ...interf ace{}) { | |||
opts := ctx.nsqd.getOpts() | opts := nsqd.getOpts() | |||
lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f , args...) | lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f , args...) | |||
} | } | |||
// backend names, for uniqueness, automatically include the topic ... | // backend names, for uniqueness, automatically include the topic ... | |||
backendName := getBackendName(topicName, channelName) | backendName := getBackendName(topicName, channelName) | |||
c.backend = diskqueue.New( | c.backend = diskqueue.New( | |||
backendName, | backendName, | |||
ctx.nsqd.getOpts().DataPath, | nsqd.getOpts().DataPath, | |||
ctx.nsqd.getOpts().MaxBytesPerFile, | nsqd.getOpts().MaxBytesPerFile, | |||
int32(minValidMsgLength), | int32(minValidMsgLength), | |||
int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength, | int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength, | |||
ctx.nsqd.getOpts().SyncEvery, | nsqd.getOpts().SyncEvery, | |||
ctx.nsqd.getOpts().SyncTimeout, | nsqd.getOpts().SyncTimeout, | |||
dqLogf, | dqLogf, | |||
) | ) | |||
} | } | |||
c.ctx.nsqd.Notify(c) | c.nsqd.Notify(c, !c.ephemeral) | |||
return c | return c | |||
} | } | |||
func (c *Channel) initPQ() { | func (c *Channel) initPQ() { | |||
pqSize := int(math.Max(1, float64(c.ctx.nsqd.getOpts().MemQueueSize)/10)) | pqSize := int(math.Max(1, float64(c.nsqd.getOpts().MemQueueSize)/10)) | |||
c.inFlightMutex.Lock() | c.inFlightMutex.Lock() | |||
c.inFlightMessages = make(map[MessageID]*Message) | c.inFlightMessages = make(map[MessageID]*Message) | |||
c.inFlightPQ = newInFlightPqueue(pqSize) | c.inFlightPQ = newInFlightPqueue(pqSize) | |||
c.inFlightMutex.Unlock() | c.inFlightMutex.Unlock() | |||
c.deferredMutex.Lock() | c.deferredMutex.Lock() | |||
c.deferredMessages = make(map[MessageID]*pqueue.Item) | c.deferredMessages = make(map[MessageID]*pqueue.Item) | |||
c.deferredPQ = pqueue.New(pqSize) | c.deferredPQ = pqueue.New(pqSize) | |||
c.deferredMutex.Unlock() | c.deferredMutex.Unlock() | |||
skipping to change at line 159 | skipping to change at line 164 | |||
func (c *Channel) exit(deleted bool) error { | func (c *Channel) exit(deleted bool) error { | |||
c.exitMutex.Lock() | c.exitMutex.Lock() | |||
defer c.exitMutex.Unlock() | defer c.exitMutex.Unlock() | |||
if !atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) { | if !atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) { | |||
return errors.New("exiting") | return errors.New("exiting") | |||
} | } | |||
if deleted { | if deleted { | |||
c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): deleting", c.name) | c.nsqd.logf(LOG_INFO, "CHANNEL(%s): deleting", c.name) | |||
// since we are explicitly deleting a channel (not just at system exit time) | // since we are explicitly deleting a channel (not just at system exit time) | |||
// de-register this from the lookupd | // de-register this from the lookupd | |||
c.ctx.nsqd.Notify(c) | c.nsqd.Notify(c, !c.ephemeral) | |||
} else { | } else { | |||
c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name) | c.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name) | |||
} | } | |||
// this forceably closes client connections | // this forceably closes client connections | |||
c.RLock() | c.RLock() | |||
for _, client := range c.clients { | for _, client := range c.clients { | |||
client.Close() | client.Close() | |||
} | } | |||
c.RUnlock() | c.RUnlock() | |||
if deleted { | if deleted { | |||
skipping to change at line 210 | skipping to change at line 215 | |||
} | } | |||
} | } | |||
finish: | finish: | |||
return c.backend.Empty() | return c.backend.Empty() | |||
} | } | |||
// flush persists all the messages in internal memory buffers to the backend | // flush persists all the messages in internal memory buffers to the backend | |||
// it does not drain inflight/deferred because it is only called in Close() | // it does not drain inflight/deferred because it is only called in Close() | |||
func (c *Channel) flush() error { | func (c *Channel) flush() error { | |||
var msgBuf bytes.Buffer | ||||
if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.defer redMessages) > 0 { | if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.defer redMessages) > 0 { | |||
c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): flushing %d memory %d in- flight %d deferred messages to backend", | c.nsqd.logf(LOG_INFO, "CHANNEL(%s): flushing %d memory %d in-flig ht %d deferred messages to backend", | |||
c.name, len(c.memoryMsgChan), len(c.inFlightMessages), le n(c.deferredMessages)) | c.name, len(c.memoryMsgChan), len(c.inFlightMessages), le n(c.deferredMessages)) | |||
} | } | |||
for { | for { | |||
select { | select { | |||
case msg := <-c.memoryMsgChan: | case msg := <-c.memoryMsgChan: | |||
err := writeMessageToBackend(&msgBuf, msg, c.backend) | err := writeMessageToBackend(msg, c.backend) | |||
if err != nil { | if err != nil { | |||
c.ctx.nsqd.logf(LOG_ERROR, "failed to write messa ge to backend - %s", err) | c.nsqd.logf(LOG_ERROR, "failed to write message t o backend - %s", err) | |||
} | } | |||
default: | default: | |||
goto finish | goto finish | |||
} | } | |||
} | } | |||
finish: | finish: | |||
c.inFlightMutex.Lock() | c.inFlightMutex.Lock() | |||
for _, msg := range c.inFlightMessages { | for _, msg := range c.inFlightMessages { | |||
err := writeMessageToBackend(&msgBuf, msg, c.backend) | err := writeMessageToBackend(msg, c.backend) | |||
if err != nil { | if err != nil { | |||
c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to ba ckend - %s", err) | c.nsqd.logf(LOG_ERROR, "failed to write message to backen d - %s", err) | |||
} | } | |||
} | } | |||
c.inFlightMutex.Unlock() | c.inFlightMutex.Unlock() | |||
c.deferredMutex.Lock() | c.deferredMutex.Lock() | |||
for _, item := range c.deferredMessages { | for _, item := range c.deferredMessages { | |||
msg := item.Value.(*Message) | msg := item.Value.(*Message) | |||
err := writeMessageToBackend(&msgBuf, msg, c.backend) | err := writeMessageToBackend(msg, c.backend) | |||
if err != nil { | if err != nil { | |||
c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to ba ckend - %s", err) | c.nsqd.logf(LOG_ERROR, "failed to write message to backen d - %s", err) | |||
} | } | |||
} | } | |||
c.deferredMutex.Unlock() | c.deferredMutex.Unlock() | |||
return nil | return nil | |||
} | } | |||
func (c *Channel) Depth() int64 { | func (c *Channel) Depth() int64 { | |||
return int64(len(c.memoryMsgChan)) + c.backend.Depth() | return int64(len(c.memoryMsgChan)) + c.backend.Depth() | |||
} | } | |||
skipping to change at line 289 | skipping to change at line 292 | |||
c.RUnlock() | c.RUnlock() | |||
return nil | return nil | |||
} | } | |||
func (c *Channel) IsPaused() bool { | func (c *Channel) IsPaused() bool { | |||
return atomic.LoadInt32(&c.paused) == 1 | return atomic.LoadInt32(&c.paused) == 1 | |||
} | } | |||
// PutMessage writes a Message to the queue | // PutMessage writes a Message to the queue | |||
func (c *Channel) PutMessage(m *Message) error { | func (c *Channel) PutMessage(m *Message) error { | |||
c.RLock() | c.exitMutex.RLock() | |||
defer c.RUnlock() | defer c.exitMutex.RUnlock() | |||
if c.Exiting() { | if c.Exiting() { | |||
return errors.New("exiting") | return errors.New("exiting") | |||
} | } | |||
err := c.put(m) | err := c.put(m) | |||
if err != nil { | if err != nil { | |||
return err | return err | |||
} | } | |||
atomic.AddUint64(&c.messageCount, 1) | atomic.AddUint64(&c.messageCount, 1) | |||
return nil | return nil | |||
} | } | |||
func (c *Channel) put(m *Message) error { | func (c *Channel) put(m *Message) error { | |||
select { | select { | |||
case c.memoryMsgChan <- m: | case c.memoryMsgChan <- m: | |||
default: | default: | |||
b := bufferPoolGet() | err := writeMessageToBackend(m, c.backend) | |||
err := writeMessageToBackend(b, m, c.backend) | c.nsqd.SetHealth(err) | |||
bufferPoolPut(b) | ||||
c.ctx.nsqd.SetHealth(err) | ||||
if err != nil { | if err != nil { | |||
c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s", | c.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write mess age to backend - %s", | |||
c.name, err) | c.name, err) | |||
return err | return err | |||
} | } | |||
} | } | |||
return nil | return nil | |||
} | } | |||
func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) { | func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) { | |||
atomic.AddUint64(&c.messageCount, 1) | atomic.AddUint64(&c.messageCount, 1) | |||
c.StartDeferredTimeout(msg, timeout) | c.StartDeferredTimeout(msg, timeout) | |||
skipping to change at line 334 | skipping to change at line 335 | |||
// TouchMessage resets the timeout for an in-flight message | // TouchMessage resets the timeout for an in-flight message | |||
func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout ti me.Duration) error { | func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout ti me.Duration) error { | |||
msg, err := c.popInFlightMessage(clientID, id) | msg, err := c.popInFlightMessage(clientID, id) | |||
if err != nil { | if err != nil { | |||
return err | return err | |||
} | } | |||
c.removeFromInFlightPQ(msg) | c.removeFromInFlightPQ(msg) | |||
newTimeout := time.Now().Add(clientMsgTimeout) | newTimeout := time.Now().Add(clientMsgTimeout) | |||
if newTimeout.Sub(msg.deliveryTS) >= | if newTimeout.Sub(msg.deliveryTS) >= | |||
c.ctx.nsqd.getOpts().MaxMsgTimeout { | c.nsqd.getOpts().MaxMsgTimeout { | |||
// we would have gone over, set to the max | // we would have gone over, set to the max | |||
newTimeout = msg.deliveryTS.Add(c.ctx.nsqd.getOpts().MaxMsgTimeou t) | newTimeout = msg.deliveryTS.Add(c.nsqd.getOpts().MaxMsgTimeout) | |||
} | } | |||
msg.pri = newTimeout.UnixNano() | msg.pri = newTimeout.UnixNano() | |||
err = c.pushInFlightMessage(msg) | err = c.pushInFlightMessage(msg) | |||
if err != nil { | if err != nil { | |||
return err | return err | |||
} | } | |||
c.addToInFlightPQ(msg) | c.addToInFlightPQ(msg) | |||
return nil | return nil | |||
} | } | |||
skipping to change at line 393 | skipping to change at line 394 | |||
c.exitMutex.RUnlock() | c.exitMutex.RUnlock() | |||
return err | return err | |||
} | } | |||
// deferred requeue | // deferred requeue | |||
return c.StartDeferredTimeout(msg, timeout) | return c.StartDeferredTimeout(msg, timeout) | |||
} | } | |||
// AddClient adds a client to the Channel's client list | // AddClient adds a client to the Channel's client list | |||
func (c *Channel) AddClient(clientID int64, client Consumer) error { | func (c *Channel) AddClient(clientID int64, client Consumer) error { | |||
c.Lock() | c.exitMutex.RLock() | |||
defer c.Unlock() | defer c.exitMutex.RUnlock() | |||
if c.Exiting() { | ||||
return errors.New("exiting") | ||||
} | ||||
c.RLock() | ||||
_, ok := c.clients[clientID] | _, ok := c.clients[clientID] | |||
numClients := len(c.clients) | ||||
c.RUnlock() | ||||
if ok { | if ok { | |||
return nil | return nil | |||
} | } | |||
maxChannelConsumers := c.ctx.nsqd.getOpts().MaxChannelConsumers | maxChannelConsumers := c.nsqd.getOpts().MaxChannelConsumers | |||
if maxChannelConsumers != 0 && len(c.clients) >= maxChannelConsumers { | if maxChannelConsumers != 0 && numClients >= maxChannelConsumers { | |||
return errors.New("E_TOO_MANY_CHANNEL_CONSUMERS") | return fmt.Errorf("consumers for %s:%s exceeds limit of %d", | |||
c.topicName, c.name, maxChannelConsumers) | ||||
} | } | |||
c.Lock() | ||||
c.clients[clientID] = client | c.clients[clientID] = client | |||
c.Unlock() | ||||
return nil | return nil | |||
} | } | |||
// RemoveClient removes a client from the Channel's client list | // RemoveClient removes a client from the Channel's client list | |||
func (c *Channel) RemoveClient(clientID int64) { | func (c *Channel) RemoveClient(clientID int64) { | |||
c.Lock() | c.exitMutex.RLock() | |||
defer c.Unlock() | defer c.exitMutex.RUnlock() | |||
if c.Exiting() { | ||||
return | ||||
} | ||||
c.RLock() | ||||
_, ok := c.clients[clientID] | _, ok := c.clients[clientID] | |||
c.RUnlock() | ||||
if !ok { | if !ok { | |||
return | return | |||
} | } | |||
c.Lock() | ||||
delete(c.clients, clientID) | delete(c.clients, clientID) | |||
c.Unlock() | ||||
if len(c.clients) == 0 && c.ephemeral == true { | if len(c.clients) == 0 && c.ephemeral == true { | |||
go c.deleter.Do(func() { c.deleteCallback(c) }) | go c.deleter.Do(func() { c.deleteCallback(c) }) | |||
} | } | |||
} | } | |||
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout tim e.Duration) error { | func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout tim e.Duration) error { | |||
now := time.Now() | now := time.Now() | |||
msg.clientID = clientID | msg.clientID = clientID | |||
msg.deliveryTS = now | msg.deliveryTS = now | |||
End of changes. 42 change blocks. | ||||
45 lines changed or deleted | 66 lines changed or added |