23 #include <condition_variable>
27 #include <boost/filesystem/convenience.hpp>
28 #include <boost/filesystem/operations.hpp>
29 #include <boost/format.hpp>
// File-local shutdown flag. Value-initialised (false) here; set to true by
// the signal handler lambda in this file and polled by the main loop and by
// the rule-execution paths to stop picking up or finalising work.
32 static std::atomic_bool re_server_terminated{};
// NOTE(review): the enclosing function header is outside this excerpt;
// presumably this is the logging setup routine - confirm.
// Convert the textual SQL log level to an integer. atoi() yields 0 for
// non-numeric input and gives no error indication.
40 j = atoi(sql_log_level);
// Open the syslog connection: messages are tagged "irodsDelayServer", include
// the process id (LOG_PID), the connection is opened lazily on first use
// (LOG_ODELAY), and the daemon facility is used.
54 openlog(
"irodsDelayServer", LOG_ODELAY | LOG_PID, LOG_DAEMON);
// fill_rule_exec_submit_inp (name taken from its call site; the first line of
// the signature is outside this excerpt).
//
// Builds rule_exec_submit_inp from one row of strings:
//   exec_info[0]       - rule exec id (used in error messages)
//   exec_info[2]       - path of the REI file loaded into packedReiAndArgBBuf
//   exec_info[3], [4], [6]..[11] - copied into the fixed-size fields below
// Returns the populated rule_exec_submit_inp. Uses THROW to report stat and
// open failures on the REI file; the length check after read() appears to
// guard a further THROW whose message text is outside this excerpt.
64 const std::vector<std::string>& exec_info) {
65 namespace bfs = boost::filesystem;
// These bind to the pointers returned by c_str(); the pointed-to characters
// are owned by exec_info's strings and stay valid while those are unmodified.
70 const auto& rule_exec_id = exec_info[0].c_str();
71 const auto& rei_file_path = exec_info[2].c_str();
72 bfs::path
p{rei_file_path};
// Error report for a failed stat of the REI file (the guarding condition is
// outside this excerpt).
75 THROW(
status, (boost::format(
"stat error for rei file [%s], id [%s]") %
76 rei_file_path % rule_exec_id).str());
// The buffer length is the file size narrowed to int.
// NOTE(review): a file larger than INT_MAX would be truncated by this cast -
// confirm REI files are always small.
79 rule_exec_submit_inp.packedReiAndArgBBuf->len =
static_cast<int>(bfs::file_size(
p));
// One extra byte is allocated beyond len; together with the memset below it
// leaves a trailing zero byte after the file contents.
// NOTE(review): the malloc result is not checked in the visible lines.
80 rule_exec_submit_inp.packedReiAndArgBBuf->buf = malloc(rule_exec_submit_inp.packedReiAndArgBBuf->len + 1);
// NOTE(review): confirm fd is closed (and buf released) on every path,
// including the THROW paths - the close is not visible in this excerpt.
82 int fd = open(rei_file_path, O_RDONLY, 0);
// Error report for a failed open of the REI file.
85 THROW(
status, (boost::format(
"open error for rei file [%s]") %
86 rei_file_path).str());
// Zero the whole allocation (len + 1 bytes) before reading into it.
89 memset(rule_exec_submit_inp.packedReiAndArgBBuf->buf, 0,
90 rule_exec_submit_inp.packedReiAndArgBBuf->len + 1);
// Single read() of up to len bytes; the result is the byte count or -1.
// This local `status` is an ssize_t distinct from the one used by the
// earlier THROW calls.
91 ssize_t
status{read(fd, rule_exec_submit_inp.packedReiAndArgBBuf->buf,
92 rule_exec_submit_inp.packedReiAndArgBBuf->len)};
// Anything other than a full-length read (short read or error) takes this
// branch.
94 if (rule_exec_submit_inp.packedReiAndArgBBuf->len !=
static_cast<int>(
status)) {
97 rei_file_path % rule_exec_submit_inp.packedReiAndArgBBuf->len %
status).str());
101 rei_file_path %
status).str());
// Copy the remaining columns into the fixed-size fields, passing NAME_LEN
// as the size bound to rstrcpy.
107 rstrcpy(rule_exec_submit_inp.userName, exec_info[3].c_str(),
NAME_LEN);
108 rstrcpy(rule_exec_submit_inp.exeAddress, exec_info[4].c_str(),
NAME_LEN);
110 rstrcpy(rule_exec_submit_inp.exeFrequency, exec_info[6].c_str(),
NAME_LEN);
111 rstrcpy(rule_exec_submit_inp.priority, exec_info[7].c_str(),
NAME_LEN);
112 rstrcpy(rule_exec_submit_inp.lastExecTime, exec_info[8].c_str(),
NAME_LEN);
113 rstrcpy(rule_exec_submit_inp.exeStatus, exec_info[9].c_str(),
NAME_LEN);
114 rstrcpy(rule_exec_submit_inp.estimateExeTime, exec_info[10].c_str(),
NAME_LEN);
115 rstrcpy(rule_exec_submit_inp.notificationAddr, exec_info[11].c_str(),
NAME_LEN);
117 return rule_exec_submit_inp;
// Decides what happens to a rule's catalog entry after it has run: depending
// on repeat_status, the entry is either deleted or modified. Returns the int
// produced by the chosen helper lambda, or repeat_status itself when that
// value is not recognised. The parameter list is outside this excerpt; the
// body uses _comm, _inp and _exec_status.
120 int update_entry_for_repeat(
// Collapse any positive status to 0; negative values pass through unchanged.
125 _exec_status = _exec_status > 0 ? 0 : _exec_status;
// Render the current wall-clock time (seconds since the epoch) as decimal
// text into current_time, bounded by NAME_LEN.
131 snprintf(current_time,
NAME_LEN,
"%ld", std::time(
nullptr));
// Helper returning int; captures _comm and _inp by reference. Its visible
// part reports an rcRuleExecDel failure with the status and rule exec id.
133 const auto delete_rule_exec_info{[&_comm, &_inp]() ->
int {
139 "%s:%d - rcRuleExecDel failed %d for id %s") %
140 __FUNCTION__ % __LINE__ %
status % rule_exec_del_inp.ruleExecId).str());
// Helper returning int; captures everything by reference and takes a
// repeat_rule flag. Its visible part reports an rcRuleExecMod failure with
// the status and rule id.
145 const auto update_rule_exec_info = [&](
const bool repeat_rule) ->
int {
157 "%s:%d - rcRuleExecMod failed %d for id %s") %
158 __FUNCTION__ % __LINE__ %
status % rule_exec_mod_inp.ruleId).str());
// NOTE(review): presumably guards cleanup of condInput once it holds
// entries - the guarded statement is outside this excerpt.
160 if (rule_exec_mod_inp.condInput.len > 0) {
// Dispatch on repeat_status (case labels are outside this excerpt). Where a
// ternary is used, a zero _exec_status - which includes any originally
// positive status - selects deletion; a negative one selects modification.
167 switch(repeat_status) {
170 return update_rule_exec_info(
false);
173 return !_exec_status ? delete_rule_exec_info() : update_rule_exec_info(false);
176 return delete_rule_exec_info();
179 return update_rule_exec_info(
true);
182 return !_exec_status ? delete_rule_exec_info() : update_rule_exec_info(true);
// Unrecognised repeat_status: the value is put into a message and handed
// back to the caller unchanged.
185 "%s:%d - getNextRepeatTime returned unknown value %d for id %s") %
186 __FUNCTION__ % __LINE__ % repeat_status % _inp.
ruleExecId).str());
187 return repeat_status;
// NOTE(review): the function header is outside this excerpt; judging by the
// call site this is the body of run_rule_exec(_comm, _inp) - confirm.
// Visible steps: fill exec_rule (packed REI length, rule text buffer,
// parameters), unpack the REI structure, adopt its user info as the
// connection's proxy user, then hand the resulting status to
// update_entry_for_repeat. The trailing lines hold failure messages around
// deleting the rule's catalog entry.
196 exec_rule.packed_rei_.len = packed_rei_len;
// Allocate room for the rule text plus one extra byte.
// NOTE(review): C-style cast and unchecked malloc; the copy into the buffer
// is not visible here.
199 size_t rule_len = strlen(_inp.
ruleName);
200 exec_rule.rule_text_.buf = (
char*)malloc(rule_len+1);
// Out-parameter passed to the unpack call; a failure is reported with the
// message below.
213 (
void**)&rei_and_arg,
219 "[%s] - unpackStruct error. status [%d]",
// Copy the user info carried in the unpacked REI into the connection's
// proxy user.
226 _comm.
proxyUser = *rei_and_arg->rei->uoic;
229 exec_rule.params_ = rei_and_arg->rei->msParamArray;
// Release the rsComm held by the unpacked REI, if both pointers are set.
232 if(rei_and_arg->rei) {
233 if(rei_and_arg->rei->rsComm) {
234 free(rei_and_arg->rei->rsComm);
// If shutdown was requested, a message records that the rule finished with
// the given status although the server was terminated (what follows inside
// the branch is outside this excerpt).
242 if (re_server_terminated) {
244 (boost::format(
"Rule [%s] completed with status [%d] but RE server was terminated.") %
// Delegate the decision about the catalog entry to update_entry_for_repeat
// and return its result.
249 return update_entry_for_repeat(_comm, _inp,
status);
253 "ruleExec of %s: %s failed.",
// Failure messages for deleting the rule: a first failure, a second failure
// ("again ... exiting"), and a final error-level log entry.
260 "rcRuleExecDel failed for %s, stat=%d",
267 (boost::format(
"rcRuleExecDel failed again for %s, stat=%d - exiting") %
279 rodsLog(
LOG_ERROR,
"Failed deleting rule exec %s from catalog", rule_exec_del_inp.ruleExecId);
// NOTE(review): the first signature line is outside this excerpt; judging by
// the call site this is execute_rule(queue, rule_info) - confirm.
// Runs one delayed rule described by rule_info: builds the submit input,
// executes it on a pooled connection, and removes the rule from the queue
// afterwards unless shutdown has been requested.
287 const std::vector<std::string>& rule_info)
// Checked before any work is done (the body of the branch is outside this
// excerpt).
289 if (re_server_terminated) {
// Release the packed REI buffer owned by rule_exec_submit_inp.
296 freeBBuf(rule_exec_submit_inp.packedReiAndArgBBuf);
// May THROW on REI file problems; the catch below handles std::exception.
300 rule_exec_submit_inp = fill_rule_exec_submit_inp(rule_info);
// Obtain a connection from the pool and run the rule on it.
308 int status = run_rule_exec(conn_pool->get_connection(), rule_exec_submit_inp);
311 rule_exec_submit_inp.ruleExecId,
status);
313 }
catch(
const std::exception&
e) {
315 rule_exec_submit_inp.ruleExecId,
e.what());
// Dequeue by rule exec id only when the server is not shutting down.
318 if (!re_server_terminated) {
319 queue.
dequeue_rule(std::string(rule_exec_submit_inp.ruleExecId));
// Builds the query and per-row job for delayed rules that are due: the query
// selects rule-exec columns for rows whose RULE_EXEC_TIME is <= the current
// time, and the job runs execute_rule on each result row. The parameter list
// and the return statement are outside this excerpt.
323 auto make_delay_queue_query_processor(
// Current time in seconds since the epoch, as decimal text. The value is
// generated locally, so formatting it into the query text introduces no
// external input.
328 const auto now = std::to_string(std::time(
nullptr));
// The column order here determines the indices seen by the consumer of each
// result row.
329 const auto qstr = (boost::format(
330 "SELECT RULE_EXEC_ID, \
332 RULE_EXEC_REI_FILE_PATH, \
333 RULE_EXEC_USER_NAME, \
336 RULE_EXEC_FREQUENCY, \
337 RULE_EXEC_PRIORITY, \
338 RULE_EXEC_LAST_EXE_TIME, \
340 RULE_EXEC_ESTIMATED_EXE_TIME, \
341 RULE_EXEC_NOTIFICATION_ADDR \
342 WHERE RULE_EXEC_TIME <= '%s'") % now).str();
// Per-row callback.
// NOTE(review): captures by reference ([&]); confirm everything it refers to
// outlives the processor's execution if the job runs after this function
// returns.
343 const auto job = [&](
const result_row& result) ->
void
// Column 0 is RULE_EXEC_ID per the SELECT list above.
345 const auto& rule_id = result[0];
350 (boost::format(
"Enqueueing rule [%s]")
354 execute_rule(queue, result);
// NOTE(review): the enclosing function header is outside this excerpt;
// presumably this is the body of main - confirm.
// Visible flow: install termination signal handlers, initialise logging,
// then loop until re_server_terminated is set - each iteration builds a
// delay-queue query processor, executes it on a query connection, and
// reports any errors it returned.
//
// Function-local statics, so the capture-less handler lambda below can use
// them and still convert to a plain function pointer for signal().
362 static std::condition_variable term_cv;
363 static std::mutex term_m;
// Handler: set the shutdown flag and wake any thread waiting on term_cv.
// NOTE(review): std::condition_variable::notify_all() is not an
// async-signal-safe call; invoking it from a signal handler is not
// guaranteed to be safe. Consider limiting the handler to setting the flag.
364 const auto signal_exit_handler = [](
int signal) {
366 re_server_terminated =
true;
367 term_cv.notify_all();
// The same handler serves all four signals.
369 signal(SIGINT, signal_exit_handler);
370 signal(SIGHUP, signal_exit_handler);
371 signal(SIGTERM, signal_exit_handler);
372 signal(SIGUSR1, signal_exit_handler);
374 auto log_fd = init_log();
// NOTE(review): presumably an immediately-invoked lambda so that sleep_time
// is a value usable in std::chrono::seconds(...) below - its body and the
// closing call are outside this excerpt.
379 const auto sleep_time = [] {
// Sleep for sleep_time seconds, or less if term_cv is notified first.
389 const auto go_to_sleep = [&sleep_time]() {
390 std::unique_lock<std::mutex> sleep_lock{term_m};
391 const auto until = std::chrono::system_clock::now() + std::chrono::seconds(sleep_time);
// no_timeout means the wait ended before the deadline - through a
// notification or a spurious wakeup, since no predicate is supplied.
392 if (std::cv_status::no_timeout == term_cv.wait_until(sleep_lock, until)) {
397 const auto thread_count = [] {
// Main service loop; exits once the shutdown flag is set.
410 while(!re_server_terminated) {
412 auto delay_queue_processor = make_delay_queue_query_processor(thread_pool, queue);
414 auto query_conn = query_conn_pool->get_connection();
// Run the query; the connection wrapper is converted to rcComm_t& for the
// call, which returns a future.
415 auto future = delay_queue_processor.execute(thread_pool,
static_cast<rcComm_t&
>(query_conn));
// Blocks until processing finishes and yields the collected errors.
416 auto errors = future.get();
417 if(errors.size() > 0) {
// Each error is a (code, message) pair.
418 for(
const auto& [code, msg] : errors) {
420 (boost::format(
"executing delayed rule failed - [%d]::[%s]")
425 }
catch(
const std::exception&
e) {