message.go (nsq-1.2.0) | : | message.go (nsq-1.2.1) | ||
---|---|---|---|---|
package nsqd | package nsqd | |||
import ( | import ( | |||
"bytes" | ||||
"encoding/binary" | "encoding/binary" | |||
"fmt" | "fmt" | |||
"io" | "io" | |||
"time" | "time" | |||
) | ) | |||
const ( | const ( | |||
MsgIDLength = 16 | MsgIDLength = 16 | |||
minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts | minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts | |||
) | ) | |||
skipping to change at line 93 | skipping to change at line 92 | |||
} | } | |||
msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8])) | msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8])) | |||
msg.Attempts = binary.BigEndian.Uint16(b[8:10]) | msg.Attempts = binary.BigEndian.Uint16(b[8:10]) | |||
copy(msg.ID[:], b[10:10+MsgIDLength]) | copy(msg.ID[:], b[10:10+MsgIDLength]) | |||
msg.Body = b[10+MsgIDLength:] | msg.Body = b[10+MsgIDLength:] | |||
return &msg, nil | return &msg, nil | |||
} | } | |||
func writeMessageToBackend(buf *bytes.Buffer, msg *Message, bq BackendQueue) err | func writeMessageToBackend(msg *Message, bq BackendQueue) error { | |||
or { | buf := bufferPoolGet() | |||
buf.Reset() | defer bufferPoolPut(buf) | |||
_, err := msg.WriteTo(buf) | _, err := msg.WriteTo(buf) | |||
if err != nil { | if err != nil { | |||
return err | return err | |||
} | } | |||
return bq.Put(buf.Bytes()) | return bq.Put(buf.Bytes()) | |||
} | } | |||
End of changes. 2 change blocks. | ||||
4 lines changed or deleted | 3 lines changed or added |