"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "nsqd/statsd.go" between
nsq-1.2.0.tar.gz and nsq-1.2.1.tar.gz

About: nsq is a realtime distributed and and decentralized messaging platform.

statsd.go  (nsq-1.2.0):statsd.go  (nsq-1.2.1)
package nsqd package nsqd
import ( import (
"fmt" "fmt"
"math" "math"
"net" "net"
"strings"
"time" "time"
"github.com/nsqio/nsq/internal/statsd" "github.com/nsqio/nsq/internal/statsd"
"github.com/nsqio/nsq/internal/writers" "github.com/nsqio/nsq/internal/writers"
) )
type Uint64Slice []uint64 type Uint64Slice []uint64
func (s Uint64Slice) Len() int { func (s Uint64Slice) Len() int {
return len(s) return len(s)
skipping to change at line 29 skipping to change at line 30
func (s Uint64Slice) Swap(i, j int) { func (s Uint64Slice) Swap(i, j int) {
s[i], s[j] = s[j], s[i] s[i], s[j] = s[j], s[i]
} }
func (s Uint64Slice) Less(i, j int) bool { func (s Uint64Slice) Less(i, j int) bool {
return s[i] < s[j] return s[i] < s[j]
} }
func (n *NSQD) statsdLoop() { func (n *NSQD) statsdLoop() {
var lastMemStats memStats var lastMemStats memStats
var lastStats []TopicStats var lastStats Stats
interval := n.getOpts().StatsdInterval interval := n.getOpts().StatsdInterval
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
for { for {
select { select {
case <-n.exitChan: case <-n.exitChan:
goto exit goto exit
case <-ticker.C: case <-ticker.C:
addr := n.getOpts().StatsdAddress addr := n.getOpts().StatsdAddress
prefix := n.getOpts().StatsdPrefix prefix := n.getOpts().StatsdPrefix
excludeEphemeral := n.getOpts().StatsdExcludeEphemeral
conn, err := net.DialTimeout("udp", addr, time.Second) conn, err := net.DialTimeout("udp", addr, time.Second)
if err != nil { if err != nil {
n.logf(LOG_ERROR, "failed to create UDP socket to statsd(%s)", addr) n.logf(LOG_ERROR, "failed to create UDP socket to statsd(%s)", addr)
continue continue
} }
sw := writers.NewSpreadWriter(conn, interval-time.Second, n.exitChan) sw := writers.NewSpreadWriter(conn, interval-time.Second, n.exitChan)
bw := writers.NewBoundaryBufferedWriter(sw, n.getOpts().S tatsdUDPPacketSize) bw := writers.NewBoundaryBufferedWriter(sw, n.getOpts().S tatsdUDPPacketSize)
client := statsd.NewClient(bw, prefix) client := statsd.NewClient(bw, prefix)
n.logf(LOG_INFO, "STATSD: pushing stats to %s", addr) n.logf(LOG_INFO, "STATSD: pushing stats to %s", addr)
stats := n.GetStats("", "", false) stats := n.GetStats("", "", false)
for _, topic := range stats { for _, topic := range stats.Topics {
if excludeEphemeral && strings.HasSuffix(topic.To
picName, "#ephemeral") {
continue
}
// try to find the topic in the last collection // try to find the topic in the last collection
lastTopic := TopicStats{} lastTopic := TopicStats{}
for _, checkTopic := range lastStats { for _, checkTopic := range lastStats.Topics {
if topic.TopicName == checkTopic.TopicNam e { if topic.TopicName == checkTopic.TopicNam e {
lastTopic = checkTopic lastTopic = checkTopic
break break
} }
} }
diff := topic.MessageCount - lastTopic.MessageCou nt diff := topic.MessageCount - lastTopic.MessageCou nt
stat := fmt.Sprintf("topic.%s.message_count", top ic.TopicName) stat := fmt.Sprintf("topic.%s.message_count", top ic.TopicName)
client.Incr(stat, int64(diff)) client.Incr(stat, int64(diff))
diff = topic.MessageBytes - lastTopic.MessageByte s diff = topic.MessageBytes - lastTopic.MessageByte s
skipping to change at line 83 skipping to change at line 89
for _, item := range topic.E2eProcessingLatency.P ercentiles { for _, item := range topic.E2eProcessingLatency.P ercentiles {
stat = fmt.Sprintf("topic.%s.e2e_processi ng_latency_%.0f", topic.TopicName, item["quantile"]*100.0) stat = fmt.Sprintf("topic.%s.e2e_processi ng_latency_%.0f", topic.TopicName, item["quantile"]*100.0)
// We can cast the value to int64 since a value of 1 is the // We can cast the value to int64 since a value of 1 is the
// minimum resolution we will have, so th ere is no loss of // minimum resolution we will have, so th ere is no loss of
// accuracy // accuracy
client.Gauge(stat, int64(item["value"])) client.Gauge(stat, int64(item["value"]))
} }
for _, channel := range topic.Channels { for _, channel := range topic.Channels {
if excludeEphemeral && strings.HasSuffix(
channel.ChannelName, "#ephemeral") {
continue
}
// try to find the channel in the last co llection // try to find the channel in the last co llection
lastChannel := ChannelStats{} lastChannel := ChannelStats{}
for _, checkChannel := range lastTopic.Ch annels { for _, checkChannel := range lastTopic.Ch annels {
if channel.ChannelName == checkCh annel.ChannelName { if channel.ChannelName == checkCh annel.ChannelName {
lastChannel = checkChanne l lastChannel = checkChanne l
break break
} }
} }
diff := channel.MessageCount - lastChanne l.MessageCount diff := channel.MessageCount - lastChanne l.MessageCount
stat := fmt.Sprintf("topic.%s.channel.%s. message_count", topic.TopicName, channel.ChannelName) stat := fmt.Sprintf("topic.%s.channel.%s. message_count", topic.TopicName, channel.ChannelName)
 End of changes. 6 change blocks. 
3 lines changed or deleted 15 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)