12 #include "boost/format.hpp"
13 #include "boost/lexical_cast.hpp"
20 const std::string& _obj_path,
21 const std::string& _current_resc,
22 const std::string& _src_hier,
23 const std::string& _dst_hier,
24 const std::string& _src_resc,
25 const std::string& _dst_resc,
30 parser.set_string( _src_hier );
32 parser.str( sub_hier, _current_resc );
39 data_obj_inp.createMode = _mode;
53 boost::format(
"Failed to replicate the data object [%s]" ) %
71 boost::format(
"getSqlResultByInx failed. column [%d] genquery_inp contents:\n%s\n\n possible iquest [%s]") %
// Attributes of the replica used as the source of a replication.
// Only the first two members are visible in this excerpt; other code in this
// file also assigns and reads a data_mode member, so the struct continues on
// lines not shown here.
77 struct ReplicationSourceInfo {
// Passed to repl_for_rebalance and reported as the object path in log messages.
78 std::string object_path;
// Hierarchy string of the source replica; parsed by callers to find the root resource.
79 std::string resource_hierarchy;
// Runs a general query through rsGenQuery and packs values read from the
// start of several result columns (data name, collection name, resource id,
// data mode) into a ReplicationSourceInfo.
//
// Only part of the function is visible in this excerpt. Call sites in this
// file pass _ctx.comm() and a data id ahead of _leaf_bundles; those two
// parameter declarations are on lines not shown.
//
// _leaf_bundles: bundles whose ids are turned into the query condition built below.
84 ReplicationSourceInfo get_source_data_object_attributes(
87 const std::vector<leaf_bundle_t>& _leaf_bundles) {
// Accumulate one equality term per id, each followed by an OR separator.
97 std::stringstream cond_ss;
98 for (
auto& bun : _leaf_bundles) {
// The declaration of id is on a line not shown - presumably an inner loop over bun.
100 cond_ss <<
"= '" <<
id <<
"' || ";
// Drop the last 4 characters, i.e. the OR separator left after the final term.
// NOTE(review): size() is unsigned, so size()-4 wraps when fewer than 4
// characters were written; substr then returns the string unchanged (empty
// when no ids were appended). Confirm callers never pass empty bundles.
103 std::string cond_str = cond_ss.str().substr(0, cond_ss.str().size()-4);
114 const int status_rsGenQuery =
rsGenQuery(_comm, &genquery_inp_wrapped.
get(), &genquery_out_ptr_wrapped.
get());
// A negative status is an error; the message below is formatted for it. The
// statement consuming the message is not visible (presumably THROW, as used
// elsewhere in this file).
115 if (status_rsGenQuery < 0) {
118 boost::format(
"rsGenQuery failed. genquery_inp contents:\n%s\n\n possible iquest [%s]") %
// A null output pointer is reported with the same message as a failed query.
123 if (!genquery_out_ptr_wrapped.
get()) {
126 boost::format(
"rsGenQuery failed. genquery_inp contents:\n%s\n\n possible iquest [%s]") %
// Pointers to the start of the value buffers of the name columns.
133 char *data_name = &data_name_result->
value[0];
136 char *coll_name = &coll_name_result->
value[0];
// Helper that converts a result string to rodsLong_t. A failed conversion
// (boost::bad_lexical_cast) is caught and an error message naming the
// offending text is formatted.
138 auto cast_genquery_result = [&genquery_inp_wrapped](
char *s) ->
rodsLong_t {
140 return boost::lexical_cast<rodsLong_t>(s);
141 }
catch (
const boost::bad_lexical_cast&) {
144 boost::format(
"boost::lexical_cast failed. tried to cast [%s]. genquery_inp contents:\n%s\n\n possible iquest [%s]") %
152 const rodsLong_t resc_id = cast_genquery_result(&resc_id_result->
value[0]);
// NOTE(review): the helper returns rodsLong_t but the result is stored in an
// int, an implicit narrowing conversion if rodsLong_t is wider than int.
155 const int data_mode = cast_genquery_result(&data_mode_result->
value[0]);
157 ReplicationSourceInfo ret;
// Error message for a failed resource-id-to-hierarchy lookup; the lookup call
// itself is on a line not shown.
162 boost::format(
"leaf_id_to_hier failed. resc id [%lld] genquery inp:\n%s") %
166 ret.data_mode = data_mode;
// One row produced by get_out_of_date_replicas_batch, which fills the members
// data_id, replica_number and resource_id. The member declarations themselves
// are on lines not shown in this excerpt.
170 struct ReplicaAndRescId {
// Queries, via rsGenQuery, for up to _batch_size replicas and returns one
// ReplicaAndRescId (data id, replica number, resource id) per result row.
//
// Only part of the function is visible in this excerpt; the first parameter
// is on a line not shown (the caller in this file passes _ctx.comm()).
//
// _bundles:              bundles whose ids form the OR-joined query condition.
// _invocation_timestamp: used to build a less-than-or-equal condition; the
//                        column it applies to is on a line not shown.
// _batch_size:           copied into maxRows of the query input.
177 std::vector<ReplicaAndRescId> get_out_of_date_replicas_batch(
179 const std::vector<leaf_bundle_t>& _bundles,
180 const std::string& _invocation_timestamp,
181 const int _batch_size) {
// Input guards; what each guard does on failure is on lines not shown.
185 if (_bundles.empty()) {
188 if (_batch_size <= 0) {
191 if (_invocation_timestamp.empty()) {
// Cap the number of rows the query may return at the batch size.
196 genquery_inp_wrapped.
get().
maxRows = _batch_size;
// Accumulate one equality term per id, each followed by an OR separator.
198 std::stringstream cond_ss;
199 for (
auto& bun : _bundles) {
200 for (
auto id : bun) {
201 cond_ss <<
"= '" <<
id <<
"' || ";
// Drop the last 4 characters, i.e. the OR separator left after the final term.
// NOTE(review): size()-4 wraps (unsigned) if every bundle is empty; the guard
// above only checks that the outer vector is non-empty.
204 const std::string cond_str = cond_ss.str().substr(0, cond_ss.str().size()-4);
207 const std::string timestamp_str =
"<= '" + _invocation_timestamp +
"'";
214 const int status_rsGenQuery =
rsGenQuery(_comm, &genquery_inp_wrapped.
get(), &genquery_out_ptr_wrapped.
get());
216 std::vector<ReplicaAndRescId> ret;
// The first branch of this chain is on lines not shown. The two visible
// branches format the same error message for a negative status and for a
// null output pointer.
219 }
else if (status_rsGenQuery < 0) {
222 boost::format(
"rsGenQuery failed. genquery_inp contents:\n%s\npossible iquest [%s]") %
225 }
else if (
nullptr == genquery_out_ptr_wrapped.
get()) {
228 boost::format(
"rsGenQuery failed. genquery_inp contents:\n%s\npossible iquest [%s]") %
// One output element per result row.
237 ret.reserve(genquery_out_ptr_wrapped.
get()->
rowCnt);
238 for (
int i=0; i<genquery_out_ptr_wrapped.
get()->rowCnt; ++i) {
239 ReplicaAndRescId repl_and_resc;
// Helper that converts a result string to rodsLong_t and formats an error
// message carrying the row index when the conversion fails.
// NOTE(review): the parameter i shadows the loop index, and the helper is
// rebuilt on every iteration although it does not depend on the row.
240 auto cast_genquery_result = [&genquery_inp_wrapped](
int i,
char *s) {
242 return boost::lexical_cast<rodsLong_t>(s);
243 }
catch (
const boost::bad_lexical_cast&) {
246 boost::format(
"boost::lexical_cast failed. index [%d]. tried to cast [%s]. genquery_inp contents:\n%s\npossible iquest [%s]") %
// Each column stores its values in one buffer; row i starts at offset len * i.
253 repl_and_resc.data_id = cast_genquery_result(i, &data_id_results ->
value[data_id_results ->len * i]);
254 repl_and_resc.replica_number = cast_genquery_result(i, &data_repl_num_results->
value[data_repl_num_results->
len * i]);
255 repl_and_resc.resource_id = cast_genquery_result(i, &data_resc_id_results ->
value[data_resc_id_results ->len * i]);
256 ret.push_back(repl_and_resc);
// Returns a resource name as a string; judging by the function name it is the
// child of _resc_name that sits above the given bundle - TODO confirm, most of
// the body is not visible in this excerpt.
//
// _resc_name: resource name handed to the parser call below.
// The second parameter is on a line not shown; the caller in this file passes
// a single element of its bundle vector.
262 std::string get_child_name_that_is_ancestor_of_bundle(
263 const std::string& _resc_name,
// Stores the parser status in err and writes the parser output into ret
// (presumably the value returned by this function - confirm).
278 err = parse.
next(_resc_name, ret);
// Renders the given bundles as text. Callers in this file interpolate the
// result into the bundles field of their error messages.
// The formatting done inside the loop is on lines not shown in this excerpt.
//
// _leaf_bundles: bundles to render; iterated in order.
285 std::string leaf_bundles_to_string(
286 const std::vector<leaf_bundle_t>& _leaf_bundles) {
287 std::stringstream ss;
288 for (
auto&
b : _leaf_bundles) {
// For every data id in _data_ids_to_replicate, looks up the source replica,
// works out a destination hierarchy and creates a new replica there. A failed
// replication is logged and the loop continues; the first failure is kept and
// reported after the loop.
//
// Only part of the function is visible in this excerpt. The first parameter
// (used below as _ctx) and the last (_data_ids_to_replicate) are declared on
// lines not shown.
//
// _parent_resc_name: name searched for inside the source hierarchy string.
// _child_resc_name:  reported in error messages.
// _bun_idx:          bundle index, reported in error messages.
// _bundles:          forwarded to get_source_data_object_attributes and
//                    rendered into error messages.
// NOTE(review): _bundles is taken by value, so the whole vector is copied on
// each call; the other functions in this file take it by const reference.
299 void proc_results_for_rebalance(
301 const std::string& _parent_resc_name,
302 const std::string& _child_resc_name,
303 const size_t _bun_idx,
304 const std::vector<leaf_bundle_t> _bundles,
// Precondition messages: a null comm pointer and an empty id list are both
// reported with the resource names, bundle index and rendered bundles.
308 boost::format(
"null comm pointer. resource [%s]. child resource [%s]. bundle index [%d]. bundles [%s]") %
312 leaf_bundles_to_string(_bundles));
315 if (_data_ids_to_replicate.empty()) {
317 boost::format(
"empty data id list. resource [%s]. child resource [%s]. bundle index [%d]. bundles [%s]") %
321 leaf_bundles_to_string(_bundles));
325 for (
auto data_id_to_replicate : _data_ids_to_replicate) {
326 const ReplicationSourceInfo source_info = get_source_data_object_attributes(_ctx.
comm(), data_id_to_replicate, _bundles);
// Derive a sub-hierarchy string from the source hierarchy and the parent
// resource name, then hand it to in_pdmo on f_ptr.
333 sub_parser.
set_string(source_info.resource_hierarchy);
334 std::string sub_hier;
335 sub_parser.
str(sub_hier, _parent_resc_name);
336 f_ptr->in_pdmo(sub_hier);
// The parent resource name must occur in the source hierarchy string.
341 const size_t pos = source_info.resource_hierarchy.find(_parent_resc_name);
342 if (std::string::npos == pos) {
343 THROW(
SYS_INVALID_INPUT_PARAM, boost::format(
"missing repl name [%s] in source hier string [%s]") % _parent_resc_name % source_info.resource_hierarchy);
// Keep the hierarchy up to and including the parent name plus one further
// character (presumably the hierarchy delimiter - confirm).
347 std::string src_frag = source_info.resource_hierarchy.substr(0, pos + _parent_resc_name.size() + 1);
// NOTE(review): the results of set_string and first_resc are discarded here,
// while another function in this file checks them and reports set_string /
// first_resc failures.
348 parser.set_string(src_frag);
351 std::string root_resc;
352 parser.first_resc(root_resc);
357 if (!err_resolve.
ok()) {
358 THROW(err_resolve.
code(), boost::format(
"failed to resolve resource plugin. child resc [%s] parent resc [%s] bundle index [%d] bundles [%s] data id [%lld]. resolve message [%s]") %
362 leaf_bundles_to_string(_bundles) %
363 data_id_to_replicate %
368 std::string host_name;
// NOTE(review): this branch tests err_vote but throws with err_resolve.code().
// err_resolve passed its ok() check above, so the thrown code does not
// describe the vote failure; err_vote.code() looks like the intended value.
378 if (!err_vote.
ok()) {
379 THROW(err_resolve.
code(), boost::format(
"failed to get dest hierarchy. child resc [%s] parent resc [%s] bundle index [%d] bundles [%s] data id [%lld]. vote message [%s]") %
383 leaf_bundles_to_string(_bundles) %
384 data_id_to_replicate %
// dst_hier is logged two lines later; the line between, which presumably
// fills it, is not shown in this excerpt.
389 std::string dst_hier;
391 rodsLog(
LOG_NOTICE,
"proc_results_for_rebalance: creating new replica for data id [%lld] from [%s] on [%s]", data_id_to_replicate, source_info.resource_hierarchy.c_str(), dst_hier.c_str());
// Arguments of the replication call whose result is err_rebalance; the call
// line itself is not shown.
394 source_info.object_path,
396 source_info.resource_hierarchy,
400 source_info.data_mode);
// On failure remember only the first error, log this one and keep going.
401 if (!err_rebalance.
ok()) {
402 if (first_rebalance_error.
ok()) {
403 first_rebalance_error = err_rebalance;
405 rodsLog(
LOG_ERROR,
"proc_results_for_rebalance: repl_for_rebalance failed. object path [%s] parent resc [%s] source hier [%s] dest hier [%s] root resc [%s] data mode [%d]",
406 source_info.object_path.c_str(), _parent_resc_name.c_str(), source_info.resource_hierarchy.c_str(), dst_hier.c_str(), root_resc.c_str(), source_info.data_mode);
// After the loop: report the first recorded failure, if any.
414 if (!first_rebalance_error.
ok()) {
416 boost::format(
"proc_results_for_rebalance: repl_for_rebalance failed. child_resc [%s] parent resc [%s]. rebalance message [%s]") %
419 first_rebalance_error.
result());
// NOTE(review): the header line of this function is not part of this excerpt;
// the log message prefixes below identify it as update_out_of_date_replicas.
//
// Fetches one batch of out-of-date replicas and re-replicates each of them
// from its source replica. A failed replication is logged and the loop
// continues; the first failure is kept and checked after the loop.
//
// _leaf_bundles:         forwarded to the batch query and the source lookup.
// _batch_size:           maximum number of replicas fetched for this call.
// _invocation_timestamp: forwarded to the batch query.
// _resource_name:        not used on any line visible in this excerpt.
428 const std::vector<leaf_bundle_t>& _leaf_bundles,
429 const int _batch_size,
430 const std::string& _invocation_timestamp,
431 const std::string& _resource_name) {
434 const std::vector<ReplicaAndRescId> replicas_to_update = get_out_of_date_replicas_batch(_ctx.
comm(), _leaf_bundles, _invocation_timestamp, _batch_size);
// What happens for an empty batch is on lines not shown.
435 if (replicas_to_update.empty()) {
440 for (
const auto& replica_to_update : replicas_to_update) {
// Filled by a lookup on a line not shown; its status is err_dst_hier.
441 std::string destination_hierarchy;
443 if (!err_dst_hier.
ok()) {
445 boost::format(
"leaf_id_to_hier failed. data id [%lld]. replica number [%d] resource id [%lld]") %
446 replica_to_update.data_id %
447 replica_to_update.replica_number %
448 replica_to_update.resource_id);
451 ReplicationSourceInfo source_info = get_source_data_object_attributes(_ctx.
comm(), replica_to_update.data_id, _leaf_bundles);
// Parser failures are reported with the source hierarchy and object path.
454 if (!err_parser.
ok()) {
457 boost::format(
"set_string failed. resource hierarchy [%s]. object path [%s]") %
458 source_info.resource_hierarchy %
459 source_info.object_path);
461 std::string root_resc;
463 if (!err_first_resc.
ok()) {
465 err_first_resc.
code(),
466 boost::format(
"first_resc failed. resource hierarchy [%s]. object path [%s]") %
467 source_info.resource_hierarchy %
468 source_info.object_path);
// Values are cast to intmax_t to match the ji conversions in the format string.
471 rodsLog(
LOG_NOTICE,
"update_out_of_date_replicas: updating out-of-date replica for data id [%ji] from [%s] to [%s]",
472 static_cast<intmax_t
>(replica_to_update.data_id),
473 source_info.resource_hierarchy.c_str(),
474 destination_hierarchy.c_str());
// Some arguments of this call are on lines not shown.
475 const error err_repl = repl_for_rebalance(
477 source_info.object_path,
479 source_info.resource_hierarchy,
480 destination_hierarchy,
483 source_info.data_mode);
// On failure remember only the first error, log this one and keep going.
485 if (!err_repl.
ok()) {
486 if (first_error.
ok()) {
487 first_error = err_repl;
489 const error error_to_log =
PASS(err_repl);
494 "update_out_of_date_replicas: repl_for_rebalance failed with code [%ji] and message [%s]. object [%s] source hierarchy [%s] data id [%ji] destination repl num [%ji] destination hierarchy [%s]",
495 static_cast<intmax_t
>(err_repl.
code()), err_repl.
result().c_str(), source_info.object_path.c_str(), source_info.resource_hierarchy.c_str(),
496 static_cast<intmax_t
>(replica_to_update.data_id),
static_cast<intmax_t
>(replica_to_update.replica_number), destination_hierarchy.c_str());
// After the loop: the first recorded failure, if any, is handled on lines not shown.
499 if (!first_error.
ok()) {
508 const std::vector<leaf_bundle_t>& _leaf_bundles,
509 const int _batch_size,
510 const std::string& _invocation_timestamp,
511 const std::string& _resource_name) {
512 for (
size_t i=0; i<_leaf_bundles.size(); ++i) {
513 const std::string child_name = get_child_name_that_is_ancestor_of_bundle(_resource_name, _leaf_bundles[i]);
516 const int status_chlGetReplListForLeafBundles =
chlGetReplListForLeafBundles(_batch_size, i, &_leaf_bundles, &_invocation_timestamp, &data_ids_needing_new_replicas);
517 if (status_chlGetReplListForLeafBundles != 0) {
518 THROW(status_chlGetReplListForLeafBundles,
519 boost::format(
"failed to get data objects needing new replicas for resource [%s] bundle index [%d] bundles [%s]")
522 % leaf_bundles_to_string(_leaf_bundles));
524 if (data_ids_needing_new_replicas.empty()) {
528 proc_results_for_rebalance(_ctx, _resource_name, child_name, i, _leaf_bundles, data_ids_needing_new_replicas);