stats.go (nsq-1.2.0) | : | stats.go (nsq-1.2.1) | ||
---|---|---|---|---|
package nsqd | package nsqd | |||
import ( | import ( | |||
"runtime" | "runtime" | |||
"sort" | "sort" | |||
"sync/atomic" | "sync/atomic" | |||
"github.com/nsqio/nsq/internal/quantile" | "github.com/nsqio/nsq/internal/quantile" | |||
) | ) | |||
type Stats struct { | ||||
Topics []TopicStats | ||||
Producers []ClientStats | ||||
} | ||||
type ClientStats interface { | ||||
String() string | ||||
} | ||||
type TopicStats struct { | type TopicStats struct { | |||
TopicName string `json:"topic_name"` | TopicName string `json:"topic_name"` | |||
Channels []ChannelStats `json:"channels"` | Channels []ChannelStats `json:"channels"` | |||
Depth int64 `json:"depth"` | Depth int64 `json:"depth"` | |||
BackendDepth int64 `json:"backend_depth"` | BackendDepth int64 `json:"backend_depth"` | |||
MessageCount uint64 `json:"message_count"` | MessageCount uint64 `json:"message_count"` | |||
MessageBytes uint64 `json:"message_bytes"` | MessageBytes uint64 `json:"message_bytes"` | |||
Paused bool `json:"paused"` | Paused bool `json:"paused"` | |||
E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"` | E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"` | |||
skipping to change at line 78 | skipping to change at line 87 | |||
RequeueCount: atomic.LoadUint64(&c.requeueCount), | RequeueCount: atomic.LoadUint64(&c.requeueCount), | |||
TimeoutCount: atomic.LoadUint64(&c.timeoutCount), | TimeoutCount: atomic.LoadUint64(&c.timeoutCount), | |||
ClientCount: clientCount, | ClientCount: clientCount, | |||
Clients: clients, | Clients: clients, | |||
Paused: c.IsPaused(), | Paused: c.IsPaused(), | |||
E2eProcessingLatency: c.e2eProcessingLatencyStream.Result(), | E2eProcessingLatency: c.e2eProcessingLatencyStream.Result(), | |||
} | } | |||
} | } | |||
type PubCount struct { | ||||
Topic string `json:"topic"` | ||||
Count uint64 `json:"count"` | ||||
} | ||||
type ClientStats struct { | ||||
ClientID string `json:"client_id"` | ||||
Hostname string `json:"hostname"` | ||||
Version string `json:"version"` | ||||
RemoteAddress string `json:"remote_address"` | ||||
State int32 `json:"state"` | ||||
ReadyCount int64 `json:"ready_count"` | ||||
InFlightCount int64 `json:"in_flight_count"` | ||||
MessageCount uint64 `json:"message_count"` | ||||
FinishCount uint64 `json:"finish_count"` | ||||
RequeueCount uint64 `json:"requeue_count"` | ||||
ConnectTime int64 `json:"connect_ts"` | ||||
SampleRate int32 `json:"sample_rate"` | ||||
Deflate bool `json:"deflate"` | ||||
Snappy bool `json:"snappy"` | ||||
UserAgent string `json:"user_agent"` | ||||
Authed bool `json:"authed,omitempty"` | ||||
AuthIdentity string `json:"auth_identity,omitempty"` | ||||
AuthIdentityURL string `json:"auth_identity_url,omitempty"` | ||||
PubCounts []PubCount `json:"pub_counts,omitempty"` | ||||
TLS bool `json:"tls"` | ||||
CipherSuite string `json:"tls_cipher_suite"` | ||||
TLSVersion string `json:"tls_version"` | ||||
TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` | ||||
TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mu | ||||
tual"` | ||||
} | ||||
type Topics []*Topic | type Topics []*Topic | |||
func (t Topics) Len() int { return len(t) } | func (t Topics) Len() int { return len(t) } | |||
func (t Topics) Swap(i, j int) { t[i], t[j] = t[j], t[i] } | func (t Topics) Swap(i, j int) { t[i], t[j] = t[j], t[i] } | |||
type TopicsByName struct { | type TopicsByName struct { | |||
Topics | Topics | |||
} | } | |||
func (t TopicsByName) Less(i, j int) bool { return t.Topics[i].name < t.Topics[j ].name } | func (t TopicsByName) Less(i, j int) bool { return t.Topics[i].name < t.Topics[j ].name } | |||
skipping to change at line 134 | skipping to change at line 109 | |||
func (c Channels) Len() int { return len(c) } | func (c Channels) Len() int { return len(c) } | |||
func (c Channels) Swap(i, j int) { c[i], c[j] = c[j], c[i] } | func (c Channels) Swap(i, j int) { c[i], c[j] = c[j], c[i] } | |||
type ChannelsByName struct { | type ChannelsByName struct { | |||
Channels | Channels | |||
} | } | |||
func (c ChannelsByName) Less(i, j int) bool { return c.Channels[i].name < c.Chan nels[j].name } | func (c ChannelsByName) Less(i, j int) bool { return c.Channels[i].name < c.Chan nels[j].name } | |||
func (n *NSQD) GetStats(topic string, channel string, includeClients bool) []Top | func (n *NSQD) GetStats(topic string, channel string, includeClients bool) Stats | |||
icStats { | { | |||
var stats Stats | ||||
n.RLock() | n.RLock() | |||
var realTopics []*Topic | var realTopics []*Topic | |||
if topic == "" { | if topic == "" { | |||
realTopics = make([]*Topic, 0, len(n.topicMap)) | realTopics = make([]*Topic, 0, len(n.topicMap)) | |||
for _, t := range n.topicMap { | for _, t := range n.topicMap { | |||
realTopics = append(realTopics, t) | realTopics = append(realTopics, t) | |||
} | } | |||
} else if val, exists := n.topicMap[topic]; exists { | } else if val, exists := n.topicMap[topic]; exists { | |||
realTopics = []*Topic{val} | realTopics = []*Topic{val} | |||
} else { | } else { | |||
n.RUnlock() | n.RUnlock() | |||
return []TopicStats{} | return stats | |||
} | } | |||
n.RUnlock() | n.RUnlock() | |||
sort.Sort(TopicsByName{realTopics}) | sort.Sort(TopicsByName{realTopics}) | |||
topics := make([]TopicStats, 0, len(realTopics)) | topics := make([]TopicStats, 0, len(realTopics)) | |||
for _, t := range realTopics { | for _, t := range realTopics { | |||
t.RLock() | t.RLock() | |||
var realChannels []*Channel | var realChannels []*Channel | |||
if channel == "" { | if channel == "" { | |||
realChannels = make([]*Channel, 0, len(t.channelMap)) | realChannels = make([]*Channel, 0, len(t.channelMap)) | |||
for _, c := range t.channelMap { | for _, c := range t.channelMap { | |||
realChannels = append(realChannels, c) | realChannels = append(realChannels, c) | |||
} | } | |||
} else if val, exists := t.channelMap[channel]; exists { | } else if val, exists := t.channelMap[channel]; exists { | |||
realChannels = []*Channel{val} | realChannels = []*Channel{val} | |||
skipping to change at line 175 | skipping to change at line 154 | |||
t.RUnlock() | t.RUnlock() | |||
sort.Sort(ChannelsByName{realChannels}) | sort.Sort(ChannelsByName{realChannels}) | |||
channels := make([]ChannelStats, 0, len(realChannels)) | channels := make([]ChannelStats, 0, len(realChannels)) | |||
for _, c := range realChannels { | for _, c := range realChannels { | |||
var clients []ClientStats | var clients []ClientStats | |||
var clientCount int | var clientCount int | |||
c.RLock() | c.RLock() | |||
if includeClients { | if includeClients { | |||
clients = make([]ClientStats, 0, len(c.clients)) | clients = make([]ClientStats, 0, len(c.clients)) | |||
for _, client := range c.clients { | for _, client := range c.clients { | |||
clients = append(clients, client.Stats()) | clients = append(clients, client.Stats(to pic)) | |||
} | } | |||
} | } | |||
clientCount = len(c.clients) | clientCount = len(c.clients) | |||
c.RUnlock() | c.RUnlock() | |||
channels = append(channels, NewChannelStats(c, clients, c lientCount)) | channels = append(channels, NewChannelStats(c, clients, c lientCount)) | |||
} | } | |||
topics = append(topics, NewTopicStats(t, channels)) | topics = append(topics, NewTopicStats(t, channels)) | |||
} | } | |||
return topics | stats.Topics = topics | |||
} | ||||
func (n *NSQD) GetProducerStats() []ClientStats { | if includeClients { | |||
n.clientLock.RLock() | var producerStats []ClientStats | |||
var producers []Client | n.tcpServer.conns.Range(func(k, v interface{}) bool { | |||
for _, c := range n.clients { | c := v.(Client) | |||
if c.IsProducer() { | if c.Type() == typeProducer { | |||
producers = append(producers, c) | producerStats = append(producerStats, c.Stats(top | |||
} | ic)) | |||
} | } | |||
n.clientLock.RUnlock() | return true | |||
producerStats := make([]ClientStats, 0, len(producers)) | }) | |||
for _, p := range producers { | stats.Producers = producerStats | |||
producerStats = append(producerStats, p.Stats()) | ||||
} | } | |||
return producerStats | ||||
return stats | ||||
} | } | |||
type memStats struct { | type memStats struct { | |||
HeapObjects uint64 `json:"heap_objects"` | HeapObjects uint64 `json:"heap_objects"` | |||
HeapIdleBytes uint64 `json:"heap_idle_bytes"` | HeapIdleBytes uint64 `json:"heap_idle_bytes"` | |||
HeapInUseBytes uint64 `json:"heap_in_use_bytes"` | HeapInUseBytes uint64 `json:"heap_in_use_bytes"` | |||
HeapReleasedBytes uint64 `json:"heap_released_bytes"` | HeapReleasedBytes uint64 `json:"heap_released_bytes"` | |||
GCPauseUsec100 uint64 `json:"gc_pause_usec_100"` | GCPauseUsec100 uint64 `json:"gc_pause_usec_100"` | |||
GCPauseUsec99 uint64 `json:"gc_pause_usec_99"` | GCPauseUsec99 uint64 `json:"gc_pause_usec_99"` | |||
GCPauseUsec95 uint64 `json:"gc_pause_usec_95"` | GCPauseUsec95 uint64 `json:"gc_pause_usec_95"` | |||
End of changes. 10 change blocks. | ||||
54 lines changed or deleted | 31 lines changed or added |