"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java" between
elasticsearch-6.8.2-src.tar.gz and elasticsearch-6.8.3-src.tar.gz

About: elasticsearch is a Distributed, RESTful, Search Engine built on top of Apache Lucene. Source package (GitHub).

MockDiskUsagesIT.java  (elasticsearch-6.8.2-src):MockDiskUsagesIT.java  (elasticsearch-6.8.3-src)
skipping to change at line 19 skipping to change at line 19
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, * Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an * software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the * KIND, either express or implied. See the License for the
* specific language governing permissions and limitations * specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.elasticsearch.cluster.routing.allocation.decider; package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.MockInternalClusterInfoService; import org.elasticsearch.cluster.MockInternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList; import java.nio.file.Path;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings
.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING;
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings
.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings
.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings
.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocat
ionDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
d;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class MockDiskUsagesIT extends ESIntegTestCase { public class MockDiskUsagesIT extends ESIntegTestCase {
@Override @Override
protected Collection<Class<? extends Plugin>> nodePlugins() { protected Collection<Class<? extends Plugin>> nodePlugins() {
// Use the mock internal cluster info service, which has fake-able disk return Collections.singletonList(MockInternalClusterInfoService.TestPlug
usages in.class);
return Arrays.asList(MockInternalClusterInfoService.TestPlugin.class); }
@Override
public Settings indexSettings() {
// ensure that indices do not use custom data paths
return Settings.builder().put(super.indexSettings()).putNull(IndexMetaDa
ta.SETTING_DATA_PATH).build();
}
private static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalByte
s, long freeBytes) {
return new FsInfo.Path(original.getPath(), original.getMount(), totalByt
es, freeBytes, freeBytes);
} }
public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception { public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
List<String> nodes = internalCluster().startNodes(3); for (int i = 0; i < 3; i++) {
// ensure that each node has a single data path
internalCluster().startNode(Settings.builder().put(Environment.PATH_
DATA_SETTING.getKey(), createTempDir()));
}
// Wait for all 3 nodes to be up final List<String> nodeIds = StreamSupport.stream(client().admin().clust
assertBusy(() -> { er().prepareState().get().getState()
NodesStatsResponse resp = client().admin().cluster().prepareNodesSta .getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).co
ts().get(); llect(Collectors.toList());
assertThat(resp.getNodes().size(), equalTo(3));
}); final MockInternalClusterInfoService clusterInfoService = getMockInterna
lClusterInfoService();
clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
clusterInfoService.onMaster();
// Start with all nodes at 50% usage // prevent any effects from in-flight recoveries, since we are only simu
final MockInternalClusterInfoService cis = (MockInternalClusterInfoServi lating a 100-byte disk
ce) clusterInfoService.shardSizeFunction = shardRouting -> 0L;
internalCluster().getInstance(ClusterInfoService.class, internal
Cluster().getMasterName()); // start with all nodes below the watermark
cis.setUpdateFrequency(TimeValue.timeValueMillis(200)); clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> se
cis.onMaster(); tDiskUsage(fsInfoPath, 100, between(10, 100));
cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), "n1", "/dev/nul
l", 100, 50));
cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), "n2", "/dev/nul
l", 100, 50));
cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", "/dev/nul
l", 100, 50));
final boolean watermarkBytes = randomBoolean(); // we have to consistent ly use bytes or percentage for the disk watermark settings final boolean watermarkBytes = randomBoolean(); // we have to consistent ly use bytes or percentage for the disk watermark settings
client().admin().cluster().prepareUpdateSettings().setTransientSettings( assertAcked(client().admin().cluster().prepareUpdateSettings().setTransi
Settings.builder() entSettings(Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_W .put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(),
ATERMARK_SETTING.getKey(), watermarkBytes ? "20b" : "80%") watermarkBytes ? "10b" : "90%")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_ .put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey()
WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%") , watermarkBytes ? "10b" : "90%")
.put( .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.g
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FL etKey(), watermarkBytes ? "0b" : "100%")
OOD_STAGE_WATERMARK_SETTING.getKey(), .put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "
watermarkBytes ? "0b" : "100%") 1ms")));
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_IN
TERVAL_SETTING.getKey(), "1ms")).get();
// Create an index with 10 shards so we can check allocation for it // Create an index with 10 shards so we can check allocation for it
prepareCreate("test").setSettings(Settings.builder() assertAcked(prepareCreate("test").setSettings(Settings.builder().put("nu
.put("number_of_shards", 10) mber_of_shards", 10).put("number_of_replicas", 0)));
.put("number_of_replicas", 0)
.put("index.routing.allocation.exclude._name", "")).get();
ensureGreen("test"); ensureGreen("test");
// Block until the "fake" cluster info is retrieved at least once
assertBusy(() -> { assertBusy(() -> {
ClusterInfo info = cis.getClusterInfo(); final Map<String, Integer> shardCountByNodeId = getShardCountByNodeI
logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsage d();
s().size()); assertThat("node0 has at least 3 shards", shardCountByNodeId.get(nod
assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterTha eIds.get(0)), greaterThanOrEqualTo(3));
n(0)); assertThat("node1 has at least 3 shards", shardCountByNodeId.get(nod
eIds.get(1)), greaterThanOrEqualTo(3));
assertThat("node2 has at least 3 shards", shardCountByNodeId.get(nod
eIds.get(2)), greaterThanOrEqualTo(3));
});
// move node2 above high watermark
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> se
tDiskUsage(fsInfoPath, 100,
discoveryNode.getId().equals(nodeIds.get(2)) ? between(0, 9) : betwe
en(10, 100));
logger.info("--> waiting for shards to relocate off node [{}]", nodeIds.
get(2));
assertBusy(() -> {
final Map<String, Integer> shardCountByNodeId = getShardCountByNodeI
d();
assertThat("node0 has 5 shards", shardCountByNodeId.get(nodeIds.get(
0)), equalTo(5));
assertThat("node1 has 5 shards", shardCountByNodeId.get(nodeIds.get(
1)), equalTo(5));
assertThat("node2 has 0 shards", shardCountByNodeId.get(nodeIds.get(
2)), equalTo(0));
});
// move all nodes below watermark again
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> se
tDiskUsage(fsInfoPath, 100, between(10, 100));
logger.info("--> waiting for shards to rebalance back onto node [{}]", n
odeIds.get(2));
assertBusy(() -> {
final Map<String, Integer> shardCountByNodeId = getShardCountByNodeI
d();
assertThat("node0 has at least 3 shards", shardCountByNodeId.get(nod
eIds.get(0)), greaterThanOrEqualTo(3));
assertThat("node1 has at least 3 shards", shardCountByNodeId.get(nod
eIds.get(1)), greaterThanOrEqualTo(3));
assertThat("node2 has at least 3 shards", shardCountByNodeId.get(nod
eIds.get(2)), greaterThanOrEqualTo(3));
}); });
}
final List<String> realNodeNames = new ArrayList<>(); public void testOnlyMovesEnoughShardsToDropBelowHighWatermark() throws Excep
ClusterStateResponse resp = client().admin().cluster().prepareState().ge tion {
t(); for (int i = 0; i < 3; i++) {
Iterator<RoutingNode> iter = resp.getState().getRoutingNodes().iterator( // ensure that each node has a single data path
); internalCluster().startNode(Settings.builder().put(Environment.PATH_
while (iter.hasNext()) { DATA_SETTING.getKey(), createTempDir()));
RoutingNode node = iter.next();
realNodeNames.add(node.nodeId());
logger.info("--> node {} has {} shards",
node.nodeId(), resp.getState().getRoutingNodes().node(node.n
odeId()).numberOfOwningShards());
} }
// Update the disk usages so one node has now passed the high watermark final MockInternalClusterInfoService clusterInfoService = getMockInterna
cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", " lClusterInfoService();
_na_", 100, 50));
cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", " final AtomicReference<ClusterState> masterAppliedClusterState = new Atom
_na_", 100, 50)); icReference<>();
cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", " internalCluster().getCurrentMasterNodeInstance(ClusterService.class).add
_na_", 100, 0)); // nothing free on node3 Listener(event -> {
masterAppliedClusterState.set(event.state());
clusterInfoService.refresh(); // so that a subsequent reroute se
es disk usage according to the current state
});
// shards are 1 byte large
clusterInfoService.shardSizeFunction = shardRouting -> 1L;
// start with all nodes below the watermark
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> se
tDiskUsage(fsInfoPath, 1000L, 1000L);
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransi
entSettings(Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(),
"90%")
.put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey()
, "90%")
.put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.g
etKey(), "100%")
.put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "
1ms")));
// Retrieve the count of shards on each node final List<String> nodeIds = StreamSupport.stream(client().admin().clust
final Map<String, Integer> nodesToShardCount = new HashMap<>(); er().prepareState().get().getState()
.getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).co
llect(Collectors.toList());
assertAcked(prepareCreate("test").setSettings(Settings.builder().put("nu
mber_of_shards", 6).put("number_of_replicas", 0)));
ensureGreen("test");
assertBusy(() -> { assertBusy(() -> {
ClusterStateResponse resp12 = client().admin().cluster().prepareStat final Map<String, Integer> shardCountByNodeId = getShardCountByNodeI
e().get(); d();
Iterator<RoutingNode> iter12 = resp12.getState().getRoutingNodes().i assertThat("node0 has 2 shards", shardCountByNodeId.get(nodeIds.get(
terator(); 0)), equalTo(2));
while (iter12.hasNext()) { assertThat("node1 has 2 shards", shardCountByNodeId.get(nodeIds.get(
RoutingNode node = iter12.next(); 1)), equalTo(2));
logger.info("--> node {} has {} shards", assertThat("node2 has 2 shards", shardCountByNodeId.get(nodeIds.get(
node.nodeId(), resp12.getState().getRoutingNodes().node( 2)), equalTo(2));
node.nodeId()).numberOfOwningShards()); });
nodesToShardCount.put(node.nodeId(), resp12.getState().getRoutin
gNodes().node(node.nodeId()).numberOfOwningShards()); // disable rebalancing, or else we might move too many shards away and t
} hen rebalance them back again
assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames assertAcked(client().admin().cluster().prepareUpdateSettings().setTransi
.get(0)), equalTo(5)); entSettings(Settings.builder()
assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames .put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAlloca
.get(1)), equalTo(5)); tionDecider.Rebalance.NONE)));
assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames
.get(2)), equalTo(0)); // node2 suddenly has 99 bytes free, less than 10%, but moving one shard
}); is enough to bring it up to 100 bytes free:
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> se
// Update the disk usages so one node is now back under the high waterma tDiskUsage(fsInfoPath, 1000L,
rk discoveryNode.getId().equals(nodeIds.get(2))
cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", " ? 101L - masterAppliedClusterState.get().getRoutingNodes().node(
_na_", 100, 50)); nodeIds.get(2)).numberOfOwningShards()
cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", " : 1000L);
_na_", 100, 50));
cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", " clusterInfoService.refresh();
_na_", 100, 50)); // node3 has free space now
logger.info("--> waiting for shards to relocate off node [{}]", nodeIds.
// Retrieve the count of shards on each node get(2));
nodesToShardCount.clear();
// must wait for relocation to start
assertBusy(() -> { assertBusy(() -> assertThat("node2 has 1 shard", getShardCountByNodeId()
ClusterStateResponse resp1 = client().admin().cluster().prepareState .get(nodeIds.get(2)), equalTo(1)));
().get();
Iterator<RoutingNode> iter1 = resp1.getState().getRoutingNodes().ite // ensure that relocations finished without moving any more shards
rator(); ensureGreen("test");
while (iter1.hasNext()) { assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(
RoutingNode node = iter1.next(); 2)), equalTo(1));
logger.info("--> node {} has {} shards", }
node.nodeId(), resp1.getState().getRoutingNodes().node(n
ode.nodeId()).numberOfOwningShards()); public void testDoesNotExceedLowWatermarkWhenRebalancing() throws Exception
nodesToShardCount.put(node.nodeId(), resp1.getState().getRouting {
Nodes().node(node.nodeId()).numberOfOwningShards()); for (int i = 0; i < 3; i++) {
// ensure that each node has a single data path
internalCluster().startNode(Settings.builder().put(Environment.PATH_
DATA_SETTING.getKey(), createTempDir()));
}
final AtomicReference<ClusterState> masterAppliedClusterState = new Atom
icReference<>();
final MockInternalClusterInfoService clusterInfoService = getMockInterna
lClusterInfoService();
final List<String> nodeIds = StreamSupport.stream(client().admin().clust
er().prepareState().get().getState()
.getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).co
llect(Collectors.toList());
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).add
Listener(event -> {
assertThat(event.state().getRoutingNodes().node(nodeIds.get(2)).size
(), lessThanOrEqualTo(1));
masterAppliedClusterState.set(event.state());
clusterInfoService.refresh(); // so that a subsequent reroute sees d
isk usage according to the current state
});
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransi
entSettings(Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(),
"85%")
.put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey()
, "100%")
.put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.g
etKey(), "100%")));
// shards are 1 byte large
clusterInfoService.shardSizeFunction = shardRouting -> 1L;
// node 2 only has space for one shard
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> se
tDiskUsage(fsInfoPath, 1000L,
discoveryNode.getId().equals(nodeIds.get(2))
? 150L - masterAppliedClusterState.get().getRoutingNodes().node(
nodeIds.get(2)).numberOfOwningShards()
: 1000L);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put("number_of_shards", 6)
.put("number_of_replicas", 0)
.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getConcreteSe
ttingForNamespace("_id").getKey(), nodeIds.get(2))));
ensureGreen("test");
assertBusy(() -> {
final Map<String, Integer> shardCountByNodeId = getShardCountByNodeI
d();
assertThat("node0 has 3 shards", shardCountByNodeId.get(nodeIds.get(
0)), equalTo(3));
assertThat("node1 has 3 shards", shardCountByNodeId.get(nodeIds.get(
1)), equalTo(3));
assertThat("node2 has 0 shards", shardCountByNodeId.get(nodeIds.get(
2)), equalTo(0));
});
assertAcked(client().admin().indices().prepareUpdateSettings("test").set
Settings(Settings.builder()
.putNull(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getConcre
teSettingForNamespace("_id").getKey())));
logger.info("--> waiting for shards to relocate onto node [{}]", nodeIds
.get(2));
ensureGreen("test");
assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(
2)), equalTo(1));
}
public void testMovesShardsOffSpecificDataPathAboveWatermark() throws Except
ion {
// start one node with two data paths
final Path pathOverWatermark = createTempDir();
final Settings.Builder twoPathSettings = Settings.builder();
if (randomBoolean()) {
twoPathSettings.putList(Environment.PATH_DATA_SETTING.getKey(), crea
teTempDir().toString(), pathOverWatermark.toString());
} else {
twoPathSettings.putList(Environment.PATH_DATA_SETTING.getKey(), path
OverWatermark.toString(), createTempDir().toString());
}
internalCluster().startNode(twoPathSettings);
final String nodeWithTwoPaths = client().admin().cluster().prepareNodesI
nfo().get().getNodes().get(0).getNode().getId();
// other two nodes have one data path each
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA
_SETTING.getKey(), createTempDir()));
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA
_SETTING.getKey(), createTempDir()));
final MockInternalClusterInfoService clusterInfoService = getMockInterna
lClusterInfoService();
// prevent any effects from in-flight recoveries, since we are only simu
lating a 100-byte disk
clusterInfoService.shardSizeFunction = shardRouting -> 0L;
// start with all paths below the watermark
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> se
tDiskUsage(fsInfoPath, 100, between(10, 100));
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransi
entSettings(Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(),
"90%")
.put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey()
, "90%")
.put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.g
etKey(), "100%")));
final List<String> nodeIds = StreamSupport.stream(client().admin().clust
er().prepareState().get().getState()
.getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).co
llect(Collectors.toList());
assertAcked(prepareCreate("test").setSettings(Settings.builder().put("nu
mber_of_shards", 6).put("number_of_replicas", 0)));
ensureGreen("test");
{
final Map<String, Integer> shardCountByNodeId = getShardCountByNodeI
d();
assertThat("node0 has 2 shards", shardCountByNodeId.get(nodeIds.get(
0)), equalTo(2));
assertThat("node1 has 2 shards", shardCountByNodeId.get(nodeIds.get(
1)), equalTo(2));
assertThat("node2 has 2 shards", shardCountByNodeId.get(nodeIds.get(
2)), equalTo(2));
}
final long shardsOnGoodPath = Arrays.stream(client().admin().indices().p
repareStats("test").get().getShards())
.filter(shardStats -> shardStats.getShardRouting().currentNodeId().e
quals(nodeWithTwoPaths)
&& shardStats.getDataPath().startsWith(pathOverWatermark.toStrin
g()) == false).count();
logger.info("--> shards on good path: [{}]", shardsOnGoodPath);
// one of the paths on node0 suddenly exceeds the high watermark
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> se
tDiskUsage(fsInfoPath, 100L,
fsInfoPath.getPath().startsWith(pathOverWatermark.toString()) ? betw
een(0, 9) : between(10, 100));
// disable rebalancing, or else we might move shards back onto the over-
full path since we're not faking that
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransi
entSettings(Settings.builder()
.put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAlloca
tionDecider.Rebalance.NONE)));
clusterInfoService.refresh();
logger.info("--> waiting for shards to relocate off path [{}]", pathOver
Watermark);
assertBusy(() -> {
for (final ShardStats shardStats : client().admin().indices().prepar
eStats("test").get().getShards()) {
assertThat(shardStats.getDataPath(), not(startsWith(pathOverWate
rmark.toString())));
} }
assertThat("node1 has at least 3 shards", nodesToShardCount.get(real
NodeNames.get(0)), greaterThanOrEqualTo(3));
assertThat("node2 has at least 3 shards", nodesToShardCount.get(real
NodeNames.get(1)), greaterThanOrEqualTo(3));
assertThat("node3 has at least 3 shards", nodesToShardCount.get(real
NodeNames.get(2)), greaterThanOrEqualTo(3));
}); });
ensureGreen("test");
for (final ShardStats shardStats : client().admin().indices().prepareSta
ts("test").get().getShards()) {
assertThat(shardStats.getDataPath(), not(startsWith(pathOverWatermar
k.toString())));
}
assertThat("should not have moved any shards off of the path that wasn't
too full",
Arrays.stream(client().admin().indices().prepareStats("test").get().
getShards())
.filter(shardStats -> shardStats.getShardRouting().currentNodeId().e
quals(nodeWithTwoPaths)
&& shardStats.getDataPath().startsWith(pathOverWatermark.toStrin
g()) == false).count(), equalTo(shardsOnGoodPath));
}
private Map<String, Integer> getShardCountByNodeId() {
final Map<String, Integer> shardCountByNodeId = new HashMap<>();
final ClusterState clusterState = client().admin().cluster().prepareStat
e().get().getState();
for (final RoutingNode node : clusterState.getRoutingNodes()) {
logger.info("----> node {} has {} shards",
node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()
).numberOfOwningShards());
shardCountByNodeId.put(node.nodeId(), clusterState.getRoutingNodes()
.node(node.nodeId()).numberOfOwningShards());
}
return shardCountByNodeId;
} }
private MockInternalClusterInfoService getMockInternalClusterInfoService() {
return (MockInternalClusterInfoService) internalCluster().getCurrentMast
erNodeInstance(ClusterInfoService.class);
}
} }
 End of changes. 28 change blocks. 
125 lines changed or deleted 418 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)