protocol_v2.go (nsq-1.2.0) | : | protocol_v2.go (nsq-1.2.1) | ||
---|---|---|---|---|
skipping to change at line 33 | skipping to change at line 33 | |||
frameTypeResponse int32 = 0 | frameTypeResponse int32 = 0 | |||
frameTypeError int32 = 1 | frameTypeError int32 = 1 | |||
frameTypeMessage int32 = 2 | frameTypeMessage int32 = 2 | |||
) | ) | |||
var separatorBytes = []byte(" ") | var separatorBytes = []byte(" ") | |||
var heartbeatBytes = []byte("_heartbeat_") | var heartbeatBytes = []byte("_heartbeat_") | |||
var okBytes = []byte("OK") | var okBytes = []byte("OK") | |||
type protocolV2 struct { | type protocolV2 struct { | |||
ctx *context | nsqd *NSQD | |||
} | } | |||
func (p *protocolV2) IOLoop(conn net.Conn) error { | func (p *protocolV2) NewClient(conn net.Conn) protocol.Client { | |||
clientID := atomic.AddInt64(&p.nsqd.clientIDSequence, 1) | ||||
return newClientV2(clientID, conn, p.nsqd) | ||||
} | ||||
func (p *protocolV2) IOLoop(c protocol.Client) error { | ||||
var err error | var err error | |||
var line []byte | var line []byte | |||
var zeroTime time.Time | var zeroTime time.Time | |||
clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) | client := c.(*clientV2) | |||
client := newClientV2(clientID, conn, p.ctx) | ||||
p.ctx.nsqd.AddClient(client.ID, client) | ||||
// synchronize the startup of messagePump in order | // synchronize the startup of messagePump in order | |||
// to guarantee that it gets a chance to initialize | // to guarantee that it gets a chance to initialize | |||
// goroutine local state derived from client attributes | // goroutine local state derived from client attributes | |||
// and avoid a potential race with IDENTIFY (where a client | // and avoid a potential race with IDENTIFY (where a client | |||
// could have changed or disabled said attributes) | // could have changed or disabled said attributes) | |||
messagePumpStartedChan := make(chan bool) | messagePumpStartedChan := make(chan bool) | |||
go p.messagePump(client, messagePumpStartedChan) | go p.messagePump(client, messagePumpStartedChan) | |||
<-messagePumpStartedChan | <-messagePumpStartedChan | |||
skipping to change at line 81 | skipping to change at line 84 | |||
} | } | |||
// trim the '\n' | // trim the '\n' | |||
line = line[:len(line)-1] | line = line[:len(line)-1] | |||
// optionally trim the '\r' | // optionally trim the '\r' | |||
if len(line) > 0 && line[len(line)-1] == '\r' { | if len(line) > 0 && line[len(line)-1] == '\r' { | |||
line = line[:len(line)-1] | line = line[:len(line)-1] | |||
} | } | |||
params := bytes.Split(line, separatorBytes) | params := bytes.Split(line, separatorBytes) | |||
p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, param s) | p.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params) | |||
var response []byte | var response []byte | |||
response, err = p.Exec(client, params) | response, err = p.Exec(client, params) | |||
if err != nil { | if err != nil { | |||
ctx := "" | ctx := "" | |||
if parentErr := err.(protocol.ChildErr).Parent(); parentE rr != nil { | if parentErr := err.(protocol.ChildErr).Parent(); parentE rr != nil { | |||
ctx = " - " + parentErr.Error() | ctx = " - " + parentErr.Error() | |||
} | } | |||
p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ct x) | p.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx) | |||
sendErr := p.Send(client, frameTypeError, []byte(err.Erro r())) | sendErr := p.Send(client, frameTypeError, []byte(err.Erro r())) | |||
if sendErr != nil { | if sendErr != nil { | |||
p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx) | p.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sen dErr, ctx) | |||
break | break | |||
} | } | |||
// errors of type FatalClientErr should forceably close t he connection | // errors of type FatalClientErr should forceably close t he connection | |||
if _, ok := err.(*protocol.FatalClientErr); ok { | if _, ok := err.(*protocol.FatalClientErr); ok { | |||
break | break | |||
} | } | |||
continue | continue | |||
} | } | |||
if response != nil { | if response != nil { | |||
err = p.Send(client, frameTypeResponse, response) | err = p.Send(client, frameTypeResponse, response) | |||
if err != nil { | if err != nil { | |||
err = fmt.Errorf("failed to send response - %s", err) | err = fmt.Errorf("failed to send response - %s", err) | |||
break | break | |||
} | } | |||
} | } | |||
} | } | |||
p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client) | p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client) | |||
conn.Close() | ||||
close(client.ExitChan) | close(client.ExitChan) | |||
if client.Channel != nil { | if client.Channel != nil { | |||
client.Channel.RemoveClient(client.ID) | client.Channel.RemoveClient(client.ID) | |||
} | } | |||
p.ctx.nsqd.RemoveClient(client.ID) | ||||
return err | return err | |||
} | } | |||
func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error { | func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error { | |||
p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): writing msg(%s) to client(%s) - | p.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): writing msg(%s) to client(%s) - %s" | |||
%s", msg.ID, client, msg.Body) | , msg.ID, client, msg.Body) | |||
var buf = &bytes.Buffer{} | ||||
buf := bufferPoolGet() | ||||
defer bufferPoolPut(buf) | ||||
_, err := msg.WriteTo(buf) | _, err := msg.WriteTo(buf) | |||
if err != nil { | if err != nil { | |||
return err | return err | |||
} | } | |||
err = p.Send(client, frameTypeMessage, buf.Bytes()) | err = p.Send(client, frameTypeMessage, buf.Bytes()) | |||
if err != nil { | if err != nil { | |||
return err | return err | |||
} | } | |||
skipping to change at line 205 | skipping to change at line 208 | |||
return p.CLS(client, params) | return p.CLS(client, params) | |||
case bytes.Equal(params[0], []byte("AUTH")): | case bytes.Equal(params[0], []byte("AUTH")): | |||
return p.AUTH(client, params) | return p.AUTH(client, params) | |||
} | } | |||
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("inv alid command %s", params[0])) | return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("inv alid command %s", params[0])) | |||
} | } | |||
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { | func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { | |||
var err error | var err error | |||
var memoryMsgChan chan *Message | var memoryMsgChan chan *Message | |||
var backendMsgChan chan []byte | var backendMsgChan <-chan []byte | |||
var subChannel *Channel | var subChannel *Channel | |||
// NOTE: `flusherChan` is used to bound message latency for | // NOTE: `flusherChan` is used to bound message latency for | |||
// the pathological case of a channel on a low volume topic | // the pathological case of a channel on a low volume topic | |||
// with >1 clients having >1 RDY counts | // with >1 clients having >1 RDY counts | |||
var flusherChan <-chan time.Time | var flusherChan <-chan time.Time | |||
var sampleRate int32 | var sampleRate int32 | |||
subEventChan := client.SubEventChan | subEventChan := client.SubEventChan | |||
identifyEventChan := client.IdentifyEventChan | identifyEventChan := client.IdentifyEventChan | |||
outputBufferTicker := time.NewTicker(client.OutputBufferTimeout) | outputBufferTicker := time.NewTicker(client.OutputBufferTimeout) | |||
skipping to change at line 309 | skipping to change at line 312 | |||
if err != nil { | if err != nil { | |||
goto exit | goto exit | |||
} | } | |||
case b := <-backendMsgChan: | case b := <-backendMsgChan: | |||
if sampleRate > 0 && rand.Int31n(100) > sampleRate { | if sampleRate > 0 && rand.Int31n(100) > sampleRate { | |||
continue | continue | |||
} | } | |||
msg, err := decodeMessage(b) | msg, err := decodeMessage(b) | |||
if err != nil { | if err != nil { | |||
p.ctx.nsqd.logf(LOG_ERROR, "failed to decode mess age - %s", err) | p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) | |||
continue | continue | |||
} | } | |||
msg.Attempts++ | msg.Attempts++ | |||
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeou t) | subChannel.StartInFlightTimeout(msg, client.ID, msgTimeou t) | |||
client.SendingMessage() | client.SendingMessage() | |||
err = p.SendMessage(client, msg) | err = p.SendMessage(client, msg) | |||
if err != nil { | if err != nil { | |||
goto exit | goto exit | |||
} | } | |||
skipping to change at line 340 | skipping to change at line 343 | |||
if err != nil { | if err != nil { | |||
goto exit | goto exit | |||
} | } | |||
flushed = false | flushed = false | |||
case <-client.ExitChan: | case <-client.ExitChan: | |||
goto exit | goto exit | |||
} | } | |||
} | } | |||
exit: | exit: | |||
p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", clien t) | p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client) | |||
heartbeatTicker.Stop() | heartbeatTicker.Stop() | |||
outputBufferTicker.Stop() | outputBufferTicker.Stop() | |||
if err != nil { | if err != nil { | |||
p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err) | p.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s ", client, err) | |||
} | } | |||
} | } | |||
func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) { | func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) { | |||
var err error | var err error | |||
if atomic.LoadInt32(&client.State) != stateInit { | if atomic.LoadInt32(&client.State) != stateInit { | |||
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot IDENTIFY in current state") | return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot IDENTIFY in current state") | |||
} | } | |||
bodyLen, err := readLen(client.Reader, client.lenSlice) | bodyLen, err := readLen(client.Reader, client.lenSlice) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTI FY failed to read body size") | return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTI FY failed to read body size") | |||
} | } | |||
if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxBodySize { | if int64(bodyLen) > p.nsqd.getOpts().MaxBodySize { | |||
return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", | return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", | |||
fmt.Sprintf("IDENTIFY body too big %d > %d", bodyLen, p.c tx.nsqd.getOpts().MaxBodySize)) | fmt.Sprintf("IDENTIFY body too big %d > %d", bodyLen, p.n sqd.getOpts().MaxBodySize)) | |||
} | } | |||
if bodyLen <= 0 { | if bodyLen <= 0 { | |||
return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", | return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", | |||
fmt.Sprintf("IDENTIFY invalid body size %d", bodyLen)) | fmt.Sprintf("IDENTIFY invalid body size %d", bodyLen)) | |||
} | } | |||
body := make([]byte, bodyLen) | body := make([]byte, bodyLen) | |||
_, err = io.ReadFull(client.Reader, body) | _, err = io.ReadFull(client.Reader, body) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTI FY failed to read body") | return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTI FY failed to read body") | |||
} | } | |||
// body is a json structure with producer information | // body is a json structure with producer information | |||
var identifyData identifyDataV2 | var identifyData identifyDataV2 | |||
err = json.Unmarshal(body, &identifyData) | err = json.Unmarshal(body, &identifyData) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTI FY failed to decode JSON body") | return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTI FY failed to decode JSON body") | |||
} | } | |||
p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %+v", client, identifyData ) | p.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %+v", client, identifyData) | |||
err = client.Identify(identifyData) | err = client.Identify(identifyData) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTI FY "+err.Error()) | return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTI FY "+err.Error()) | |||
} | } | |||
// bail out early if we're not negotiating features | // bail out early if we're not negotiating features | |||
if !identifyData.FeatureNegotiation { | if !identifyData.FeatureNegotiation { | |||
return okBytes, nil | return okBytes, nil | |||
} | } | |||
tlsv1 := p.ctx.nsqd.tlsConfig != nil && identifyData.TLSv1 | tlsv1 := p.nsqd.tlsConfig != nil && identifyData.TLSv1 | |||
deflate := p.ctx.nsqd.getOpts().DeflateEnabled && identifyData.Deflate | deflate := p.nsqd.getOpts().DeflateEnabled && identifyData.Deflate | |||
deflateLevel := 6 | deflateLevel := 6 | |||
if deflate && identifyData.DeflateLevel > 0 { | if deflate && identifyData.DeflateLevel > 0 { | |||
deflateLevel = identifyData.DeflateLevel | deflateLevel = identifyData.DeflateLevel | |||
} | } | |||
if max := p.ctx.nsqd.getOpts().MaxDeflateLevel; max < deflateLevel { | if max := p.nsqd.getOpts().MaxDeflateLevel; max < deflateLevel { | |||
deflateLevel = max | deflateLevel = max | |||
} | } | |||
snappy := p.ctx.nsqd.getOpts().SnappyEnabled && identifyData.Snappy | snappy := p.nsqd.getOpts().SnappyEnabled && identifyData.Snappy | |||
if deflate && snappy { | if deflate && snappy { | |||
return nil, protocol.NewFatalClientErr(nil, "E_IDENTIFY_FAILED", "cannot enable both deflate and snappy compression") | return nil, protocol.NewFatalClientErr(nil, "E_IDENTIFY_FAILED", "cannot enable both deflate and snappy compression") | |||
} | } | |||
resp, err := json.Marshal(struct { | resp, err := json.Marshal(struct { | |||
MaxRdyCount int64 `json:"max_rdy_count"` | MaxRdyCount int64 `json:"max_rdy_count"` | |||
Version string `json:"version"` | Version string `json:"version"` | |||
MaxMsgTimeout int64 `json:"max_msg_timeout"` | MaxMsgTimeout int64 `json:"max_msg_timeout"` | |||
MsgTimeout int64 `json:"msg_timeout"` | MsgTimeout int64 `json:"msg_timeout"` | |||
TLSv1 bool `json:"tls_v1"` | TLSv1 bool `json:"tls_v1"` | |||
Deflate bool `json:"deflate"` | Deflate bool `json:"deflate"` | |||
DeflateLevel int `json:"deflate_level"` | DeflateLevel int `json:"deflate_level"` | |||
MaxDeflateLevel int `json:"max_deflate_level"` | MaxDeflateLevel int `json:"max_deflate_level"` | |||
Snappy bool `json:"snappy"` | Snappy bool `json:"snappy"` | |||
SampleRate int32 `json:"sample_rate"` | SampleRate int32 `json:"sample_rate"` | |||
AuthRequired bool `json:"auth_required"` | AuthRequired bool `json:"auth_required"` | |||
OutputBufferSize int `json:"output_buffer_size"` | OutputBufferSize int `json:"output_buffer_size"` | |||
OutputBufferTimeout int64 `json:"output_buffer_timeout"` | OutputBufferTimeout int64 `json:"output_buffer_timeout"` | |||
}{ | }{ | |||
MaxRdyCount: p.ctx.nsqd.getOpts().MaxRdyCount, | MaxRdyCount: p.nsqd.getOpts().MaxRdyCount, | |||
Version: version.Binary, | Version: version.Binary, | |||
MaxMsgTimeout: int64(p.ctx.nsqd.getOpts().MaxMsgTimeout / t ime.Millisecond), | MaxMsgTimeout: int64(p.nsqd.getOpts().MaxMsgTimeout / time. Millisecond), | |||
MsgTimeout: int64(client.MsgTimeout / time.Millisecond), | MsgTimeout: int64(client.MsgTimeout / time.Millisecond), | |||
TLSv1: tlsv1, | TLSv1: tlsv1, | |||
Deflate: deflate, | Deflate: deflate, | |||
DeflateLevel: deflateLevel, | DeflateLevel: deflateLevel, | |||
MaxDeflateLevel: p.ctx.nsqd.getOpts().MaxDeflateLevel, | MaxDeflateLevel: p.nsqd.getOpts().MaxDeflateLevel, | |||
Snappy: snappy, | Snappy: snappy, | |||
SampleRate: client.SampleRate, | SampleRate: client.SampleRate, | |||
AuthRequired: p.ctx.nsqd.IsAuthEnabled(), | AuthRequired: p.nsqd.IsAuthEnabled(), | |||
OutputBufferSize: client.OutputBufferSize, | OutputBufferSize: client.OutputBufferSize, | |||
OutputBufferTimeout: int64(client.OutputBufferTimeout / time.Mill isecond), | OutputBufferTimeout: int64(client.OutputBufferTimeout / time.Mill isecond), | |||
}) | }) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error()) | return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error()) | |||
} | } | |||
err = p.Send(client, frameTypeResponse, resp) | err = p.Send(client, frameTypeResponse, resp) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error()) | return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error()) | |||
} | } | |||
if tlsv1 { | if tlsv1 { | |||
p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] upgrading connectio n to TLS", client) | p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] upgrading connection to TLS", client) | |||
err = client.UpgradeTLS() | err = client.UpgradeTLS() | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_F AILED", "IDENTIFY failed "+err.Error()) | return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_F AILED", "IDENTIFY failed "+err.Error()) | |||
} | } | |||
err = p.Send(client, frameTypeResponse, okBytes) | err = p.Send(client, frameTypeResponse, okBytes) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_F AILED", "IDENTIFY failed "+err.Error()) | return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_F AILED", "IDENTIFY failed "+err.Error()) | |||
} | } | |||
} | } | |||
if snappy { | if snappy { | |||
p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] upgrading connectio n to snappy", client) | p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] upgrading connection to snappy", client) | |||
err = client.UpgradeSnappy() | err = client.UpgradeSnappy() | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_F AILED", "IDENTIFY failed "+err.Error()) | return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_F AILED", "IDENTIFY failed "+err.Error()) | |||
} | } | |||
err = p.Send(client, frameTypeResponse, okBytes) | err = p.Send(client, frameTypeResponse, okBytes) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_F AILED", "IDENTIFY failed "+err.Error()) | return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_F AILED", "IDENTIFY failed "+err.Error()) | |||
} | } | |||
} | } | |||
if deflate { | if deflate { | |||
p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] upgrading connectio n to deflate (level %d)", client, deflateLevel) | p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] upgrading connection to deflate (level %d)", client, deflateLevel) | |||
err = client.UpgradeDeflate(deflateLevel) | err = client.UpgradeDeflate(deflateLevel) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_F AILED", "IDENTIFY failed "+err.Error()) | return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_F AILED", "IDENTIFY failed "+err.Error()) | |||
} | } | |||
err = p.Send(client, frameTypeResponse, okBytes) | err = p.Send(client, frameTypeResponse, okBytes) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_F AILED", "IDENTIFY failed "+err.Error()) | return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_F AILED", "IDENTIFY failed "+err.Error()) | |||
} | } | |||
} | } | |||
skipping to change at line 504 | skipping to change at line 507 | |||
if len(params) != 1 { | if len(params) != 1 { | |||
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "AUTH in valid number of parameters") | return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "AUTH in valid number of parameters") | |||
} | } | |||
bodyLen, err := readLen(client.Reader, client.lenSlice) | bodyLen, err := readLen(client.Reader, client.lenSlice) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "AUTH f ailed to read body size") | return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "AUTH f ailed to read body size") | |||
} | } | |||
if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxBodySize { | if int64(bodyLen) > p.nsqd.getOpts().MaxBodySize { | |||
return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", | return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", | |||
fmt.Sprintf("AUTH body too big %d > %d", bodyLen, p.ctx.n sqd.getOpts().MaxBodySize)) | fmt.Sprintf("AUTH body too big %d > %d", bodyLen, p.nsqd. getOpts().MaxBodySize)) | |||
} | } | |||
if bodyLen <= 0 { | if bodyLen <= 0 { | |||
return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", | return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", | |||
fmt.Sprintf("AUTH invalid body size %d", bodyLen)) | fmt.Sprintf("AUTH invalid body size %d", bodyLen)) | |||
} | } | |||
body := make([]byte, bodyLen) | body := make([]byte, bodyLen) | |||
_, err = io.ReadFull(client.Reader, body) | _, err = io.ReadFull(client.Reader, body) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "AUTH f ailed to read body") | return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "AUTH f ailed to read body") | |||
} | } | |||
if client.HasAuthorizations() { | if client.HasAuthorizations() { | |||
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "AUTH al ready set") | return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "AUTH al ready set") | |||
} | } | |||
if !client.ctx.nsqd.IsAuthEnabled() { | if !client.nsqd.IsAuthEnabled() { | |||
return nil, protocol.NewFatalClientErr(err, "E_AUTH_DISABLED", "A UTH disabled") | return nil, protocol.NewFatalClientErr(err, "E_AUTH_DISABLED", "A UTH disabled") | |||
} | } | |||
if err := client.Auth(string(body)); err != nil { | if err := client.Auth(string(body)); err != nil { | |||
// we don't want to leak errors contacting the auth server to unt rusted clients | // we don't want to leak errors contacting the auth server to unt rusted clients | |||
p.ctx.nsqd.logf(LOG_WARN, "PROTOCOL(V2): [%s] AUTH failed %s", cl ient, err) | p.nsqd.logf(LOG_WARN, "PROTOCOL(V2): [%s] AUTH failed %s", client , err) | |||
return nil, protocol.NewFatalClientErr(err, "E_AUTH_FAILED", "AUT H failed") | return nil, protocol.NewFatalClientErr(err, "E_AUTH_FAILED", "AUT H failed") | |||
} | } | |||
if !client.HasAuthorizations() { | if !client.HasAuthorizations() { | |||
return nil, protocol.NewFatalClientErr(nil, "E_UNAUTHORIZED", "AU TH no authorizations found") | return nil, protocol.NewFatalClientErr(nil, "E_UNAUTHORIZED", "AU TH no authorizations found") | |||
} | } | |||
resp, err := json.Marshal(struct { | resp, err := json.Marshal(struct { | |||
Identity string `json:"identity"` | Identity string `json:"identity"` | |||
IdentityURL string `json:"identity_url"` | IdentityURL string `json:"identity_url"` | |||
skipping to change at line 563 | skipping to change at line 566 | |||
return nil, protocol.NewFatalClientErr(err, "E_AUTH_ERROR", "AUTH error "+err.Error()) | return nil, protocol.NewFatalClientErr(err, "E_AUTH_ERROR", "AUTH error "+err.Error()) | |||
} | } | |||
return nil, nil | return nil, nil | |||
} | } | |||
func (p *protocolV2) CheckAuth(client *clientV2, cmd, topicName, channelName str ing) error { | func (p *protocolV2) CheckAuth(client *clientV2, cmd, topicName, channelName str ing) error { | |||
// if auth is enabled, the client must have authorized already | // if auth is enabled, the client must have authorized already | |||
// compare topic/channel against cached authorization data (refetching if expired) | // compare topic/channel against cached authorization data (refetching if expired) | |||
if client.ctx.nsqd.IsAuthEnabled() { | if client.nsqd.IsAuthEnabled() { | |||
if !client.HasAuthorizations() { | if !client.HasAuthorizations() { | |||
return protocol.NewFatalClientErr(nil, "E_AUTH_FIRST", | return protocol.NewFatalClientErr(nil, "E_AUTH_FIRST", | |||
fmt.Sprintf("AUTH required before %s", cmd)) | fmt.Sprintf("AUTH required before %s", cmd)) | |||
} | } | |||
ok, err := client.IsAuthorized(topicName, channelName) | ok, err := client.IsAuthorized(topicName, channelName) | |||
if err != nil { | if err != nil { | |||
// we don't want to leak errors contacting the auth serve r to untrusted clients | // we don't want to leak errors contacting the auth serve r to untrusted clients | |||
p.ctx.nsqd.logf(LOG_WARN, "PROTOCOL(V2): [%s] AUTH failed %s", client, err) | p.nsqd.logf(LOG_WARN, "PROTOCOL(V2): [%s] AUTH failed %s" , client, err) | |||
return protocol.NewFatalClientErr(nil, "E_AUTH_FAILED", " AUTH failed") | return protocol.NewFatalClientErr(nil, "E_AUTH_FAILED", " AUTH failed") | |||
} | } | |||
if !ok { | if !ok { | |||
return protocol.NewFatalClientErr(nil, "E_UNAUTHORIZED", | return protocol.NewFatalClientErr(nil, "E_UNAUTHORIZED", | |||
fmt.Sprintf("AUTH failed for %s on %q %q", cmd, t opicName, channelName)) | fmt.Sprintf("AUTH failed for %s on %q %q", cmd, t opicName, channelName)) | |||
} | } | |||
} | } | |||
return nil | return nil | |||
} | } | |||
skipping to change at line 615 | skipping to change at line 618 | |||
} | } | |||
if err := p.CheckAuth(client, "SUB", topicName, channelName); err != nil { | if err := p.CheckAuth(client, "SUB", topicName, channelName); err != nil { | |||
return nil, err | return nil, err | |||
} | } | |||
// This retry-loop is a work-around for a race condition, where the | // This retry-loop is a work-around for a race condition, where the | |||
// last client can leave the channel between GetChannel() and AddClient() . | // last client can leave the channel between GetChannel() and AddClient() . | |||
// Avoid adding a client to an ephemeral channel / topic which has starte d exiting. | // Avoid adding a client to an ephemeral channel / topic which has starte d exiting. | |||
var channel *Channel | var channel *Channel | |||
for { | for i := 1; ; i++ { | |||
topic := p.ctx.nsqd.GetTopic(topicName) | topic := p.nsqd.GetTopic(topicName) | |||
channel = topic.GetChannel(channelName) | channel = topic.GetChannel(channelName) | |||
if err := channel.AddClient(client.ID, client); err != nil { | if err := channel.AddClient(client.ID, client); err != nil { | |||
return nil, protocol.NewFatalClientErr(nil, "E_TOO_MANY_C | return nil, protocol.NewFatalClientErr(err, "E_SUB_FAILED | |||
HANNEL_CONSUMERS", | ", "SUB failed "+err.Error()) | |||
fmt.Sprintf("channel consumers for %s:%s exceeds | ||||
limit of %d", | ||||
topicName, channelName, p.ctx.nsqd.getOpt | ||||
s().MaxChannelConsumers)) | ||||
} | } | |||
if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral & & topic.Exiting()) { | if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral & & topic.Exiting()) { | |||
channel.RemoveClient(client.ID) | channel.RemoveClient(client.ID) | |||
time.Sleep(1 * time.Millisecond) | if i < 2 { | |||
continue | time.Sleep(100 * time.Millisecond) | |||
continue | ||||
} | ||||
return nil, protocol.NewFatalClientErr(nil, "E_SUB_FAILED | ||||
", "SUB failed to deleted topic/channel") | ||||
} | } | |||
break | break | |||
} | } | |||
atomic.StoreInt32(&client.State, stateSubscribed) | atomic.StoreInt32(&client.State, stateSubscribed) | |||
client.Channel = channel | client.Channel = channel | |||
// update message pump | // update message pump | |||
client.SubEventChan <- channel | client.SubEventChan <- channel | |||
return okBytes, nil | return okBytes, nil | |||
} | } | |||
func (p *protocolV2) RDY(client *clientV2, params [][]byte) ([]byte, error) { | func (p *protocolV2) RDY(client *clientV2, params [][]byte) ([]byte, error) { | |||
state := atomic.LoadInt32(&client.State) | state := atomic.LoadInt32(&client.State) | |||
if state == stateClosing { | if state == stateClosing { | |||
// just ignore ready changes on a closing channel | // just ignore ready changes on a closing channel | |||
p.ctx.nsqd.logf(LOG_INFO, | p.nsqd.logf(LOG_INFO, | |||
"PROTOCOL(V2): [%s] ignoring RDY after CLS in state Clien tStateV2Closing", | "PROTOCOL(V2): [%s] ignoring RDY after CLS in state Clien tStateV2Closing", | |||
client) | client) | |||
return nil, nil | return nil, nil | |||
} | } | |||
if state != stateSubscribed { | if state != stateSubscribed { | |||
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot RDY in current state") | return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot RDY in current state") | |||
} | } | |||
count := int64(1) | count := int64(1) | |||
if len(params) > 1 { | if len(params) > 1 { | |||
b10, err := protocol.ByteToBase10(params[1]) | b10, err := protocol.ByteToBase10(params[1]) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_INVALID", | return nil, protocol.NewFatalClientErr(err, "E_INVALID", | |||
fmt.Sprintf("RDY could not parse count %s", param s[1])) | fmt.Sprintf("RDY could not parse count %s", param s[1])) | |||
} | } | |||
count = int64(b10) | count = int64(b10) | |||
} | } | |||
if count < 0 || count > p.ctx.nsqd.getOpts().MaxRdyCount { | if count < 0 || count > p.nsqd.getOpts().MaxRdyCount { | |||
// this needs to be a fatal error otherwise clients would have | // this needs to be a fatal error otherwise clients would have | |||
// inconsistent state | // inconsistent state | |||
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", | return nil, protocol.NewFatalClientErr(nil, "E_INVALID", | |||
fmt.Sprintf("RDY count %d out of range 0-%d", count, p.ct x.nsqd.getOpts().MaxRdyCount)) | fmt.Sprintf("RDY count %d out of range 0-%d", count, p.ns qd.getOpts().MaxRdyCount)) | |||
} | } | |||
client.SetReadyCount(count) | client.SetReadyCount(count) | |||
return nil, nil | return nil, nil | |||
} | } | |||
func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) { | func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) { | |||
state := atomic.LoadInt32(&client.State) | state := atomic.LoadInt32(&client.State) | |||
if state != stateSubscribed && state != stateClosing { | if state != stateSubscribed && state != stateClosing { | |||
skipping to change at line 724 | skipping to change at line 728 | |||
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", err.Erro r()) | return nil, protocol.NewFatalClientErr(nil, "E_INVALID", err.Erro r()) | |||
} | } | |||
timeoutMs, err := protocol.ByteToBase10(params[2]) | timeoutMs, err := protocol.ByteToBase10(params[2]) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_INVALID", | return nil, protocol.NewFatalClientErr(err, "E_INVALID", | |||
fmt.Sprintf("REQ could not parse timeout %s", params[2])) | fmt.Sprintf("REQ could not parse timeout %s", params[2])) | |||
} | } | |||
timeoutDuration := time.Duration(timeoutMs) * time.Millisecond | timeoutDuration := time.Duration(timeoutMs) * time.Millisecond | |||
maxReqTimeout := p.ctx.nsqd.getOpts().MaxReqTimeout | maxReqTimeout := p.nsqd.getOpts().MaxReqTimeout | |||
clampedTimeout := timeoutDuration | clampedTimeout := timeoutDuration | |||
if timeoutDuration < 0 { | if timeoutDuration < 0 { | |||
clampedTimeout = 0 | clampedTimeout = 0 | |||
} else if timeoutDuration > maxReqTimeout { | } else if timeoutDuration > maxReqTimeout { | |||
clampedTimeout = maxReqTimeout | clampedTimeout = maxReqTimeout | |||
} | } | |||
if clampedTimeout != timeoutDuration { | if clampedTimeout != timeoutDuration { | |||
p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] REQ timeout %d out of range 0-%d. Setting to %d", | p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] REQ timeout %d out of r ange 0-%d. Setting to %d", | |||
client, timeoutDuration, maxReqTimeout, clampedTimeout) | client, timeoutDuration, maxReqTimeout, clampedTimeout) | |||
timeoutDuration = clampedTimeout | timeoutDuration = clampedTimeout | |||
} | } | |||
err = client.Channel.RequeueMessage(client.ID, *id, timeoutDuration) | err = client.Channel.RequeueMessage(client.ID, *id, timeoutDuration) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewClientErr(err, "E_REQ_FAILED", | return nil, protocol.NewClientErr(err, "E_REQ_FAILED", | |||
fmt.Sprintf("REQ %s failed %s", *id, err.Error())) | fmt.Sprintf("REQ %s failed %s", *id, err.Error())) | |||
} | } | |||
skipping to change at line 786 | skipping to change at line 790 | |||
bodyLen, err := readLen(client.Reader, client.lenSlice) | bodyLen, err := readLen(client.Reader, client.lenSlice) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size") | return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size") | |||
} | } | |||
if bodyLen <= 0 { | if bodyLen <= 0 { | |||
return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", | return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", | |||
fmt.Sprintf("PUB invalid message body size %d", bodyLen)) | fmt.Sprintf("PUB invalid message body size %d", bodyLen)) | |||
} | } | |||
if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxMsgSize { | if int64(bodyLen) > p.nsqd.getOpts().MaxMsgSize { | |||
return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", | return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", | |||
fmt.Sprintf("PUB message too big %d > %d", bodyLen, p.ctx .nsqd.getOpts().MaxMsgSize)) | fmt.Sprintf("PUB message too big %d > %d", bodyLen, p.nsq d.getOpts().MaxMsgSize)) | |||
} | } | |||
messageBody := make([]byte, bodyLen) | messageBody := make([]byte, bodyLen) | |||
_, err = io.ReadFull(client.Reader, messageBody) | _, err = io.ReadFull(client.Reader, messageBody) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body") | return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body") | |||
} | } | |||
if err := p.CheckAuth(client, "PUB", topicName, ""); err != nil { | if err := p.CheckAuth(client, "PUB", topicName, ""); err != nil { | |||
return nil, err | return nil, err | |||
} | } | |||
topic := p.ctx.nsqd.GetTopic(topicName) | topic := p.nsqd.GetTopic(topicName) | |||
msg := NewMessage(topic.GenerateID(), messageBody) | msg := NewMessage(topic.GenerateID(), messageBody) | |||
err = topic.PutMessage(msg) | err = topic.PutMessage(msg) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) | return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) | |||
} | } | |||
client.PublishedMessage(topicName, 1) | client.PublishedMessage(topicName, 1) | |||
return okBytes, nil | return okBytes, nil | |||
} | } | |||
skipping to change at line 830 | skipping to change at line 834 | |||
topicName := string(params[1]) | topicName := string(params[1]) | |||
if !protocol.IsValidTopicName(topicName) { | if !protocol.IsValidTopicName(topicName) { | |||
return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC", | return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC", | |||
fmt.Sprintf("E_BAD_TOPIC MPUB topic name %q is not valid" , topicName)) | fmt.Sprintf("E_BAD_TOPIC MPUB topic name %q is not valid" , topicName)) | |||
} | } | |||
if err := p.CheckAuth(client, "MPUB", topicName, ""); err != nil { | if err := p.CheckAuth(client, "MPUB", topicName, ""); err != nil { | |||
return nil, err | return nil, err | |||
} | } | |||
topic := p.ctx.nsqd.GetTopic(topicName) | topic := p.nsqd.GetTopic(topicName) | |||
bodyLen, err := readLen(client.Reader, client.lenSlice) | bodyLen, err := readLen(client.Reader, client.lenSlice) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "MPUB f ailed to read body size") | return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "MPUB f ailed to read body size") | |||
} | } | |||
if bodyLen <= 0 { | if bodyLen <= 0 { | |||
return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", | return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", | |||
fmt.Sprintf("MPUB invalid body size %d", bodyLen)) | fmt.Sprintf("MPUB invalid body size %d", bodyLen)) | |||
} | } | |||
if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxBodySize { | if int64(bodyLen) > p.nsqd.getOpts().MaxBodySize { | |||
return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", | return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", | |||
fmt.Sprintf("MPUB body too big %d > %d", bodyLen, p.ctx.n sqd.getOpts().MaxBodySize)) | fmt.Sprintf("MPUB body too big %d > %d", bodyLen, p.nsqd. getOpts().MaxBodySize)) | |||
} | } | |||
messages, err := readMPUB(client.Reader, client.lenSlice, topic, | messages, err := readMPUB(client.Reader, client.lenSlice, topic, | |||
p.ctx.nsqd.getOpts().MaxMsgSize, p.ctx.nsqd.getOpts().MaxBodySize ) | p.nsqd.getOpts().MaxMsgSize, p.nsqd.getOpts().MaxBodySize) | |||
if err != nil { | if err != nil { | |||
return nil, err | return nil, err | |||
} | } | |||
// if we've made it this far we've validated all the input, | // if we've made it this far we've validated all the input, | |||
// the only possible error is that the topic is exiting during | // the only possible error is that the topic is exiting during | |||
// this next call (and no messages will be queued in that case) | // this next call (and no messages will be queued in that case) | |||
err = topic.PutMessages(messages) | err = topic.PutMessages(messages) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_MPUB_FAILED", "MPU B failed "+err.Error()) | return nil, protocol.NewFatalClientErr(err, "E_MPUB_FAILED", "MPU B failed "+err.Error()) | |||
skipping to change at line 886 | skipping to change at line 890 | |||
fmt.Sprintf("DPUB topic name %q is not valid", topicName) ) | fmt.Sprintf("DPUB topic name %q is not valid", topicName) ) | |||
} | } | |||
timeoutMs, err := protocol.ByteToBase10(params[2]) | timeoutMs, err := protocol.ByteToBase10(params[2]) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_INVALID", | return nil, protocol.NewFatalClientErr(err, "E_INVALID", | |||
fmt.Sprintf("DPUB could not parse timeout %s", params[2]) ) | fmt.Sprintf("DPUB could not parse timeout %s", params[2]) ) | |||
} | } | |||
timeoutDuration := time.Duration(timeoutMs) * time.Millisecond | timeoutDuration := time.Duration(timeoutMs) * time.Millisecond | |||
if timeoutDuration < 0 || timeoutDuration > p.ctx.nsqd.getOpts().MaxReqTi meout { | if timeoutDuration < 0 || timeoutDuration > p.nsqd.getOpts().MaxReqTimeou t { | |||
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", | return nil, protocol.NewFatalClientErr(nil, "E_INVALID", | |||
fmt.Sprintf("DPUB timeout %d out of range 0-%d", | fmt.Sprintf("DPUB timeout %d out of range 0-%d", | |||
timeoutMs, p.ctx.nsqd.getOpts().MaxReqTimeout/tim e.Millisecond)) | timeoutMs, p.nsqd.getOpts().MaxReqTimeout/time.Mi llisecond)) | |||
} | } | |||
bodyLen, err := readLen(client.Reader, client.lenSlice) | bodyLen, err := readLen(client.Reader, client.lenSlice) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "DPU B failed to read message body size") | return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "DPU B failed to read message body size") | |||
} | } | |||
if bodyLen <= 0 { | if bodyLen <= 0 { | |||
return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", | return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", | |||
fmt.Sprintf("DPUB invalid message body size %d", bodyLen) ) | fmt.Sprintf("DPUB invalid message body size %d", bodyLen) ) | |||
} | } | |||
if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxMsgSize { | if int64(bodyLen) > p.nsqd.getOpts().MaxMsgSize { | |||
return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", | return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", | |||
fmt.Sprintf("DPUB message too big %d > %d", bodyLen, p.ct x.nsqd.getOpts().MaxMsgSize)) | fmt.Sprintf("DPUB message too big %d > %d", bodyLen, p.ns qd.getOpts().MaxMsgSize)) | |||
} | } | |||
messageBody := make([]byte, bodyLen) | messageBody := make([]byte, bodyLen) | |||
_, err = io.ReadFull(client.Reader, messageBody) | _, err = io.ReadFull(client.Reader, messageBody) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "DPU B failed to read message body") | return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "DPU B failed to read message body") | |||
} | } | |||
if err := p.CheckAuth(client, "DPUB", topicName, ""); err != nil { | if err := p.CheckAuth(client, "DPUB", topicName, ""); err != nil { | |||
return nil, err | return nil, err | |||
} | } | |||
topic := p.ctx.nsqd.GetTopic(topicName) | topic := p.nsqd.GetTopic(topicName) | |||
msg := NewMessage(topic.GenerateID(), messageBody) | msg := NewMessage(topic.GenerateID(), messageBody) | |||
msg.deferred = timeoutDuration | msg.deferred = timeoutDuration | |||
err = topic.PutMessage(msg) | err = topic.PutMessage(msg) | |||
if err != nil { | if err != nil { | |||
return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPU B failed "+err.Error()) | return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPU B failed "+err.Error()) | |||
} | } | |||
client.PublishedMessage(topicName, 1) | client.PublishedMessage(topicName, 1) | |||
return okBytes, nil | return okBytes, nil | |||
skipping to change at line 1017 | skipping to change at line 1021 | |||
func readLen(r io.Reader, tmp []byte) (int32, error) { | func readLen(r io.Reader, tmp []byte) (int32, error) { | |||
_, err := io.ReadFull(r, tmp) | _, err := io.ReadFull(r, tmp) | |||
if err != nil { | if err != nil { | |||
return 0, err | return 0, err | |||
} | } | |||
return int32(binary.BigEndian.Uint32(tmp)), nil | return int32(binary.BigEndian.Uint32(tmp)), nil | |||
} | } | |||
func enforceTLSPolicy(client *clientV2, p *protocolV2, command []byte) error { | func enforceTLSPolicy(client *clientV2, p *protocolV2, command []byte) error { | |||
if p.ctx.nsqd.getOpts().TLSRequired != TLSNotRequired && atomic.LoadInt32 (&client.TLS) != 1 { | if p.nsqd.getOpts().TLSRequired != TLSNotRequired && atomic.LoadInt32(&cl ient.TLS) != 1 { | |||
return protocol.NewFatalClientErr(nil, "E_INVALID", | return protocol.NewFatalClientErr(nil, "E_INVALID", | |||
fmt.Sprintf("cannot %s in current state (TLS required)", command)) | fmt.Sprintf("cannot %s in current state (TLS required)", command)) | |||
} | } | |||
return nil | return nil | |||
} | } | |||
End of changes. 53 change blocks. | ||||
66 lines changed or deleted | 69 lines changed or added |