"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "nsqd/channel.go" between
nsq-1.2.0.tar.gz and nsq-1.2.1.tar.gz

About: nsq is a realtime distributed and and decentralized messaging platform.

channel.go  (nsq-1.2.0):channel.go  (nsq-1.2.1)
package nsqd package nsqd
import ( import (
"bytes"
"container/heap" "container/heap"
"errors" "errors"
"fmt"
"math" "math"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/nsqio/go-diskqueue" "github.com/nsqio/go-diskqueue"
"github.com/nsqio/nsq/internal/lg" "github.com/nsqio/nsq/internal/lg"
"github.com/nsqio/nsq/internal/pqueue" "github.com/nsqio/nsq/internal/pqueue"
"github.com/nsqio/nsq/internal/quantile" "github.com/nsqio/nsq/internal/quantile"
) )
type Consumer interface { type Consumer interface {
UnPause() UnPause()
Pause() Pause()
Close() error Close() error
TimedOutMessage() TimedOutMessage()
Stats() ClientStats Stats(string) ClientStats
Empty() Empty()
} }
// Channel represents the concrete type for a NSQ channel (and also // Channel represents the concrete type for a NSQ channel (and also
// implements the Queue interface) // implements the Queue interface)
// //
// There can be multiple channels per topic, each with there own unique set // There can be multiple channels per topic, each with there own unique set
// of subscribers (clients). // of subscribers (clients).
// //
// Channels maintain all client and message metadata, orchestrating in-flight // Channels maintain all client and message metadata, orchestrating in-flight
skipping to change at line 46 skipping to change at line 47
type Channel struct { type Channel struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platf orms // 64bit atomic vars need to be first for proper alignment on 32bit platf orms
requeueCount uint64 requeueCount uint64
messageCount uint64 messageCount uint64
timeoutCount uint64 timeoutCount uint64
sync.RWMutex sync.RWMutex
topicName string topicName string
name string name string
ctx *context nsqd *NSQD
backend BackendQueue backend BackendQueue
memoryMsgChan chan *Message memoryMsgChan chan *Message
exitFlag int32 exitFlag int32
exitMutex sync.RWMutex exitMutex sync.RWMutex
// state tracking // state tracking
clients map[int64]Consumer clients map[int64]Consumer
paused int32 paused int32
skipping to change at line 74 skipping to change at line 75
// TODO: these can be DRYd up // TODO: these can be DRYd up
deferredMessages map[MessageID]*pqueue.Item deferredMessages map[MessageID]*pqueue.Item
deferredPQ pqueue.PriorityQueue deferredPQ pqueue.PriorityQueue
deferredMutex sync.Mutex deferredMutex sync.Mutex
inFlightMessages map[MessageID]*Message inFlightMessages map[MessageID]*Message
inFlightPQ inFlightPqueue inFlightPQ inFlightPqueue
inFlightMutex sync.Mutex inFlightMutex sync.Mutex
} }
// NewChannel creates a new instance of the Channel type and returns a pointer // NewChannel creates a new instance of the Channel type and returns a pointer
func NewChannel(topicName string, channelName string, ctx *context, func NewChannel(topicName string, channelName string, nsqd *NSQD,
deleteCallback func(*Channel)) *Channel { deleteCallback func(*Channel)) *Channel {
c := &Channel{ c := &Channel{
topicName: topicName, topicName: topicName,
name: channelName, name: channelName,
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSi ze), memoryMsgChan: nil,
clients: make(map[int64]Consumer), clients: make(map[int64]Consumer),
deleteCallback: deleteCallback, deleteCallback: deleteCallback,
ctx: ctx, nsqd: nsqd,
}
// create mem-queue only if size > 0 (do not use unbuffered chan)
if nsqd.getOpts().MemQueueSize > 0 {
c.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize
)
} }
if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 { if len(nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {
c.e2eProcessingLatencyStream = quantile.New( c.e2eProcessingLatencyStream = quantile.New(
ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime, nsqd.getOpts().E2EProcessingLatencyWindowTime,
ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles, nsqd.getOpts().E2EProcessingLatencyPercentiles,
) )
} }
c.initPQ() c.initPQ()
if strings.HasSuffix(channelName, "#ephemeral") { if strings.HasSuffix(channelName, "#ephemeral") {
c.ephemeral = true c.ephemeral = true
c.backend = newDummyBackendQueue() c.backend = newDummyBackendQueue()
} else { } else {
dqLogf := func(level diskqueue.LogLevel, f string, args ...interf ace{}) { dqLogf := func(level diskqueue.LogLevel, f string, args ...interf ace{}) {
opts := ctx.nsqd.getOpts() opts := nsqd.getOpts()
lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f , args...) lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f , args...)
} }
// backend names, for uniqueness, automatically include the topic ... // backend names, for uniqueness, automatically include the topic ...
backendName := getBackendName(topicName, channelName) backendName := getBackendName(topicName, channelName)
c.backend = diskqueue.New( c.backend = diskqueue.New(
backendName, backendName,
ctx.nsqd.getOpts().DataPath, nsqd.getOpts().DataPath,
ctx.nsqd.getOpts().MaxBytesPerFile, nsqd.getOpts().MaxBytesPerFile,
int32(minValidMsgLength), int32(minValidMsgLength),
int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength, int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
ctx.nsqd.getOpts().SyncEvery, nsqd.getOpts().SyncEvery,
ctx.nsqd.getOpts().SyncTimeout, nsqd.getOpts().SyncTimeout,
dqLogf, dqLogf,
) )
} }
c.ctx.nsqd.Notify(c) c.nsqd.Notify(c, !c.ephemeral)
return c return c
} }
func (c *Channel) initPQ() { func (c *Channel) initPQ() {
pqSize := int(math.Max(1, float64(c.ctx.nsqd.getOpts().MemQueueSize)/10)) pqSize := int(math.Max(1, float64(c.nsqd.getOpts().MemQueueSize)/10))
c.inFlightMutex.Lock() c.inFlightMutex.Lock()
c.inFlightMessages = make(map[MessageID]*Message) c.inFlightMessages = make(map[MessageID]*Message)
c.inFlightPQ = newInFlightPqueue(pqSize) c.inFlightPQ = newInFlightPqueue(pqSize)
c.inFlightMutex.Unlock() c.inFlightMutex.Unlock()
c.deferredMutex.Lock() c.deferredMutex.Lock()
c.deferredMessages = make(map[MessageID]*pqueue.Item) c.deferredMessages = make(map[MessageID]*pqueue.Item)
c.deferredPQ = pqueue.New(pqSize) c.deferredPQ = pqueue.New(pqSize)
c.deferredMutex.Unlock() c.deferredMutex.Unlock()
skipping to change at line 159 skipping to change at line 164
func (c *Channel) exit(deleted bool) error { func (c *Channel) exit(deleted bool) error {
c.exitMutex.Lock() c.exitMutex.Lock()
defer c.exitMutex.Unlock() defer c.exitMutex.Unlock()
if !atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) { if !atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
return errors.New("exiting") return errors.New("exiting")
} }
if deleted { if deleted {
c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): deleting", c.name) c.nsqd.logf(LOG_INFO, "CHANNEL(%s): deleting", c.name)
// since we are explicitly deleting a channel (not just at system exit time) // since we are explicitly deleting a channel (not just at system exit time)
// de-register this from the lookupd // de-register this from the lookupd
c.ctx.nsqd.Notify(c) c.nsqd.Notify(c, !c.ephemeral)
} else { } else {
c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name) c.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name)
} }
// this forceably closes client connections // this forceably closes client connections
c.RLock() c.RLock()
for _, client := range c.clients { for _, client := range c.clients {
client.Close() client.Close()
} }
c.RUnlock() c.RUnlock()
if deleted { if deleted {
skipping to change at line 210 skipping to change at line 215
} }
} }
finish: finish:
return c.backend.Empty() return c.backend.Empty()
} }
// flush persists all the messages in internal memory buffers to the backend // flush persists all the messages in internal memory buffers to the backend
// it does not drain inflight/deferred because it is only called in Close() // it does not drain inflight/deferred because it is only called in Close()
func (c *Channel) flush() error { func (c *Channel) flush() error {
var msgBuf bytes.Buffer
if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.defer redMessages) > 0 { if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.defer redMessages) > 0 {
c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): flushing %d memory %d in- flight %d deferred messages to backend", c.nsqd.logf(LOG_INFO, "CHANNEL(%s): flushing %d memory %d in-flig ht %d deferred messages to backend",
c.name, len(c.memoryMsgChan), len(c.inFlightMessages), le n(c.deferredMessages)) c.name, len(c.memoryMsgChan), len(c.inFlightMessages), le n(c.deferredMessages))
} }
for { for {
select { select {
case msg := <-c.memoryMsgChan: case msg := <-c.memoryMsgChan:
err := writeMessageToBackend(&msgBuf, msg, c.backend) err := writeMessageToBackend(msg, c.backend)
if err != nil { if err != nil {
c.ctx.nsqd.logf(LOG_ERROR, "failed to write messa ge to backend - %s", err) c.nsqd.logf(LOG_ERROR, "failed to write message t o backend - %s", err)
} }
default: default:
goto finish goto finish
} }
} }
finish: finish:
c.inFlightMutex.Lock() c.inFlightMutex.Lock()
for _, msg := range c.inFlightMessages { for _, msg := range c.inFlightMessages {
err := writeMessageToBackend(&msgBuf, msg, c.backend) err := writeMessageToBackend(msg, c.backend)
if err != nil { if err != nil {
c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to ba ckend - %s", err) c.nsqd.logf(LOG_ERROR, "failed to write message to backen d - %s", err)
} }
} }
c.inFlightMutex.Unlock() c.inFlightMutex.Unlock()
c.deferredMutex.Lock() c.deferredMutex.Lock()
for _, item := range c.deferredMessages { for _, item := range c.deferredMessages {
msg := item.Value.(*Message) msg := item.Value.(*Message)
err := writeMessageToBackend(&msgBuf, msg, c.backend) err := writeMessageToBackend(msg, c.backend)
if err != nil { if err != nil {
c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to ba ckend - %s", err) c.nsqd.logf(LOG_ERROR, "failed to write message to backen d - %s", err)
} }
} }
c.deferredMutex.Unlock() c.deferredMutex.Unlock()
return nil return nil
} }
func (c *Channel) Depth() int64 { func (c *Channel) Depth() int64 {
return int64(len(c.memoryMsgChan)) + c.backend.Depth() return int64(len(c.memoryMsgChan)) + c.backend.Depth()
} }
skipping to change at line 289 skipping to change at line 292
c.RUnlock() c.RUnlock()
return nil return nil
} }
func (c *Channel) IsPaused() bool { func (c *Channel) IsPaused() bool {
return atomic.LoadInt32(&c.paused) == 1 return atomic.LoadInt32(&c.paused) == 1
} }
// PutMessage writes a Message to the queue // PutMessage writes a Message to the queue
func (c *Channel) PutMessage(m *Message) error { func (c *Channel) PutMessage(m *Message) error {
c.RLock() c.exitMutex.RLock()
defer c.RUnlock() defer c.exitMutex.RUnlock()
if c.Exiting() { if c.Exiting() {
return errors.New("exiting") return errors.New("exiting")
} }
err := c.put(m) err := c.put(m)
if err != nil { if err != nil {
return err return err
} }
atomic.AddUint64(&c.messageCount, 1) atomic.AddUint64(&c.messageCount, 1)
return nil return nil
} }
func (c *Channel) put(m *Message) error { func (c *Channel) put(m *Message) error {
select { select {
case c.memoryMsgChan <- m: case c.memoryMsgChan <- m:
default: default:
b := bufferPoolGet() err := writeMessageToBackend(m, c.backend)
err := writeMessageToBackend(b, m, c.backend) c.nsqd.SetHealth(err)
bufferPoolPut(b)
c.ctx.nsqd.SetHealth(err)
if err != nil { if err != nil {
c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s", c.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write mess age to backend - %s",
c.name, err) c.name, err)
return err return err
} }
} }
return nil return nil
} }
func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) { func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) {
atomic.AddUint64(&c.messageCount, 1) atomic.AddUint64(&c.messageCount, 1)
c.StartDeferredTimeout(msg, timeout) c.StartDeferredTimeout(msg, timeout)
skipping to change at line 334 skipping to change at line 335
// TouchMessage resets the timeout for an in-flight message // TouchMessage resets the timeout for an in-flight message
func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout ti me.Duration) error { func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout ti me.Duration) error {
msg, err := c.popInFlightMessage(clientID, id) msg, err := c.popInFlightMessage(clientID, id)
if err != nil { if err != nil {
return err return err
} }
c.removeFromInFlightPQ(msg) c.removeFromInFlightPQ(msg)
newTimeout := time.Now().Add(clientMsgTimeout) newTimeout := time.Now().Add(clientMsgTimeout)
if newTimeout.Sub(msg.deliveryTS) >= if newTimeout.Sub(msg.deliveryTS) >=
c.ctx.nsqd.getOpts().MaxMsgTimeout { c.nsqd.getOpts().MaxMsgTimeout {
// we would have gone over, set to the max // we would have gone over, set to the max
newTimeout = msg.deliveryTS.Add(c.ctx.nsqd.getOpts().MaxMsgTimeou t) newTimeout = msg.deliveryTS.Add(c.nsqd.getOpts().MaxMsgTimeout)
} }
msg.pri = newTimeout.UnixNano() msg.pri = newTimeout.UnixNano()
err = c.pushInFlightMessage(msg) err = c.pushInFlightMessage(msg)
if err != nil { if err != nil {
return err return err
} }
c.addToInFlightPQ(msg) c.addToInFlightPQ(msg)
return nil return nil
} }
skipping to change at line 393 skipping to change at line 394
c.exitMutex.RUnlock() c.exitMutex.RUnlock()
return err return err
} }
// deferred requeue // deferred requeue
return c.StartDeferredTimeout(msg, timeout) return c.StartDeferredTimeout(msg, timeout)
} }
// AddClient adds a client to the Channel's client list // AddClient adds a client to the Channel's client list
func (c *Channel) AddClient(clientID int64, client Consumer) error { func (c *Channel) AddClient(clientID int64, client Consumer) error {
c.Lock() c.exitMutex.RLock()
defer c.Unlock() defer c.exitMutex.RUnlock()
if c.Exiting() {
return errors.New("exiting")
}
c.RLock()
_, ok := c.clients[clientID] _, ok := c.clients[clientID]
numClients := len(c.clients)
c.RUnlock()
if ok { if ok {
return nil return nil
} }
maxChannelConsumers := c.ctx.nsqd.getOpts().MaxChannelConsumers maxChannelConsumers := c.nsqd.getOpts().MaxChannelConsumers
if maxChannelConsumers != 0 && len(c.clients) >= maxChannelConsumers { if maxChannelConsumers != 0 && numClients >= maxChannelConsumers {
return errors.New("E_TOO_MANY_CHANNEL_CONSUMERS") return fmt.Errorf("consumers for %s:%s exceeds limit of %d",
c.topicName, c.name, maxChannelConsumers)
} }
c.Lock()
c.clients[clientID] = client c.clients[clientID] = client
c.Unlock()
return nil return nil
} }
// RemoveClient removes a client from the Channel's client list // RemoveClient removes a client from the Channel's client list
func (c *Channel) RemoveClient(clientID int64) { func (c *Channel) RemoveClient(clientID int64) {
c.Lock() c.exitMutex.RLock()
defer c.Unlock() defer c.exitMutex.RUnlock()
if c.Exiting() {
return
}
c.RLock()
_, ok := c.clients[clientID] _, ok := c.clients[clientID]
c.RUnlock()
if !ok { if !ok {
return return
} }
c.Lock()
delete(c.clients, clientID) delete(c.clients, clientID)
c.Unlock()
if len(c.clients) == 0 && c.ephemeral == true { if len(c.clients) == 0 && c.ephemeral == true {
go c.deleter.Do(func() { c.deleteCallback(c) }) go c.deleter.Do(func() { c.deleteCallback(c) })
} }
} }
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout tim e.Duration) error { func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout tim e.Duration) error {
now := time.Now() now := time.Now()
msg.clientID = clientID msg.clientID = clientID
msg.deliveryTS = now msg.deliveryTS = now
 End of changes. 42 change blocks. 
45 lines changed or deleted 66 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)