"Fossies" - the Fresh Open Source Software Archive

Member "nsq-1.2.1/nsqd/topic.go" (16 Aug 2021, 11384 Bytes) of package /linux/www/nsq-1.2.1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Go source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file.

    1 package nsqd
    2 
    3 import (
    4     "errors"
    5     "strings"
    6     "sync"
    7     "sync/atomic"
    8     "time"
    9 
   10     "github.com/nsqio/go-diskqueue"
   11     "github.com/nsqio/nsq/internal/lg"
   12     "github.com/nsqio/nsq/internal/quantile"
   13     "github.com/nsqio/nsq/internal/util"
   14 )
   15 
   16 type Topic struct {
   17     // 64bit atomic vars need to be first for proper alignment on 32bit platforms
   18     messageCount uint64
   19     messageBytes uint64
   20 
   21     sync.RWMutex
   22 
   23     name              string
   24     channelMap        map[string]*Channel
   25     backend           BackendQueue
   26     memoryMsgChan     chan *Message
   27     startChan         chan int
   28     exitChan          chan int
   29     channelUpdateChan chan int
   30     waitGroup         util.WaitGroupWrapper
   31     exitFlag          int32
   32     idFactory         *guidFactory
   33 
   34     ephemeral      bool
   35     deleteCallback func(*Topic)
   36     deleter        sync.Once
   37 
   38     paused    int32
   39     pauseChan chan int
   40 
   41     nsqd *NSQD
   42 }
   43 
   44 // Topic constructor
   45 func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic {
   46     t := &Topic{
   47         name:              topicName,
   48         channelMap:        make(map[string]*Channel),
   49         memoryMsgChan:     nil,
   50         startChan:         make(chan int, 1),
   51         exitChan:          make(chan int),
   52         channelUpdateChan: make(chan int),
   53         nsqd:              nsqd,
   54         paused:            0,
   55         pauseChan:         make(chan int),
   56         deleteCallback:    deleteCallback,
   57         idFactory:         NewGUIDFactory(nsqd.getOpts().ID),
   58     }
   59     // create mem-queue only if size > 0 (do not use unbuffered chan)
   60     if nsqd.getOpts().MemQueueSize > 0 {
   61         t.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
   62     }
   63     if strings.HasSuffix(topicName, "#ephemeral") {
   64         t.ephemeral = true
   65         t.backend = newDummyBackendQueue()
   66     } else {
   67         dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
   68             opts := nsqd.getOpts()
   69             lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
   70         }
   71         t.backend = diskqueue.New(
   72             topicName,
   73             nsqd.getOpts().DataPath,
   74             nsqd.getOpts().MaxBytesPerFile,
   75             int32(minValidMsgLength),
   76             int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
   77             nsqd.getOpts().SyncEvery,
   78             nsqd.getOpts().SyncTimeout,
   79             dqLogf,
   80         )
   81     }
   82 
   83     t.waitGroup.Wrap(t.messagePump)
   84 
   85     t.nsqd.Notify(t, !t.ephemeral)
   86 
   87     return t
   88 }
   89 
   90 func (t *Topic) Start() {
   91     select {
   92     case t.startChan <- 1:
   93     default:
   94     }
   95 }
   96 
   97 // Exiting returns a boolean indicating if this topic is closed/exiting
   98 func (t *Topic) Exiting() bool {
   99     return atomic.LoadInt32(&t.exitFlag) == 1
  100 }
  101 
  102 // GetChannel performs a thread safe operation
  103 // to return a pointer to a Channel object (potentially new)
  104 // for the given Topic
  105 func (t *Topic) GetChannel(channelName string) *Channel {
  106     t.Lock()
  107     channel, isNew := t.getOrCreateChannel(channelName)
  108     t.Unlock()
  109 
  110     if isNew {
  111         // update messagePump state
  112         select {
  113         case t.channelUpdateChan <- 1:
  114         case <-t.exitChan:
  115         }
  116     }
  117 
  118     return channel
  119 }
  120 
  121 // this expects the caller to handle locking
  122 func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) {
  123     channel, ok := t.channelMap[channelName]
  124     if !ok {
  125         deleteCallback := func(c *Channel) {
  126             t.DeleteExistingChannel(c.name)
  127         }
  128         channel = NewChannel(t.name, channelName, t.nsqd, deleteCallback)
  129         t.channelMap[channelName] = channel
  130         t.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)", t.name, channel.name)
  131         return channel, true
  132     }
  133     return channel, false
  134 }
  135 
  136 func (t *Topic) GetExistingChannel(channelName string) (*Channel, error) {
  137     t.RLock()
  138     defer t.RUnlock()
  139     channel, ok := t.channelMap[channelName]
  140     if !ok {
  141         return nil, errors.New("channel does not exist")
  142     }
  143     return channel, nil
  144 }
  145 
  146 // DeleteExistingChannel removes a channel from the topic only if it exists
  147 func (t *Topic) DeleteExistingChannel(channelName string) error {
  148     t.RLock()
  149     channel, ok := t.channelMap[channelName]
  150     t.RUnlock()
  151     if !ok {
  152         return errors.New("channel does not exist")
  153     }
  154 
  155     t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s", t.name, channel.name)
  156 
  157     // delete empties the channel before closing
  158     // (so that we dont leave any messages around)
  159     //
  160     // we do this before removing the channel from map below (with no lock)
  161     // so that any incoming subs will error and not create a new channel
  162     // to enforce ordering
  163     channel.Delete()
  164 
  165     t.Lock()
  166     delete(t.channelMap, channelName)
  167     numChannels := len(t.channelMap)
  168     t.Unlock()
  169 
  170     // update messagePump state
  171     select {
  172     case t.channelUpdateChan <- 1:
  173     case <-t.exitChan:
  174     }
  175 
  176     if numChannels == 0 && t.ephemeral == true {
  177         go t.deleter.Do(func() { t.deleteCallback(t) })
  178     }
  179 
  180     return nil
  181 }
  182 
  183 // PutMessage writes a Message to the queue
  184 func (t *Topic) PutMessage(m *Message) error {
  185     t.RLock()
  186     defer t.RUnlock()
  187     if atomic.LoadInt32(&t.exitFlag) == 1 {
  188         return errors.New("exiting")
  189     }
  190     err := t.put(m)
  191     if err != nil {
  192         return err
  193     }
  194     atomic.AddUint64(&t.messageCount, 1)
  195     atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))
  196     return nil
  197 }
  198 
  199 // PutMessages writes multiple Messages to the queue
  200 func (t *Topic) PutMessages(msgs []*Message) error {
  201     t.RLock()
  202     defer t.RUnlock()
  203     if atomic.LoadInt32(&t.exitFlag) == 1 {
  204         return errors.New("exiting")
  205     }
  206 
  207     messageTotalBytes := 0
  208 
  209     for i, m := range msgs {
  210         err := t.put(m)
  211         if err != nil {
  212             atomic.AddUint64(&t.messageCount, uint64(i))
  213             atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
  214             return err
  215         }
  216         messageTotalBytes += len(m.Body)
  217     }
  218 
  219     atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
  220     atomic.AddUint64(&t.messageCount, uint64(len(msgs)))
  221     return nil
  222 }
  223 
  224 func (t *Topic) put(m *Message) error {
  225     select {
  226     case t.memoryMsgChan <- m:
  227     default:
  228         err := writeMessageToBackend(m, t.backend)
  229         t.nsqd.SetHealth(err)
  230         if err != nil {
  231             t.nsqd.logf(LOG_ERROR,
  232                 "TOPIC(%s) ERROR: failed to write message to backend - %s",
  233                 t.name, err)
  234             return err
  235         }
  236     }
  237     return nil
  238 }
  239 
  240 func (t *Topic) Depth() int64 {
  241     return int64(len(t.memoryMsgChan)) + t.backend.Depth()
  242 }
  243 
  244 // messagePump selects over the in-memory and backend queue and
  245 // writes messages to every channel for this topic
  246 func (t *Topic) messagePump() {
  247     var msg *Message
  248     var buf []byte
  249     var err error
  250     var chans []*Channel
  251     var memoryMsgChan chan *Message
  252     var backendChan <-chan []byte
  253 
  254     // do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
  255     for {
  256         select {
  257         case <-t.channelUpdateChan:
  258             continue
  259         case <-t.pauseChan:
  260             continue
  261         case <-t.exitChan:
  262             goto exit
  263         case <-t.startChan:
  264         }
  265         break
  266     }
  267     t.RLock()
  268     for _, c := range t.channelMap {
  269         chans = append(chans, c)
  270     }
  271     t.RUnlock()
  272     if len(chans) > 0 && !t.IsPaused() {
  273         memoryMsgChan = t.memoryMsgChan
  274         backendChan = t.backend.ReadChan()
  275     }
  276 
  277     // main message loop
  278     for {
  279         select {
  280         case msg = <-memoryMsgChan:
  281         case buf = <-backendChan:
  282             msg, err = decodeMessage(buf)
  283             if err != nil {
  284                 t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
  285                 continue
  286             }
  287         case <-t.channelUpdateChan:
  288             chans = chans[:0]
  289             t.RLock()
  290             for _, c := range t.channelMap {
  291                 chans = append(chans, c)
  292             }
  293             t.RUnlock()
  294             if len(chans) == 0 || t.IsPaused() {
  295                 memoryMsgChan = nil
  296                 backendChan = nil
  297             } else {
  298                 memoryMsgChan = t.memoryMsgChan
  299                 backendChan = t.backend.ReadChan()
  300             }
  301             continue
  302         case <-t.pauseChan:
  303             if len(chans) == 0 || t.IsPaused() {
  304                 memoryMsgChan = nil
  305                 backendChan = nil
  306             } else {
  307                 memoryMsgChan = t.memoryMsgChan
  308                 backendChan = t.backend.ReadChan()
  309             }
  310             continue
  311         case <-t.exitChan:
  312             goto exit
  313         }
  314 
  315         for i, channel := range chans {
  316             chanMsg := msg
  317             // copy the message because each channel
  318             // needs a unique instance but...
  319             // fastpath to avoid copy if its the first channel
  320             // (the topic already created the first copy)
  321             if i > 0 {
  322                 chanMsg = NewMessage(msg.ID, msg.Body)
  323                 chanMsg.Timestamp = msg.Timestamp
  324                 chanMsg.deferred = msg.deferred
  325             }
  326             if chanMsg.deferred != 0 {
  327                 channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
  328                 continue
  329             }
  330             err := channel.PutMessage(chanMsg)
  331             if err != nil {
  332                 t.nsqd.logf(LOG_ERROR,
  333                     "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
  334                     t.name, msg.ID, channel.name, err)
  335             }
  336         }
  337     }
  338 
  339 exit:
  340     t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
  341 }
  342 
  343 // Delete empties the topic and all its channels and closes
  344 func (t *Topic) Delete() error {
  345     return t.exit(true)
  346 }
  347 
  348 // Close persists all outstanding topic data and closes all its channels
  349 func (t *Topic) Close() error {
  350     return t.exit(false)
  351 }
  352 
  353 func (t *Topic) exit(deleted bool) error {
  354     if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) {
  355         return errors.New("exiting")
  356     }
  357 
  358     if deleted {
  359         t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name)
  360 
  361         // since we are explicitly deleting a topic (not just at system exit time)
  362         // de-register this from the lookupd
  363         t.nsqd.Notify(t, !t.ephemeral)
  364     } else {
  365         t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name)
  366     }
  367 
  368     close(t.exitChan)
  369 
  370     // synchronize the close of messagePump()
  371     t.waitGroup.Wait()
  372 
  373     if deleted {
  374         t.Lock()
  375         for _, channel := range t.channelMap {
  376             delete(t.channelMap, channel.name)
  377             channel.Delete()
  378         }
  379         t.Unlock()
  380 
  381         // empty the queue (deletes the backend files, too)
  382         t.Empty()
  383         return t.backend.Delete()
  384     }
  385 
  386     // close all the channels
  387     t.RLock()
  388     for _, channel := range t.channelMap {
  389         err := channel.Close()
  390         if err != nil {
  391             // we need to continue regardless of error to close all the channels
  392             t.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel.name, err)
  393         }
  394     }
  395     t.RUnlock()
  396 
  397     // write anything leftover to disk
  398     t.flush()
  399     return t.backend.Close()
  400 }
  401 
  402 func (t *Topic) Empty() error {
  403     for {
  404         select {
  405         case <-t.memoryMsgChan:
  406         default:
  407             goto finish
  408         }
  409     }
  410 
  411 finish:
  412     return t.backend.Empty()
  413 }
  414 
  415 func (t *Topic) flush() error {
  416     if len(t.memoryMsgChan) > 0 {
  417         t.nsqd.logf(LOG_INFO,
  418             "TOPIC(%s): flushing %d memory messages to backend",
  419             t.name, len(t.memoryMsgChan))
  420     }
  421 
  422     for {
  423         select {
  424         case msg := <-t.memoryMsgChan:
  425             err := writeMessageToBackend(msg, t.backend)
  426             if err != nil {
  427                 t.nsqd.logf(LOG_ERROR,
  428                     "ERROR: failed to write message to backend - %s", err)
  429             }
  430         default:
  431             goto finish
  432         }
  433     }
  434 
  435 finish:
  436     return nil
  437 }
  438 
  439 func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile {
  440     var latencyStream *quantile.Quantile
  441     t.RLock()
  442     realChannels := make([]*Channel, 0, len(t.channelMap))
  443     for _, c := range t.channelMap {
  444         realChannels = append(realChannels, c)
  445     }
  446     t.RUnlock()
  447     for _, c := range realChannels {
  448         if c.e2eProcessingLatencyStream == nil {
  449             continue
  450         }
  451         if latencyStream == nil {
  452             latencyStream = quantile.New(
  453                 t.nsqd.getOpts().E2EProcessingLatencyWindowTime,
  454                 t.nsqd.getOpts().E2EProcessingLatencyPercentiles)
  455         }
  456         latencyStream.Merge(c.e2eProcessingLatencyStream)
  457     }
  458     return latencyStream
  459 }
  460 
  461 func (t *Topic) Pause() error {
  462     return t.doPause(true)
  463 }
  464 
  465 func (t *Topic) UnPause() error {
  466     return t.doPause(false)
  467 }
  468 
  469 func (t *Topic) doPause(pause bool) error {
  470     if pause {
  471         atomic.StoreInt32(&t.paused, 1)
  472     } else {
  473         atomic.StoreInt32(&t.paused, 0)
  474     }
  475 
  476     select {
  477     case t.pauseChan <- 1:
  478     case <-t.exitChan:
  479     }
  480 
  481     return nil
  482 }
  483 
  484 func (t *Topic) IsPaused() bool {
  485     return atomic.LoadInt32(&t.paused) == 1
  486 }
  487 
  488 func (t *Topic) GenerateID() MessageID {
  489     var i int64 = 0
  490     for {
  491         id, err := t.idFactory.NewGUID()
  492         if err == nil {
  493             return id.Hex()
  494         }
  495         if i%10000 == 0 {
  496             t.nsqd.logf(LOG_ERROR, "TOPIC(%s): failed to create guid - %s", t.name, err)
  497         }
  498         time.Sleep(time.Millisecond)
  499         i++
  500     }
  501 }