client_v2.go (nsq-1.2.0) | : | client_v2.go (nsq-1.2.1) | ||
---|---|---|---|---|
package nsqd | package nsqd | |||
import ( | import ( | |||
"bufio" | "bufio" | |||
"compress/flate" | "compress/flate" | |||
"crypto/tls" | "crypto/tls" | |||
"fmt" | "fmt" | |||
"net" | "net" | |||
"strings" | ||||
"sync" | "sync" | |||
"sync/atomic" | "sync/atomic" | |||
"time" | "time" | |||
"github.com/golang/snappy" | "github.com/golang/snappy" | |||
"github.com/nsqio/nsq/internal/auth" | "github.com/nsqio/nsq/internal/auth" | |||
) | ) | |||
const defaultBufferSize = 16 * 1024 | const defaultBufferSize = 16 * 1024 | |||
skipping to change at line 50 | skipping to change at line 51 | |||
MsgTimeout int `json:"msg_timeout"` | MsgTimeout int `json:"msg_timeout"` | |||
} | } | |||
type identifyEvent struct { | type identifyEvent struct { | |||
OutputBufferTimeout time.Duration | OutputBufferTimeout time.Duration | |||
HeartbeatInterval time.Duration | HeartbeatInterval time.Duration | |||
SampleRate int32 | SampleRate int32 | |||
MsgTimeout time.Duration | MsgTimeout time.Duration | |||
} | } | |||
type PubCount struct { | ||||
Topic string `json:"topic"` | ||||
Count uint64 `json:"count"` | ||||
} | ||||
type ClientV2Stats struct { | ||||
ClientID string `json:"client_id"` | ||||
Hostname string `json:"hostname"` | ||||
Version string `json:"version"` | ||||
RemoteAddress string `json:"remote_address"` | ||||
State int32 `json:"state"` | ||||
ReadyCount int64 `json:"ready_count"` | ||||
InFlightCount int64 `json:"in_flight_count"` | ||||
MessageCount uint64 `json:"message_count"` | ||||
FinishCount uint64 `json:"finish_count"` | ||||
RequeueCount uint64 `json:"requeue_count"` | ||||
ConnectTime int64 `json:"connect_ts"` | ||||
SampleRate int32 `json:"sample_rate"` | ||||
Deflate bool `json:"deflate"` | ||||
Snappy bool `json:"snappy"` | ||||
UserAgent string `json:"user_agent"` | ||||
Authed bool `json:"authed,omitempty"` | ||||
AuthIdentity string `json:"auth_identity,omitempty"` | ||||
AuthIdentityURL string `json:"auth_identity_url,omitempty"` | ||||
PubCounts []PubCount `json:"pub_counts,omitempty"` | ||||
TLS bool `json:"tls"` | ||||
CipherSuite string `json:"tls_cipher_suite"` | ||||
TLSVersion string `json:"tls_version"` | ||||
TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` | ||||
TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mu | ||||
tual"` | ||||
} | ||||
func (s ClientV2Stats) String() string { | ||||
connectTime := time.Unix(s.ConnectTime, 0) | ||||
duration := time.Since(connectTime).Truncate(time.Second) | ||||
_, port, _ := net.SplitHostPort(s.RemoteAddress) | ||||
id := fmt.Sprintf("%s:%s %s", s.Hostname, port, s.UserAgent) | ||||
// producer | ||||
if len(s.PubCounts) > 0 { | ||||
var total uint64 | ||||
var topicOut []string | ||||
for _, v := range s.PubCounts { | ||||
total += v.Count | ||||
topicOut = append(topicOut, fmt.Sprintf("%s=%d", v.Topic, | ||||
v.Count)) | ||||
} | ||||
return fmt.Sprintf("[%s %-21s] msgs: %-8d topics: %s connected: % | ||||
s", | ||||
s.Version, | ||||
id, | ||||
total, | ||||
strings.Join(topicOut, ","), | ||||
duration, | ||||
) | ||||
} | ||||
// consumer | ||||
return fmt.Sprintf("[%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d | ||||
re-q: %-8d msgs: %-8d connected: %s", | ||||
s.Version, | ||||
id, | ||||
s.State, | ||||
s.InFlightCount, | ||||
s.ReadyCount, | ||||
s.FinishCount, | ||||
s.RequeueCount, | ||||
s.MessageCount, | ||||
duration, | ||||
) | ||||
} | ||||
type clientV2 struct { | type clientV2 struct { | |||
// 64bit atomic vars need to be first for proper alignment on 32bit platf orms | // 64bit atomic vars need to be first for proper alignment on 32bit platf orms | |||
ReadyCount int64 | ReadyCount int64 | |||
InFlightCount int64 | InFlightCount int64 | |||
MessageCount uint64 | MessageCount uint64 | |||
FinishCount uint64 | FinishCount uint64 | |||
RequeueCount uint64 | RequeueCount uint64 | |||
pubCounts map[string]uint64 | pubCounts map[string]uint64 | |||
writeLock sync.RWMutex | writeLock sync.RWMutex | |||
metaLock sync.RWMutex | metaLock sync.RWMutex | |||
ID int64 | ID int64 | |||
ctx *context | nsqd *NSQD | |||
UserAgent string | UserAgent string | |||
// original connection | // original connection | |||
net.Conn | net.Conn | |||
// connections based on negotiated features | // connections based on negotiated features | |||
tlsConn *tls.Conn | tlsConn *tls.Conn | |||
flateWriter *flate.Writer | flateWriter *flate.Writer | |||
// reading/writing interfaces | // reading/writing interfaces | |||
skipping to change at line 111 | skipping to change at line 184 | |||
Deflate int32 | Deflate int32 | |||
// re-usable buffer for reading the 4-byte lengths off the wire | // re-usable buffer for reading the 4-byte lengths off the wire | |||
lenBuf [4]byte | lenBuf [4]byte | |||
lenSlice []byte | lenSlice []byte | |||
AuthSecret string | AuthSecret string | |||
AuthState *auth.State | AuthState *auth.State | |||
} | } | |||
func newClientV2(id int64, conn net.Conn, ctx *context) *clientV2 { | func newClientV2(id int64, conn net.Conn, nsqd *NSQD) *clientV2 { | |||
var identifier string | var identifier string | |||
if conn != nil { | if conn != nil { | |||
identifier, _, _ = net.SplitHostPort(conn.RemoteAddr().String()) | identifier, _, _ = net.SplitHostPort(conn.RemoteAddr().String()) | |||
} | } | |||
c := &clientV2{ | c := &clientV2{ | |||
ID: id, | ID: id, | |||
ctx: ctx, | nsqd: nsqd, | |||
Conn: conn, | Conn: conn, | |||
Reader: bufio.NewReaderSize(conn, defaultBufferSize), | Reader: bufio.NewReaderSize(conn, defaultBufferSize), | |||
Writer: bufio.NewWriterSize(conn, defaultBufferSize), | Writer: bufio.NewWriterSize(conn, defaultBufferSize), | |||
OutputBufferSize: defaultBufferSize, | OutputBufferSize: defaultBufferSize, | |||
OutputBufferTimeout: ctx.nsqd.getOpts().OutputBufferTimeout, | OutputBufferTimeout: nsqd.getOpts().OutputBufferTimeout, | |||
MsgTimeout: ctx.nsqd.getOpts().MsgTimeout, | MsgTimeout: nsqd.getOpts().MsgTimeout, | |||
// ReadyStateChan has a buffer of 1 to guarantee that in the even t | // ReadyStateChan has a buffer of 1 to guarantee that in the even t | |||
// there is a race the state update is not lost | // there is a race the state update is not lost | |||
ReadyStateChan: make(chan int, 1), | ReadyStateChan: make(chan int, 1), | |||
ExitChan: make(chan int), | ExitChan: make(chan int), | |||
ConnectTime: time.Now(), | ConnectTime: time.Now(), | |||
State: stateInit, | State: stateInit, | |||
ClientID: identifier, | ClientID: identifier, | |||
Hostname: identifier, | Hostname: identifier, | |||
SubEventChan: make(chan *Channel, 1), | SubEventChan: make(chan *Channel, 1), | |||
IdentifyEventChan: make(chan identifyEvent, 1), | IdentifyEventChan: make(chan identifyEvent, 1), | |||
// heartbeats are client configurable but default to 30s | // heartbeats are client configurable but default to 30s | |||
HeartbeatInterval: ctx.nsqd.getOpts().ClientTimeout / 2, | HeartbeatInterval: nsqd.getOpts().ClientTimeout / 2, | |||
pubCounts: make(map[string]uint64), | pubCounts: make(map[string]uint64), | |||
} | } | |||
c.lenSlice = c.lenBuf[:] | c.lenSlice = c.lenBuf[:] | |||
return c | return c | |||
} | } | |||
func (c *clientV2) String() string { | func (c *clientV2) String() string { | |||
return c.RemoteAddr().String() | return c.RemoteAddr().String() | |||
} | } | |||
func (c *clientV2) Type() int { | ||||
c.metaLock.RLock() | ||||
hasPublished := len(c.pubCounts) > 0 | ||||
c.metaLock.RUnlock() | ||||
if hasPublished { | ||||
return typeProducer | ||||
} | ||||
return typeConsumer | ||||
} | ||||
func (c *clientV2) Identify(data identifyDataV2) error { | func (c *clientV2) Identify(data identifyDataV2) error { | |||
c.ctx.nsqd.logf(LOG_INFO, "[%s] IDENTIFY: %+v", c, data) | c.nsqd.logf(LOG_INFO, "[%s] IDENTIFY: %+v", c, data) | |||
c.metaLock.Lock() | c.metaLock.Lock() | |||
c.ClientID = data.ClientID | c.ClientID = data.ClientID | |||
c.Hostname = data.Hostname | c.Hostname = data.Hostname | |||
c.UserAgent = data.UserAgent | c.UserAgent = data.UserAgent | |||
c.metaLock.Unlock() | c.metaLock.Unlock() | |||
err := c.SetHeartbeatInterval(data.HeartbeatInterval) | err := c.SetHeartbeatInterval(data.HeartbeatInterval) | |||
if err != nil { | if err != nil { | |||
return err | return err | |||
skipping to change at line 202 | skipping to change at line 285 | |||
// update the client's message pump | // update the client's message pump | |||
select { | select { | |||
case c.IdentifyEventChan <- ie: | case c.IdentifyEventChan <- ie: | |||
default: | default: | |||
} | } | |||
return nil | return nil | |||
} | } | |||
func (c *clientV2) Stats() ClientStats { | func (c *clientV2) Stats(topicName string) ClientStats { | |||
c.metaLock.RLock() | c.metaLock.RLock() | |||
clientID := c.ClientID | clientID := c.ClientID | |||
hostname := c.Hostname | hostname := c.Hostname | |||
userAgent := c.UserAgent | userAgent := c.UserAgent | |||
var identity string | var identity string | |||
var identityURL string | var identityURL string | |||
if c.AuthState != nil { | if c.AuthState != nil { | |||
identity = c.AuthState.Identity | identity = c.AuthState.Identity | |||
identityURL = c.AuthState.IdentityURL | identityURL = c.AuthState.IdentityURL | |||
} | } | |||
pubCounts := make([]PubCount, 0, len(c.pubCounts)) | pubCounts := make([]PubCount, 0, len(c.pubCounts)) | |||
for topic, count := range c.pubCounts { | for topic, count := range c.pubCounts { | |||
if len(topicName) > 0 && topic != topicName { | ||||
continue | ||||
} | ||||
pubCounts = append(pubCounts, PubCount{ | pubCounts = append(pubCounts, PubCount{ | |||
Topic: topic, | Topic: topic, | |||
Count: count, | Count: count, | |||
}) | }) | |||
break | ||||
} | } | |||
c.metaLock.RUnlock() | c.metaLock.RUnlock() | |||
stats := ClientStats{ | stats := ClientV2Stats{ | |||
Version: "V2", | Version: "V2", | |||
RemoteAddress: c.RemoteAddr().String(), | RemoteAddress: c.RemoteAddr().String(), | |||
ClientID: clientID, | ClientID: clientID, | |||
Hostname: hostname, | Hostname: hostname, | |||
UserAgent: userAgent, | UserAgent: userAgent, | |||
State: atomic.LoadInt32(&c.State), | State: atomic.LoadInt32(&c.State), | |||
ReadyCount: atomic.LoadInt64(&c.ReadyCount), | ReadyCount: atomic.LoadInt64(&c.ReadyCount), | |||
InFlightCount: atomic.LoadInt64(&c.InFlightCount), | InFlightCount: atomic.LoadInt64(&c.InFlightCount), | |||
MessageCount: atomic.LoadUint64(&c.MessageCount), | MessageCount: atomic.LoadUint64(&c.MessageCount), | |||
FinishCount: atomic.LoadUint64(&c.FinishCount), | FinishCount: atomic.LoadUint64(&c.FinishCount), | |||
skipping to change at line 253 | skipping to change at line 340 | |||
if stats.TLS { | if stats.TLS { | |||
p := prettyConnectionState{c.tlsConn.ConnectionState()} | p := prettyConnectionState{c.tlsConn.ConnectionState()} | |||
stats.CipherSuite = p.GetCipherSuite() | stats.CipherSuite = p.GetCipherSuite() | |||
stats.TLSVersion = p.GetVersion() | stats.TLSVersion = p.GetVersion() | |||
stats.TLSNegotiatedProtocol = p.NegotiatedProtocol | stats.TLSNegotiatedProtocol = p.NegotiatedProtocol | |||
stats.TLSNegotiatedProtocolIsMutual = p.NegotiatedProtocolIsMutua l | stats.TLSNegotiatedProtocolIsMutual = p.NegotiatedProtocolIsMutua l | |||
} | } | |||
return stats | return stats | |||
} | } | |||
func (c *clientV2) IsProducer() bool { | ||||
c.metaLock.RLock() | ||||
retval := len(c.pubCounts) > 0 | ||||
c.metaLock.RUnlock() | ||||
return retval | ||||
} | ||||
// struct to convert from integers to the human readable strings | // struct to convert from integers to the human readable strings | |||
type prettyConnectionState struct { | type prettyConnectionState struct { | |||
tls.ConnectionState | tls.ConnectionState | |||
} | } | |||
func (p *prettyConnectionState) GetCipherSuite() string { | func (p *prettyConnectionState) GetCipherSuite() string { | |||
switch p.CipherSuite { | switch p.CipherSuite { | |||
case tls.TLS_RSA_WITH_RC4_128_SHA: | case tls.TLS_RSA_WITH_RC4_128_SHA: | |||
return "TLS_RSA_WITH_RC4_128_SHA" | return "TLS_RSA_WITH_RC4_128_SHA" | |||
case tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA: | case tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA: | |||
skipping to change at line 320 | skipping to change at line 400 | |||
} | } | |||
func (c *clientV2) IsReadyForMessages() bool { | func (c *clientV2) IsReadyForMessages() bool { | |||
if c.Channel.IsPaused() { | if c.Channel.IsPaused() { | |||
return false | return false | |||
} | } | |||
readyCount := atomic.LoadInt64(&c.ReadyCount) | readyCount := atomic.LoadInt64(&c.ReadyCount) | |||
inFlightCount := atomic.LoadInt64(&c.InFlightCount) | inFlightCount := atomic.LoadInt64(&c.InFlightCount) | |||
c.ctx.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCoun t, inFlightCount) | c.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCount, i nFlightCount) | |||
if inFlightCount >= readyCount || readyCount <= 0 { | if inFlightCount >= readyCount || readyCount <= 0 { | |||
return false | return false | |||
} | } | |||
return true | return true | |||
} | } | |||
func (c *clientV2) SetReadyCount(count int64) { | func (c *clientV2) SetReadyCount(count int64) { | |||
atomic.StoreInt64(&c.ReadyCount, count) | oldCount := atomic.SwapInt64(&c.ReadyCount, count) | |||
c.tryUpdateReadyState() | ||||
if oldCount != count { | ||||
c.tryUpdateReadyState() | ||||
} | ||||
} | } | |||
func (c *clientV2) tryUpdateReadyState() { | func (c *clientV2) tryUpdateReadyState() { | |||
// you can always *try* to write to ReadyStateChan because in the cases | // you can always *try* to write to ReadyStateChan because in the cases | |||
// where you cannot the message pump loop would have iterated anyway. | // where you cannot the message pump loop would have iterated anyway. | |||
// the atomic integer operations guarantee correctness of the value. | // the atomic integer operations guarantee correctness of the value. | |||
select { | select { | |||
case c.ReadyStateChan <- 1: | case c.ReadyStateChan <- 1: | |||
default: | default: | |||
} | } | |||
skipping to change at line 402 | skipping to change at line 485 | |||
func (c *clientV2) SetHeartbeatInterval(desiredInterval int) error { | func (c *clientV2) SetHeartbeatInterval(desiredInterval int) error { | |||
c.writeLock.Lock() | c.writeLock.Lock() | |||
defer c.writeLock.Unlock() | defer c.writeLock.Unlock() | |||
switch { | switch { | |||
case desiredInterval == -1: | case desiredInterval == -1: | |||
c.HeartbeatInterval = 0 | c.HeartbeatInterval = 0 | |||
case desiredInterval == 0: | case desiredInterval == 0: | |||
// do nothing (use default) | // do nothing (use default) | |||
case desiredInterval >= 1000 && | case desiredInterval >= 1000 && | |||
desiredInterval <= int(c.ctx.nsqd.getOpts().MaxHeartbeatInterval/ time.Millisecond): | desiredInterval <= int(c.nsqd.getOpts().MaxHeartbeatInterval/time .Millisecond): | |||
c.HeartbeatInterval = time.Duration(desiredInterval) * time.Milli second | c.HeartbeatInterval = time.Duration(desiredInterval) * time.Milli second | |||
default: | default: | |||
return fmt.Errorf("heartbeat interval (%d) is invalid", desiredIn terval) | return fmt.Errorf("heartbeat interval (%d) is invalid", desiredIn terval) | |||
} | } | |||
return nil | return nil | |||
} | } | |||
func (c *clientV2) SetOutputBuffer(desiredSize int, desiredTimeout int) error { | func (c *clientV2) SetOutputBuffer(desiredSize int, desiredTimeout int) error { | |||
c.writeLock.Lock() | c.writeLock.Lock() | |||
defer c.writeLock.Unlock() | defer c.writeLock.Unlock() | |||
switch { | switch { | |||
case desiredTimeout == -1: | case desiredTimeout == -1: | |||
c.OutputBufferTimeout = 0 | c.OutputBufferTimeout = 0 | |||
case desiredTimeout == 0: | case desiredTimeout == 0: | |||
// do nothing (use default) | // do nothing (use default) | |||
case true && | case true && | |||
desiredTimeout >= int(c.ctx.nsqd.getOpts().MinOutputBufferTimeout | desiredTimeout >= int(c.nsqd.getOpts().MinOutputBufferTimeout/tim | |||
/time.Millisecond) && | e.Millisecond) && | |||
desiredTimeout <= int(c.ctx.nsqd.getOpts().MaxOutputBufferTimeout | desiredTimeout <= int(c.nsqd.getOpts().MaxOutputBufferTimeout/tim | |||
/time.Millisecond): | e.Millisecond): | |||
c.OutputBufferTimeout = time.Duration(desiredTimeout) * time.Mill isecond | c.OutputBufferTimeout = time.Duration(desiredTimeout) * time.Mill isecond | |||
default: | default: | |||
return fmt.Errorf("output buffer timeout (%d) is invalid", desire dTimeout) | return fmt.Errorf("output buffer timeout (%d) is invalid", desire dTimeout) | |||
} | } | |||
switch { | switch { | |||
case desiredSize == -1: | case desiredSize == -1: | |||
// effectively no buffer (every write will go directly to the wra pped net.Conn) | // effectively no buffer (every write will go directly to the wra pped net.Conn) | |||
c.OutputBufferSize = 1 | c.OutputBufferSize = 1 | |||
c.OutputBufferTimeout = 0 | c.OutputBufferTimeout = 0 | |||
case desiredSize == 0: | case desiredSize == 0: | |||
// do nothing (use default) | // do nothing (use default) | |||
case desiredSize >= 64 && desiredSize <= int(c.ctx.nsqd.getOpts().MaxOutp utBufferSize): | case desiredSize >= 64 && desiredSize <= int(c.nsqd.getOpts().MaxOutputBu fferSize): | |||
c.OutputBufferSize = desiredSize | c.OutputBufferSize = desiredSize | |||
default: | default: | |||
return fmt.Errorf("output buffer size (%d) is invalid", desiredSi ze) | return fmt.Errorf("output buffer size (%d) is invalid", desiredSi ze) | |||
} | } | |||
if desiredSize != 0 { | if desiredSize != 0 { | |||
err := c.Writer.Flush() | err := c.Writer.Flush() | |||
if err != nil { | if err != nil { | |||
return err | return err | |||
} | } | |||
skipping to change at line 469 | skipping to change at line 552 | |||
} | } | |||
func (c *clientV2) SetMsgTimeout(msgTimeout int) error { | func (c *clientV2) SetMsgTimeout(msgTimeout int) error { | |||
c.writeLock.Lock() | c.writeLock.Lock() | |||
defer c.writeLock.Unlock() | defer c.writeLock.Unlock() | |||
switch { | switch { | |||
case msgTimeout == 0: | case msgTimeout == 0: | |||
// do nothing (use default) | // do nothing (use default) | |||
case msgTimeout >= 1000 && | case msgTimeout >= 1000 && | |||
msgTimeout <= int(c.ctx.nsqd.getOpts().MaxMsgTimeout/time.Millise cond): | msgTimeout <= int(c.nsqd.getOpts().MaxMsgTimeout/time.Millisecond ): | |||
c.MsgTimeout = time.Duration(msgTimeout) * time.Millisecond | c.MsgTimeout = time.Duration(msgTimeout) * time.Millisecond | |||
default: | default: | |||
return fmt.Errorf("msg timeout (%d) is invalid", msgTimeout) | return fmt.Errorf("msg timeout (%d) is invalid", msgTimeout) | |||
} | } | |||
return nil | return nil | |||
} | } | |||
func (c *clientV2) UpgradeTLS() error { | func (c *clientV2) UpgradeTLS() error { | |||
c.writeLock.Lock() | c.writeLock.Lock() | |||
defer c.writeLock.Unlock() | defer c.writeLock.Unlock() | |||
tlsConn := tls.Server(c.Conn, c.ctx.nsqd.tlsConfig) | tlsConn := tls.Server(c.Conn, c.nsqd.tlsConfig) | |||
tlsConn.SetDeadline(time.Now().Add(5 * time.Second)) | tlsConn.SetDeadline(time.Now().Add(5 * time.Second)) | |||
err := tlsConn.Handshake() | err := tlsConn.Handshake() | |||
if err != nil { | if err != nil { | |||
return err | return err | |||
} | } | |||
c.tlsConn = tlsConn | c.tlsConn = tlsConn | |||
c.Reader = bufio.NewReaderSize(c.tlsConn, defaultBufferSize) | c.Reader = bufio.NewReaderSize(c.tlsConn, defaultBufferSize) | |||
c.Writer = bufio.NewWriterSize(c.tlsConn, c.OutputBufferSize) | c.Writer = bufio.NewWriterSize(c.tlsConn, c.OutputBufferSize) | |||
skipping to change at line 570 | skipping to change at line 653 | |||
tlsEnabled := atomic.LoadInt32(&c.TLS) == 1 | tlsEnabled := atomic.LoadInt32(&c.TLS) == 1 | |||
commonName := "" | commonName := "" | |||
if tlsEnabled { | if tlsEnabled { | |||
tlsConnState := c.tlsConn.ConnectionState() | tlsConnState := c.tlsConn.ConnectionState() | |||
if len(tlsConnState.PeerCertificates) > 0 { | if len(tlsConnState.PeerCertificates) > 0 { | |||
commonName = tlsConnState.PeerCertificates[0].Subject.Com monName | commonName = tlsConnState.PeerCertificates[0].Subject.Com monName | |||
} | } | |||
} | } | |||
authState, err := auth.QueryAnyAuthd(c.ctx.nsqd.getOpts().AuthHTTPAddress es, | authState, err := auth.QueryAnyAuthd(c.nsqd.getOpts().AuthHTTPAddresses, | |||
remoteIP, tlsEnabled, commonName, c.AuthSecret, | remoteIP, tlsEnabled, commonName, c.AuthSecret, | |||
c.ctx.nsqd.getOpts().HTTPClientConnectTimeout, | c.nsqd.getOpts().HTTPClientConnectTimeout, | |||
c.ctx.nsqd.getOpts().HTTPClientRequestTimeout) | c.nsqd.getOpts().HTTPClientRequestTimeout) | |||
if err != nil { | if err != nil { | |||
return err | return err | |||
} | } | |||
c.AuthState = authState | c.AuthState = authState | |||
return nil | return nil | |||
} | } | |||
func (c *clientV2) Auth(secret string) error { | func (c *clientV2) Auth(secret string) error { | |||
c.AuthSecret = secret | c.AuthSecret = secret | |||
return c.QueryAuthd() | return c.QueryAuthd() | |||
End of changes. 24 change blocks. | ||||
31 lines changed or deleted | 118 lines changed or added |