channel_test.go (nsq-1.2.0) | : | channel_test.go (nsq-1.2.1) | ||
---|---|---|---|---|
skipping to change at line 153 | skipping to change at line 153 | |||
tcpAddr, _, nsqd := mustStartNSQD(opts) | tcpAddr, _, nsqd := mustStartNSQD(opts) | |||
defer os.RemoveAll(opts.DataPath) | defer os.RemoveAll(opts.DataPath) | |||
defer nsqd.Exit() | defer nsqd.Exit() | |||
conn, _ := mustConnectNSQD(tcpAddr) | conn, _ := mustConnectNSQD(tcpAddr) | |||
defer conn.Close() | defer conn.Close() | |||
topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix())) | topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix())) | |||
topic := nsqd.GetTopic(topicName) | topic := nsqd.GetTopic(topicName) | |||
channel := topic.GetChannel("channel") | channel := topic.GetChannel("channel") | |||
client := newClientV2(0, conn, &context{nsqd}) | client := newClientV2(0, conn, nsqd) | |||
client.SetReadyCount(25) | client.SetReadyCount(25) | |||
err := channel.AddClient(client.ID, client) | err := channel.AddClient(client.ID, client) | |||
test.Equal(t, err, nil) | test.Equal(t, err, nil) | |||
for i := 0; i < 25; i++ { | for i := 0; i < 25; i++ { | |||
msg := NewMessage(topic.GenerateID(), []byte("test")) | msg := NewMessage(topic.GenerateID(), []byte("test")) | |||
channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout) | channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout) | |||
client.SendingMessage() | client.SendingMessage() | |||
} | } | |||
for _, cl := range channel.clients { | for _, cl := range channel.clients { | |||
stats := cl.Stats() | stats := cl.Stats("").(ClientV2Stats) | |||
test.Equal(t, int64(25), stats.InFlightCount) | test.Equal(t, int64(25), stats.InFlightCount) | |||
} | } | |||
channel.Empty() | channel.Empty() | |||
for _, cl := range channel.clients { | for _, cl := range channel.clients { | |||
stats := cl.Stats() | stats := cl.Stats("").(ClientV2Stats) | |||
test.Equal(t, int64(0), stats.InFlightCount) | test.Equal(t, int64(0), stats.InFlightCount) | |||
} | } | |||
} | } | |||
func TestMaxChannelConsumers(t *testing.T) { | func TestMaxChannelConsumers(t *testing.T) { | |||
opts := NewOptions() | opts := NewOptions() | |||
opts.Logger = test.NewTestLogger(t) | opts.Logger = test.NewTestLogger(t) | |||
opts.MaxChannelConsumers = 1 | opts.MaxChannelConsumers = 1 | |||
tcpAddr, _, nsqd := mustStartNSQD(opts) | tcpAddr, _, nsqd := mustStartNSQD(opts) | |||
defer os.RemoveAll(opts.DataPath) | defer os.RemoveAll(opts.DataPath) | |||
defer nsqd.Exit() | defer nsqd.Exit() | |||
conn, _ := mustConnectNSQD(tcpAddr) | conn, _ := mustConnectNSQD(tcpAddr) | |||
defer conn.Close() | defer conn.Close() | |||
topicName := "test_max_channel_consumers" + strconv.Itoa(int(time.Now().U nix())) | topicName := "test_max_channel_consumers" + strconv.Itoa(int(time.Now().U nix())) | |||
topic := nsqd.GetTopic(topicName) | topic := nsqd.GetTopic(topicName) | |||
channel := topic.GetChannel("channel") | channel := topic.GetChannel("channel") | |||
client1 := newClientV2(1, conn, &context{nsqd}) | client1 := newClientV2(1, conn, nsqd) | |||
client1.SetReadyCount(25) | client1.SetReadyCount(25) | |||
err := channel.AddClient(client1.ID, client1) | err := channel.AddClient(client1.ID, client1) | |||
test.Equal(t, err, nil) | test.Equal(t, err, nil) | |||
client2 := newClientV2(2, conn, &context{nsqd}) | client2 := newClientV2(2, conn, nsqd) | |||
client2.SetReadyCount(25) | client2.SetReadyCount(25) | |||
err = channel.AddClient(client2.ID, client2) | err = channel.AddClient(client2.ID, client2) | |||
test.NotEqual(t, err, nil) | test.NotEqual(t, err, nil) | |||
} | } | |||
func TestChannelHealth(t *testing.T) { | func TestChannelHealth(t *testing.T) { | |||
opts := NewOptions() | opts := NewOptions() | |||
opts.Logger = test.NewTestLogger(t) | opts.Logger = test.NewTestLogger(t) | |||
opts.MemQueueSize = 2 | opts.MemQueueSize = 2 | |||
End of changes. 5 change blocks. | ||||
5 lines changed or deleted | 5 lines changed or added |