"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "nsqd/stats.go" between
nsq-1.2.0.tar.gz and nsq-1.2.1.tar.gz

About: nsq is a realtime distributed and and decentralized messaging platform.

stats.go  (nsq-1.2.0):stats.go  (nsq-1.2.1)
package nsqd package nsqd
import ( import (
"runtime" "runtime"
"sort" "sort"
"sync/atomic" "sync/atomic"
"github.com/nsqio/nsq/internal/quantile" "github.com/nsqio/nsq/internal/quantile"
) )
type Stats struct {
Topics []TopicStats
Producers []ClientStats
}
type ClientStats interface {
String() string
}
type TopicStats struct { type TopicStats struct {
TopicName string `json:"topic_name"` TopicName string `json:"topic_name"`
Channels []ChannelStats `json:"channels"` Channels []ChannelStats `json:"channels"`
Depth int64 `json:"depth"` Depth int64 `json:"depth"`
BackendDepth int64 `json:"backend_depth"` BackendDepth int64 `json:"backend_depth"`
MessageCount uint64 `json:"message_count"` MessageCount uint64 `json:"message_count"`
MessageBytes uint64 `json:"message_bytes"` MessageBytes uint64 `json:"message_bytes"`
Paused bool `json:"paused"` Paused bool `json:"paused"`
E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"` E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
skipping to change at line 78 skipping to change at line 87
RequeueCount: atomic.LoadUint64(&c.requeueCount), RequeueCount: atomic.LoadUint64(&c.requeueCount),
TimeoutCount: atomic.LoadUint64(&c.timeoutCount), TimeoutCount: atomic.LoadUint64(&c.timeoutCount),
ClientCount: clientCount, ClientCount: clientCount,
Clients: clients, Clients: clients,
Paused: c.IsPaused(), Paused: c.IsPaused(),
E2eProcessingLatency: c.e2eProcessingLatencyStream.Result(), E2eProcessingLatency: c.e2eProcessingLatencyStream.Result(),
} }
} }
type PubCount struct {
Topic string `json:"topic"`
Count uint64 `json:"count"`
}
type ClientStats struct {
ClientID string `json:"client_id"`
Hostname string `json:"hostname"`
Version string `json:"version"`
RemoteAddress string `json:"remote_address"`
State int32 `json:"state"`
ReadyCount int64 `json:"ready_count"`
InFlightCount int64 `json:"in_flight_count"`
MessageCount uint64 `json:"message_count"`
FinishCount uint64 `json:"finish_count"`
RequeueCount uint64 `json:"requeue_count"`
ConnectTime int64 `json:"connect_ts"`
SampleRate int32 `json:"sample_rate"`
Deflate bool `json:"deflate"`
Snappy bool `json:"snappy"`
UserAgent string `json:"user_agent"`
Authed bool `json:"authed,omitempty"`
AuthIdentity string `json:"auth_identity,omitempty"`
AuthIdentityURL string `json:"auth_identity_url,omitempty"`
PubCounts []PubCount `json:"pub_counts,omitempty"`
TLS bool `json:"tls"`
CipherSuite string `json:"tls_cipher_suite"`
TLSVersion string `json:"tls_version"`
TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"`
TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mu
tual"`
}
type Topics []*Topic type Topics []*Topic
func (t Topics) Len() int { return len(t) } func (t Topics) Len() int { return len(t) }
func (t Topics) Swap(i, j int) { t[i], t[j] = t[j], t[i] } func (t Topics) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
type TopicsByName struct { type TopicsByName struct {
Topics Topics
} }
func (t TopicsByName) Less(i, j int) bool { return t.Topics[i].name < t.Topics[j ].name } func (t TopicsByName) Less(i, j int) bool { return t.Topics[i].name < t.Topics[j ].name }
skipping to change at line 134 skipping to change at line 109
func (c Channels) Len() int { return len(c) } func (c Channels) Len() int { return len(c) }
func (c Channels) Swap(i, j int) { c[i], c[j] = c[j], c[i] } func (c Channels) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
type ChannelsByName struct { type ChannelsByName struct {
Channels Channels
} }
func (c ChannelsByName) Less(i, j int) bool { return c.Channels[i].name < c.Chan nels[j].name } func (c ChannelsByName) Less(i, j int) bool { return c.Channels[i].name < c.Chan nels[j].name }
func (n *NSQD) GetStats(topic string, channel string, includeClients bool) []Top func (n *NSQD) GetStats(topic string, channel string, includeClients bool) Stats
icStats { {
var stats Stats
n.RLock() n.RLock()
var realTopics []*Topic var realTopics []*Topic
if topic == "" { if topic == "" {
realTopics = make([]*Topic, 0, len(n.topicMap)) realTopics = make([]*Topic, 0, len(n.topicMap))
for _, t := range n.topicMap { for _, t := range n.topicMap {
realTopics = append(realTopics, t) realTopics = append(realTopics, t)
} }
} else if val, exists := n.topicMap[topic]; exists { } else if val, exists := n.topicMap[topic]; exists {
realTopics = []*Topic{val} realTopics = []*Topic{val}
} else { } else {
n.RUnlock() n.RUnlock()
return []TopicStats{} return stats
} }
n.RUnlock() n.RUnlock()
sort.Sort(TopicsByName{realTopics}) sort.Sort(TopicsByName{realTopics})
topics := make([]TopicStats, 0, len(realTopics)) topics := make([]TopicStats, 0, len(realTopics))
for _, t := range realTopics { for _, t := range realTopics {
t.RLock() t.RLock()
var realChannels []*Channel var realChannels []*Channel
if channel == "" { if channel == "" {
realChannels = make([]*Channel, 0, len(t.channelMap)) realChannels = make([]*Channel, 0, len(t.channelMap))
for _, c := range t.channelMap { for _, c := range t.channelMap {
realChannels = append(realChannels, c) realChannels = append(realChannels, c)
} }
} else if val, exists := t.channelMap[channel]; exists { } else if val, exists := t.channelMap[channel]; exists {
realChannels = []*Channel{val} realChannels = []*Channel{val}
skipping to change at line 175 skipping to change at line 154
t.RUnlock() t.RUnlock()
sort.Sort(ChannelsByName{realChannels}) sort.Sort(ChannelsByName{realChannels})
channels := make([]ChannelStats, 0, len(realChannels)) channels := make([]ChannelStats, 0, len(realChannels))
for _, c := range realChannels { for _, c := range realChannels {
var clients []ClientStats var clients []ClientStats
var clientCount int var clientCount int
c.RLock() c.RLock()
if includeClients { if includeClients {
clients = make([]ClientStats, 0, len(c.clients)) clients = make([]ClientStats, 0, len(c.clients))
for _, client := range c.clients { for _, client := range c.clients {
clients = append(clients, client.Stats()) clients = append(clients, client.Stats(to pic))
} }
} }
clientCount = len(c.clients) clientCount = len(c.clients)
c.RUnlock() c.RUnlock()
channels = append(channels, NewChannelStats(c, clients, c lientCount)) channels = append(channels, NewChannelStats(c, clients, c lientCount))
} }
topics = append(topics, NewTopicStats(t, channels)) topics = append(topics, NewTopicStats(t, channels))
} }
return topics stats.Topics = topics
}
func (n *NSQD) GetProducerStats() []ClientStats { if includeClients {
n.clientLock.RLock() var producerStats []ClientStats
var producers []Client n.tcpServer.conns.Range(func(k, v interface{}) bool {
for _, c := range n.clients { c := v.(Client)
if c.IsProducer() { if c.Type() == typeProducer {
producers = append(producers, c) producerStats = append(producerStats, c.Stats(top
} ic))
} }
n.clientLock.RUnlock() return true
producerStats := make([]ClientStats, 0, len(producers)) })
for _, p := range producers { stats.Producers = producerStats
producerStats = append(producerStats, p.Stats())
} }
return producerStats
return stats
} }
type memStats struct { type memStats struct {
HeapObjects uint64 `json:"heap_objects"` HeapObjects uint64 `json:"heap_objects"`
HeapIdleBytes uint64 `json:"heap_idle_bytes"` HeapIdleBytes uint64 `json:"heap_idle_bytes"`
HeapInUseBytes uint64 `json:"heap_in_use_bytes"` HeapInUseBytes uint64 `json:"heap_in_use_bytes"`
HeapReleasedBytes uint64 `json:"heap_released_bytes"` HeapReleasedBytes uint64 `json:"heap_released_bytes"`
GCPauseUsec100 uint64 `json:"gc_pause_usec_100"` GCPauseUsec100 uint64 `json:"gc_pause_usec_100"`
GCPauseUsec99 uint64 `json:"gc_pause_usec_99"` GCPauseUsec99 uint64 `json:"gc_pause_usec_99"`
GCPauseUsec95 uint64 `json:"gc_pause_usec_95"` GCPauseUsec95 uint64 `json:"gc_pause_usec_95"`
 End of changes. 10 change blocks. 
54 lines changed or deleted 31 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)