"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "nsqd/channel_test.go" between
nsq-1.2.0.tar.gz and nsq-1.2.1.tar.gz

About: nsq is a realtime distributed and and decentralized messaging platform.

channel_test.go  (nsq-1.2.0):channel_test.go  (nsq-1.2.1)
skipping to change at line 153 skipping to change at line 153
tcpAddr, _, nsqd := mustStartNSQD(opts) tcpAddr, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath) defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit() defer nsqd.Exit()
conn, _ := mustConnectNSQD(tcpAddr) conn, _ := mustConnectNSQD(tcpAddr)
defer conn.Close() defer conn.Close()
topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix())) topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName) topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel") channel := topic.GetChannel("channel")
client := newClientV2(0, conn, &context{nsqd}) client := newClientV2(0, conn, nsqd)
client.SetReadyCount(25) client.SetReadyCount(25)
err := channel.AddClient(client.ID, client) err := channel.AddClient(client.ID, client)
test.Equal(t, err, nil) test.Equal(t, err, nil)
for i := 0; i < 25; i++ { for i := 0; i < 25; i++ {
msg := NewMessage(topic.GenerateID(), []byte("test")) msg := NewMessage(topic.GenerateID(), []byte("test"))
channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout) channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
client.SendingMessage() client.SendingMessage()
} }
for _, cl := range channel.clients { for _, cl := range channel.clients {
stats := cl.Stats() stats := cl.Stats("").(ClientV2Stats)
test.Equal(t, int64(25), stats.InFlightCount) test.Equal(t, int64(25), stats.InFlightCount)
} }
channel.Empty() channel.Empty()
for _, cl := range channel.clients { for _, cl := range channel.clients {
stats := cl.Stats() stats := cl.Stats("").(ClientV2Stats)
test.Equal(t, int64(0), stats.InFlightCount) test.Equal(t, int64(0), stats.InFlightCount)
} }
} }
func TestMaxChannelConsumers(t *testing.T) { func TestMaxChannelConsumers(t *testing.T) {
opts := NewOptions() opts := NewOptions()
opts.Logger = test.NewTestLogger(t) opts.Logger = test.NewTestLogger(t)
opts.MaxChannelConsumers = 1 opts.MaxChannelConsumers = 1
tcpAddr, _, nsqd := mustStartNSQD(opts) tcpAddr, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath) defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit() defer nsqd.Exit()
conn, _ := mustConnectNSQD(tcpAddr) conn, _ := mustConnectNSQD(tcpAddr)
defer conn.Close() defer conn.Close()
topicName := "test_max_channel_consumers" + strconv.Itoa(int(time.Now().U nix())) topicName := "test_max_channel_consumers" + strconv.Itoa(int(time.Now().U nix()))
topic := nsqd.GetTopic(topicName) topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel") channel := topic.GetChannel("channel")
client1 := newClientV2(1, conn, &context{nsqd}) client1 := newClientV2(1, conn, nsqd)
client1.SetReadyCount(25) client1.SetReadyCount(25)
err := channel.AddClient(client1.ID, client1) err := channel.AddClient(client1.ID, client1)
test.Equal(t, err, nil) test.Equal(t, err, nil)
client2 := newClientV2(2, conn, &context{nsqd}) client2 := newClientV2(2, conn, nsqd)
client2.SetReadyCount(25) client2.SetReadyCount(25)
err = channel.AddClient(client2.ID, client2) err = channel.AddClient(client2.ID, client2)
test.NotEqual(t, err, nil) test.NotEqual(t, err, nil)
} }
func TestChannelHealth(t *testing.T) { func TestChannelHealth(t *testing.T) {
opts := NewOptions() opts := NewOptions()
opts.Logger = test.NewTestLogger(t) opts.Logger = test.NewTestLogger(t)
opts.MemQueueSize = 2 opts.MemQueueSize = 2
 End of changes. 5 change blocks. 
5 lines changed or deleted 5 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)