"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "nsqd/client_v2.go" between
nsq-1.2.0.tar.gz and nsq-1.2.1.tar.gz

About: nsq is a realtime distributed and and decentralized messaging platform.

client_v2.go  (nsq-1.2.0):client_v2.go  (nsq-1.2.1)
package nsqd package nsqd
import ( import (
"bufio" "bufio"
"compress/flate" "compress/flate"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net" "net"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/golang/snappy" "github.com/golang/snappy"
"github.com/nsqio/nsq/internal/auth" "github.com/nsqio/nsq/internal/auth"
) )
const defaultBufferSize = 16 * 1024 const defaultBufferSize = 16 * 1024
skipping to change at line 50 skipping to change at line 51
MsgTimeout int `json:"msg_timeout"` MsgTimeout int `json:"msg_timeout"`
} }
type identifyEvent struct { type identifyEvent struct {
OutputBufferTimeout time.Duration OutputBufferTimeout time.Duration
HeartbeatInterval time.Duration HeartbeatInterval time.Duration
SampleRate int32 SampleRate int32
MsgTimeout time.Duration MsgTimeout time.Duration
} }
type PubCount struct {
Topic string `json:"topic"`
Count uint64 `json:"count"`
}
type ClientV2Stats struct {
ClientID string `json:"client_id"`
Hostname string `json:"hostname"`
Version string `json:"version"`
RemoteAddress string `json:"remote_address"`
State int32 `json:"state"`
ReadyCount int64 `json:"ready_count"`
InFlightCount int64 `json:"in_flight_count"`
MessageCount uint64 `json:"message_count"`
FinishCount uint64 `json:"finish_count"`
RequeueCount uint64 `json:"requeue_count"`
ConnectTime int64 `json:"connect_ts"`
SampleRate int32 `json:"sample_rate"`
Deflate bool `json:"deflate"`
Snappy bool `json:"snappy"`
UserAgent string `json:"user_agent"`
Authed bool `json:"authed,omitempty"`
AuthIdentity string `json:"auth_identity,omitempty"`
AuthIdentityURL string `json:"auth_identity_url,omitempty"`
PubCounts []PubCount `json:"pub_counts,omitempty"`
TLS bool `json:"tls"`
CipherSuite string `json:"tls_cipher_suite"`
TLSVersion string `json:"tls_version"`
TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"`
TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mu
tual"`
}
func (s ClientV2Stats) String() string {
connectTime := time.Unix(s.ConnectTime, 0)
duration := time.Since(connectTime).Truncate(time.Second)
_, port, _ := net.SplitHostPort(s.RemoteAddress)
id := fmt.Sprintf("%s:%s %s", s.Hostname, port, s.UserAgent)
// producer
if len(s.PubCounts) > 0 {
var total uint64
var topicOut []string
for _, v := range s.PubCounts {
total += v.Count
topicOut = append(topicOut, fmt.Sprintf("%s=%d", v.Topic,
v.Count))
}
return fmt.Sprintf("[%s %-21s] msgs: %-8d topics: %s connected: %
s",
s.Version,
id,
total,
strings.Join(topicOut, ","),
duration,
)
}
// consumer
return fmt.Sprintf("[%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d
re-q: %-8d msgs: %-8d connected: %s",
s.Version,
id,
s.State,
s.InFlightCount,
s.ReadyCount,
s.FinishCount,
s.RequeueCount,
s.MessageCount,
duration,
)
}
type clientV2 struct { type clientV2 struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platf orms // 64bit atomic vars need to be first for proper alignment on 32bit platf orms
ReadyCount int64 ReadyCount int64
InFlightCount int64 InFlightCount int64
MessageCount uint64 MessageCount uint64
FinishCount uint64 FinishCount uint64
RequeueCount uint64 RequeueCount uint64
pubCounts map[string]uint64 pubCounts map[string]uint64
writeLock sync.RWMutex writeLock sync.RWMutex
metaLock sync.RWMutex metaLock sync.RWMutex
ID int64 ID int64
ctx *context nsqd *NSQD
UserAgent string UserAgent string
// original connection // original connection
net.Conn net.Conn
// connections based on negotiated features // connections based on negotiated features
tlsConn *tls.Conn tlsConn *tls.Conn
flateWriter *flate.Writer flateWriter *flate.Writer
// reading/writing interfaces // reading/writing interfaces
skipping to change at line 111 skipping to change at line 184
Deflate int32 Deflate int32
// re-usable buffer for reading the 4-byte lengths off the wire // re-usable buffer for reading the 4-byte lengths off the wire
lenBuf [4]byte lenBuf [4]byte
lenSlice []byte lenSlice []byte
AuthSecret string AuthSecret string
AuthState *auth.State AuthState *auth.State
} }
func newClientV2(id int64, conn net.Conn, ctx *context) *clientV2 { func newClientV2(id int64, conn net.Conn, nsqd *NSQD) *clientV2 {
var identifier string var identifier string
if conn != nil { if conn != nil {
identifier, _, _ = net.SplitHostPort(conn.RemoteAddr().String()) identifier, _, _ = net.SplitHostPort(conn.RemoteAddr().String())
} }
c := &clientV2{ c := &clientV2{
ID: id, ID: id,
ctx: ctx, nsqd: nsqd,
Conn: conn, Conn: conn,
Reader: bufio.NewReaderSize(conn, defaultBufferSize), Reader: bufio.NewReaderSize(conn, defaultBufferSize),
Writer: bufio.NewWriterSize(conn, defaultBufferSize), Writer: bufio.NewWriterSize(conn, defaultBufferSize),
OutputBufferSize: defaultBufferSize, OutputBufferSize: defaultBufferSize,
OutputBufferTimeout: ctx.nsqd.getOpts().OutputBufferTimeout, OutputBufferTimeout: nsqd.getOpts().OutputBufferTimeout,
MsgTimeout: ctx.nsqd.getOpts().MsgTimeout, MsgTimeout: nsqd.getOpts().MsgTimeout,
// ReadyStateChan has a buffer of 1 to guarantee that in the even t // ReadyStateChan has a buffer of 1 to guarantee that in the even t
// there is a race the state update is not lost // there is a race the state update is not lost
ReadyStateChan: make(chan int, 1), ReadyStateChan: make(chan int, 1),
ExitChan: make(chan int), ExitChan: make(chan int),
ConnectTime: time.Now(), ConnectTime: time.Now(),
State: stateInit, State: stateInit,
ClientID: identifier, ClientID: identifier,
Hostname: identifier, Hostname: identifier,
SubEventChan: make(chan *Channel, 1), SubEventChan: make(chan *Channel, 1),
IdentifyEventChan: make(chan identifyEvent, 1), IdentifyEventChan: make(chan identifyEvent, 1),
// heartbeats are client configurable but default to 30s // heartbeats are client configurable but default to 30s
HeartbeatInterval: ctx.nsqd.getOpts().ClientTimeout / 2, HeartbeatInterval: nsqd.getOpts().ClientTimeout / 2,
pubCounts: make(map[string]uint64), pubCounts: make(map[string]uint64),
} }
c.lenSlice = c.lenBuf[:] c.lenSlice = c.lenBuf[:]
return c return c
} }
func (c *clientV2) String() string { func (c *clientV2) String() string {
return c.RemoteAddr().String() return c.RemoteAddr().String()
} }
func (c *clientV2) Type() int {
c.metaLock.RLock()
hasPublished := len(c.pubCounts) > 0
c.metaLock.RUnlock()
if hasPublished {
return typeProducer
}
return typeConsumer
}
func (c *clientV2) Identify(data identifyDataV2) error { func (c *clientV2) Identify(data identifyDataV2) error {
c.ctx.nsqd.logf(LOG_INFO, "[%s] IDENTIFY: %+v", c, data) c.nsqd.logf(LOG_INFO, "[%s] IDENTIFY: %+v", c, data)
c.metaLock.Lock() c.metaLock.Lock()
c.ClientID = data.ClientID c.ClientID = data.ClientID
c.Hostname = data.Hostname c.Hostname = data.Hostname
c.UserAgent = data.UserAgent c.UserAgent = data.UserAgent
c.metaLock.Unlock() c.metaLock.Unlock()
err := c.SetHeartbeatInterval(data.HeartbeatInterval) err := c.SetHeartbeatInterval(data.HeartbeatInterval)
if err != nil { if err != nil {
return err return err
skipping to change at line 202 skipping to change at line 285
// update the client's message pump // update the client's message pump
select { select {
case c.IdentifyEventChan <- ie: case c.IdentifyEventChan <- ie:
default: default:
} }
return nil return nil
} }
func (c *clientV2) Stats() ClientStats { func (c *clientV2) Stats(topicName string) ClientStats {
c.metaLock.RLock() c.metaLock.RLock()
clientID := c.ClientID clientID := c.ClientID
hostname := c.Hostname hostname := c.Hostname
userAgent := c.UserAgent userAgent := c.UserAgent
var identity string var identity string
var identityURL string var identityURL string
if c.AuthState != nil { if c.AuthState != nil {
identity = c.AuthState.Identity identity = c.AuthState.Identity
identityURL = c.AuthState.IdentityURL identityURL = c.AuthState.IdentityURL
} }
pubCounts := make([]PubCount, 0, len(c.pubCounts)) pubCounts := make([]PubCount, 0, len(c.pubCounts))
for topic, count := range c.pubCounts { for topic, count := range c.pubCounts {
if len(topicName) > 0 && topic != topicName {
continue
}
pubCounts = append(pubCounts, PubCount{ pubCounts = append(pubCounts, PubCount{
Topic: topic, Topic: topic,
Count: count, Count: count,
}) })
break
} }
c.metaLock.RUnlock() c.metaLock.RUnlock()
stats := ClientStats{ stats := ClientV2Stats{
Version: "V2", Version: "V2",
RemoteAddress: c.RemoteAddr().String(), RemoteAddress: c.RemoteAddr().String(),
ClientID: clientID, ClientID: clientID,
Hostname: hostname, Hostname: hostname,
UserAgent: userAgent, UserAgent: userAgent,
State: atomic.LoadInt32(&c.State), State: atomic.LoadInt32(&c.State),
ReadyCount: atomic.LoadInt64(&c.ReadyCount), ReadyCount: atomic.LoadInt64(&c.ReadyCount),
InFlightCount: atomic.LoadInt64(&c.InFlightCount), InFlightCount: atomic.LoadInt64(&c.InFlightCount),
MessageCount: atomic.LoadUint64(&c.MessageCount), MessageCount: atomic.LoadUint64(&c.MessageCount),
FinishCount: atomic.LoadUint64(&c.FinishCount), FinishCount: atomic.LoadUint64(&c.FinishCount),
skipping to change at line 253 skipping to change at line 340
if stats.TLS { if stats.TLS {
p := prettyConnectionState{c.tlsConn.ConnectionState()} p := prettyConnectionState{c.tlsConn.ConnectionState()}
stats.CipherSuite = p.GetCipherSuite() stats.CipherSuite = p.GetCipherSuite()
stats.TLSVersion = p.GetVersion() stats.TLSVersion = p.GetVersion()
stats.TLSNegotiatedProtocol = p.NegotiatedProtocol stats.TLSNegotiatedProtocol = p.NegotiatedProtocol
stats.TLSNegotiatedProtocolIsMutual = p.NegotiatedProtocolIsMutua l stats.TLSNegotiatedProtocolIsMutual = p.NegotiatedProtocolIsMutua l
} }
return stats return stats
} }
func (c *clientV2) IsProducer() bool {
c.metaLock.RLock()
retval := len(c.pubCounts) > 0
c.metaLock.RUnlock()
return retval
}
// struct to convert from integers to the human readable strings // struct to convert from integers to the human readable strings
type prettyConnectionState struct { type prettyConnectionState struct {
tls.ConnectionState tls.ConnectionState
} }
func (p *prettyConnectionState) GetCipherSuite() string { func (p *prettyConnectionState) GetCipherSuite() string {
switch p.CipherSuite { switch p.CipherSuite {
case tls.TLS_RSA_WITH_RC4_128_SHA: case tls.TLS_RSA_WITH_RC4_128_SHA:
return "TLS_RSA_WITH_RC4_128_SHA" return "TLS_RSA_WITH_RC4_128_SHA"
case tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA: case tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA:
skipping to change at line 320 skipping to change at line 400
} }
func (c *clientV2) IsReadyForMessages() bool { func (c *clientV2) IsReadyForMessages() bool {
if c.Channel.IsPaused() { if c.Channel.IsPaused() {
return false return false
} }
readyCount := atomic.LoadInt64(&c.ReadyCount) readyCount := atomic.LoadInt64(&c.ReadyCount)
inFlightCount := atomic.LoadInt64(&c.InFlightCount) inFlightCount := atomic.LoadInt64(&c.InFlightCount)
c.ctx.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCoun t, inFlightCount) c.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCount, i nFlightCount)
if inFlightCount >= readyCount || readyCount <= 0 { if inFlightCount >= readyCount || readyCount <= 0 {
return false return false
} }
return true return true
} }
func (c *clientV2) SetReadyCount(count int64) { func (c *clientV2) SetReadyCount(count int64) {
atomic.StoreInt64(&c.ReadyCount, count) oldCount := atomic.SwapInt64(&c.ReadyCount, count)
c.tryUpdateReadyState()
if oldCount != count {
c.tryUpdateReadyState()
}
} }
func (c *clientV2) tryUpdateReadyState() { func (c *clientV2) tryUpdateReadyState() {
// you can always *try* to write to ReadyStateChan because in the cases // you can always *try* to write to ReadyStateChan because in the cases
// where you cannot the message pump loop would have iterated anyway. // where you cannot the message pump loop would have iterated anyway.
// the atomic integer operations guarantee correctness of the value. // the atomic integer operations guarantee correctness of the value.
select { select {
case c.ReadyStateChan <- 1: case c.ReadyStateChan <- 1:
default: default:
} }
skipping to change at line 402 skipping to change at line 485
func (c *clientV2) SetHeartbeatInterval(desiredInterval int) error { func (c *clientV2) SetHeartbeatInterval(desiredInterval int) error {
c.writeLock.Lock() c.writeLock.Lock()
defer c.writeLock.Unlock() defer c.writeLock.Unlock()
switch { switch {
case desiredInterval == -1: case desiredInterval == -1:
c.HeartbeatInterval = 0 c.HeartbeatInterval = 0
case desiredInterval == 0: case desiredInterval == 0:
// do nothing (use default) // do nothing (use default)
case desiredInterval >= 1000 && case desiredInterval >= 1000 &&
desiredInterval <= int(c.ctx.nsqd.getOpts().MaxHeartbeatInterval/ time.Millisecond): desiredInterval <= int(c.nsqd.getOpts().MaxHeartbeatInterval/time .Millisecond):
c.HeartbeatInterval = time.Duration(desiredInterval) * time.Milli second c.HeartbeatInterval = time.Duration(desiredInterval) * time.Milli second
default: default:
return fmt.Errorf("heartbeat interval (%d) is invalid", desiredIn terval) return fmt.Errorf("heartbeat interval (%d) is invalid", desiredIn terval)
} }
return nil return nil
} }
func (c *clientV2) SetOutputBuffer(desiredSize int, desiredTimeout int) error { func (c *clientV2) SetOutputBuffer(desiredSize int, desiredTimeout int) error {
c.writeLock.Lock() c.writeLock.Lock()
defer c.writeLock.Unlock() defer c.writeLock.Unlock()
switch { switch {
case desiredTimeout == -1: case desiredTimeout == -1:
c.OutputBufferTimeout = 0 c.OutputBufferTimeout = 0
case desiredTimeout == 0: case desiredTimeout == 0:
// do nothing (use default) // do nothing (use default)
case true && case true &&
desiredTimeout >= int(c.ctx.nsqd.getOpts().MinOutputBufferTimeout desiredTimeout >= int(c.nsqd.getOpts().MinOutputBufferTimeout/tim
/time.Millisecond) && e.Millisecond) &&
desiredTimeout <= int(c.ctx.nsqd.getOpts().MaxOutputBufferTimeout desiredTimeout <= int(c.nsqd.getOpts().MaxOutputBufferTimeout/tim
/time.Millisecond): e.Millisecond):
c.OutputBufferTimeout = time.Duration(desiredTimeout) * time.Mill isecond c.OutputBufferTimeout = time.Duration(desiredTimeout) * time.Mill isecond
default: default:
return fmt.Errorf("output buffer timeout (%d) is invalid", desire dTimeout) return fmt.Errorf("output buffer timeout (%d) is invalid", desire dTimeout)
} }
switch { switch {
case desiredSize == -1: case desiredSize == -1:
// effectively no buffer (every write will go directly to the wra pped net.Conn) // effectively no buffer (every write will go directly to the wra pped net.Conn)
c.OutputBufferSize = 1 c.OutputBufferSize = 1
c.OutputBufferTimeout = 0 c.OutputBufferTimeout = 0
case desiredSize == 0: case desiredSize == 0:
// do nothing (use default) // do nothing (use default)
case desiredSize >= 64 && desiredSize <= int(c.ctx.nsqd.getOpts().MaxOutp utBufferSize): case desiredSize >= 64 && desiredSize <= int(c.nsqd.getOpts().MaxOutputBu fferSize):
c.OutputBufferSize = desiredSize c.OutputBufferSize = desiredSize
default: default:
return fmt.Errorf("output buffer size (%d) is invalid", desiredSi ze) return fmt.Errorf("output buffer size (%d) is invalid", desiredSi ze)
} }
if desiredSize != 0 { if desiredSize != 0 {
err := c.Writer.Flush() err := c.Writer.Flush()
if err != nil { if err != nil {
return err return err
} }
skipping to change at line 469 skipping to change at line 552
} }
func (c *clientV2) SetMsgTimeout(msgTimeout int) error { func (c *clientV2) SetMsgTimeout(msgTimeout int) error {
c.writeLock.Lock() c.writeLock.Lock()
defer c.writeLock.Unlock() defer c.writeLock.Unlock()
switch { switch {
case msgTimeout == 0: case msgTimeout == 0:
// do nothing (use default) // do nothing (use default)
case msgTimeout >= 1000 && case msgTimeout >= 1000 &&
msgTimeout <= int(c.ctx.nsqd.getOpts().MaxMsgTimeout/time.Millise cond): msgTimeout <= int(c.nsqd.getOpts().MaxMsgTimeout/time.Millisecond ):
c.MsgTimeout = time.Duration(msgTimeout) * time.Millisecond c.MsgTimeout = time.Duration(msgTimeout) * time.Millisecond
default: default:
return fmt.Errorf("msg timeout (%d) is invalid", msgTimeout) return fmt.Errorf("msg timeout (%d) is invalid", msgTimeout)
} }
return nil return nil
} }
func (c *clientV2) UpgradeTLS() error { func (c *clientV2) UpgradeTLS() error {
c.writeLock.Lock() c.writeLock.Lock()
defer c.writeLock.Unlock() defer c.writeLock.Unlock()
tlsConn := tls.Server(c.Conn, c.ctx.nsqd.tlsConfig) tlsConn := tls.Server(c.Conn, c.nsqd.tlsConfig)
tlsConn.SetDeadline(time.Now().Add(5 * time.Second)) tlsConn.SetDeadline(time.Now().Add(5 * time.Second))
err := tlsConn.Handshake() err := tlsConn.Handshake()
if err != nil { if err != nil {
return err return err
} }
c.tlsConn = tlsConn c.tlsConn = tlsConn
c.Reader = bufio.NewReaderSize(c.tlsConn, defaultBufferSize) c.Reader = bufio.NewReaderSize(c.tlsConn, defaultBufferSize)
c.Writer = bufio.NewWriterSize(c.tlsConn, c.OutputBufferSize) c.Writer = bufio.NewWriterSize(c.tlsConn, c.OutputBufferSize)
skipping to change at line 570 skipping to change at line 653
tlsEnabled := atomic.LoadInt32(&c.TLS) == 1 tlsEnabled := atomic.LoadInt32(&c.TLS) == 1
commonName := "" commonName := ""
if tlsEnabled { if tlsEnabled {
tlsConnState := c.tlsConn.ConnectionState() tlsConnState := c.tlsConn.ConnectionState()
if len(tlsConnState.PeerCertificates) > 0 { if len(tlsConnState.PeerCertificates) > 0 {
commonName = tlsConnState.PeerCertificates[0].Subject.Com monName commonName = tlsConnState.PeerCertificates[0].Subject.Com monName
} }
} }
authState, err := auth.QueryAnyAuthd(c.ctx.nsqd.getOpts().AuthHTTPAddress es, authState, err := auth.QueryAnyAuthd(c.nsqd.getOpts().AuthHTTPAddresses,
remoteIP, tlsEnabled, commonName, c.AuthSecret, remoteIP, tlsEnabled, commonName, c.AuthSecret,
c.ctx.nsqd.getOpts().HTTPClientConnectTimeout, c.nsqd.getOpts().HTTPClientConnectTimeout,
c.ctx.nsqd.getOpts().HTTPClientRequestTimeout) c.nsqd.getOpts().HTTPClientRequestTimeout)
if err != nil { if err != nil {
return err return err
} }
c.AuthState = authState c.AuthState = authState
return nil return nil
} }
func (c *clientV2) Auth(secret string) error { func (c *clientV2) Auth(secret string) error {
c.AuthSecret = secret c.AuthSecret = secret
return c.QueryAuthd() return c.QueryAuthd()
 End of changes. 24 change blocks. 
31 lines changed or deleted 118 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)