v3_double_barrier_test.go (etcd-3.5.5) | : | v3_double_barrier_test.go (etcd-3.5.6) | ||
---|---|---|---|---|
skipping to change at line 18 | skipping to change at line 18 | |||
// | // | |||
// Unless required by applicable law or agreed to in writing, software | // Unless required by applicable law or agreed to in writing, software | |||
// distributed under the License is distributed on an "AS IS" BASIS, | // distributed under the License is distributed on an "AS IS" BASIS, | |||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
// See the License for the specific language governing permissions and | // See the License for the specific language governing permissions and | |||
// limitations under the License. | // limitations under the License. | |||
package recipes_test | package recipes_test | |||
import ( | import ( | |||
"context" | ||||
"sync" | ||||
"testing" | "testing" | |||
"time" | "time" | |||
"github.com/stretchr/testify/assert" | ||||
clientv3 "go.etcd.io/etcd/client/v3" | ||||
"go.etcd.io/etcd/client/v3/concurrency" | "go.etcd.io/etcd/client/v3/concurrency" | |||
recipe "go.etcd.io/etcd/client/v3/experimental/recipes" | recipe "go.etcd.io/etcd/client/v3/experimental/recipes" | |||
"go.etcd.io/etcd/tests/v3/integration" | "go.etcd.io/etcd/tests/v3/integration" | |||
) | ) | |||
func TestDoubleBarrier(t *testing.T) { | func TestDoubleBarrier(t *testing.T) { | |||
integration.BeforeTest(t) | integration.BeforeTest(t) | |||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) | clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) | |||
defer clus.Terminate(t) | defer clus.Terminate(t) | |||
skipping to change at line 100 | skipping to change at line 105 | |||
timerC = time.After(time.Duration(waiters*100) * time.Millisecond) | timerC = time.After(time.Duration(waiters*100) * time.Millisecond) | |||
for i := 0; i < waiters-1; i++ { | for i := 0; i < waiters-1; i++ { | |||
select { | select { | |||
case <-timerC: | case <-timerC: | |||
t.Fatalf("barrier leave timed out") | t.Fatalf("barrier leave timed out") | |||
case donec <- struct{}{}: | case donec <- struct{}{}: | |||
} | } | |||
} | } | |||
} | } | |||
func TestDoubleBarrierTooManyClients(t *testing.T) { | ||||
integration.BeforeTest(t) | ||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) | ||||
defer clus.Terminate(t) | ||||
waiters := 10 | ||||
session, err := concurrency.NewSession(clus.RandClient()) | ||||
if err != nil { | ||||
t.Error(err) | ||||
} | ||||
defer session.Orphan() | ||||
b := recipe.NewDoubleBarrier(session, "test-barrier", waiters) | ||||
donec := make(chan struct{}) | ||||
var ( | ||||
wgDone sync.WaitGroup // make sure all clients have finished t | ||||
he tasks | ||||
wgEntered sync.WaitGroup // make sure all clients have entered th | ||||
e double barrier | ||||
) | ||||
wgDone.Add(waiters) | ||||
wgEntered.Add(waiters) | ||||
for i := 0; i < waiters; i++ { | ||||
go func() { | ||||
defer wgDone.Done() | ||||
session, err := concurrency.NewSession(clus.RandClient()) | ||||
if err != nil { | ||||
t.Error(err) | ||||
} | ||||
defer session.Orphan() | ||||
bb := recipe.NewDoubleBarrier(session, "test-barrier", wa | ||||
iters) | ||||
if err := bb.Enter(); err != nil { | ||||
t.Errorf("could not enter on barrier (%v)", err) | ||||
} | ||||
wgEntered.Done() | ||||
<-donec | ||||
if err := bb.Leave(); err != nil { | ||||
t.Errorf("could not leave on barrier (%v)", err) | ||||
} | ||||
}() | ||||
} | ||||
// Wait until all clients have already entered the double barrier, so | ||||
// no any other client can enter the barrier. | ||||
wgEntered.Wait() | ||||
t.Log("Try to enter into double barrier") | ||||
if err := b.Enter(); err != recipe.ErrTooManyClients { | ||||
t.Errorf("Unexcepted error, expected: ErrTooManyClients, got: %v" | ||||
, err) | ||||
} | ||||
resp, err := clus.RandClient().Get(context.TODO(), "test-barrier/waiters" | ||||
, clientv3.WithPrefix()) | ||||
if err != nil { | ||||
t.Errorf("Unexpected error: %v", err) | ||||
} | ||||
// Make sure the extra `b.Enter()` did not create a new ephemeral key | ||||
assert.Equal(t, waiters, len(resp.Kvs)) | ||||
close(donec) | ||||
wgDone.Wait() | ||||
} | ||||
func TestDoubleBarrierFailover(t *testing.T) { | func TestDoubleBarrierFailover(t *testing.T) { | |||
integration.BeforeTest(t) | integration.BeforeTest(t) | |||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) | clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) | |||
defer clus.Terminate(t) | defer clus.Terminate(t) | |||
waiters := 10 | waiters := 10 | |||
donec := make(chan struct{}) | donec := make(chan struct{}) | |||
defer close(donec) | defer close(donec) | |||
End of changes. 3 change blocks. | ||||
0 lines changed or deleted | 71 lines changed or added |