irods  4.2.8
About: iRODS (the integrated Rule Oriented Data System) is a distributed data-management system for creating data grids, digital libraries, persistent archives, and real-time data systems.
  Fossies Dox: irods-4.2.8.tar.gz  ("unofficial" and yet experimental doxygen-generated source code documentation)  

connection_pool.cpp
Go to the documentation of this file.
1 #include "connection_pool.hpp"
2 
3 #include "irods_query.hpp"
4 #include "thread_pool.hpp"
5 
6 #include <stdexcept>
7 #include <thread>
8 
9 namespace irods {
10 
// Body of connection_pool::connection_proxy::~connection_proxy(). The
// declarator (listing line 11) is not present in this listing; the
// cross-reference index on this page places the destructor at
// connection_pool.cpp:11.
12 {
// Act only when this proxy still refers to a pool and carries a real slot
// index. A moved-from proxy has pool_ == nullptr and
// index_ == uninitialized_index (see the move operations in this file).
13  if (pool_ && uninitialized_index != index_) {
// NOTE(review): listing line 14, the statement guarded by this check, is
// missing from this view. Presumably it hands the slot back to the pool -
// confirm against the real source.
15  }
16 }
17 
// Member-initializer list and body of the connection_proxy move constructor.
// The declarator (listing line 18) is not present in this listing; the
// cross-reference index gives it as connection_proxy(connection_proxy&&).
//
// Copies the pool pointer, connection pointer and slot index from _other ...
19  : pool_{_other.pool_}
20  , conn_{_other.conn_}
21  , index_{_other.index_}
22 {
// ... then resets _other to the "empty" state (null pool, null connection,
// uninitialized_index) so that only one proxy refers to the slot.
23  _other.pool_ = nullptr;
24  _other.conn_ = nullptr;
25  _other.index_ = uninitialized_index;
26 }
27 
// Body of the connection_proxy move-assignment operator. The declarator
// (listing line 28) is not present in this listing; the cross-reference
// index gives it as connection_proxy& operator=(connection_proxy&&).
//
// Returns *this after taking over _other's pool pointer, connection pointer
// and slot index.
//
// NOTE(review): the current pool_/conn_/index_ are overwritten without any
// visible step that gives the previously held slot back to the pool -
// confirm whether assigning over a live proxy is expected to happen.
// NOTE(review): there is no `this != &_other` check. On self-assignment the
// three members are copied onto themselves and then reset below, leaving
// this proxy empty.
29 {
30  pool_ = _other.pool_;
31  conn_ = _other.conn_;
32  index_ = _other.index_;
33 
// Leave the source proxy in the empty state.
34  _other.pool_ = nullptr;
35  _other.conn_ = nullptr;
36  _other.index_ = uninitialized_index;
37 
38  return *this;
39 }
40 
41 connection_pool::connection_proxy::operator bool() const noexcept
42 {
43  return nullptr != conn_;
44 }
45 
46 connection_pool::connection_proxy::operator rcComm_t&() const
47 {
48  if (!conn_) {
49  throw std::runtime_error{"Invalid connection object"};
50  }
51 
52  return *conn_;
53 }
54 
55 connection_pool::connection_proxy::operator rcComm_t*() const noexcept
56 {
57  return conn_;
58 }
59 
// Body of connection_pool::connection_proxy::release(). The declarator
// (listing line 60) is not present in this listing; the cross-reference
// index gives it as rcComm_t* release().
//
// Returns the raw connection pointer to the caller and clears conn_, so
// this proxy no longer converts to a valid connection afterwards.
//
// NOTE(review): pool_ is dereferenced without a null check. A moved-from
// proxy has pool_ == nullptr - confirm callers never call release() on one.
61 {
// Tell the pool about the hand-off first. release_connection() in this file
// sets the slot's `refresh` flag.
62  pool_->release_connection(index_);
63  auto conn = conn_;
64  conn_ = nullptr;
// pool_ and index_ are intentionally left as they are here; only conn_ is
// cleared.
65  return conn;
66 }
67 
// Tail of the parameter list, the member-initializer list and the (empty)
// body of the connection_proxy constructor that binds a proxy to a pool
// slot. The first declarator line (listing line 68) is not present in this
// listing; `_pool` is therefore only visible through its use below.
//
// Parameters visible here:
//   _conn   connection object whose address is stored in conn_
//   _index  slot index stored in index_
69  rcComm_t& _conn,
70  int _index) noexcept
// Only addresses are stored. Nothing in this constructor takes ownership of
// the pool or of the connection.
71  : pool_{&_pool}
72  , conn_{&_conn}
73  , index_{_index}
74 {
75 }
76 
// Tail of the parameter list, the member-initializer list and the body of
// the connection_pool constructor. The first declarator line (listing line
// 77) is not present in this listing; the cross-reference index gives the
// full signature as
//   connection_pool(int _size, const std::string& _host, const int _port,
//                   const std::string& _username, const std::string& _zone,
//                   const int _refresh_time)
//
// Stores the connection parameters, creates slot 0 synchronously and then
// creates the remaining slots on a temporary thread pool.
//
// Throws std::runtime_error with one of the messages
// "invalid connection pool size", "connect error" or "client login error".
78  const std::string& _host,
79  const int _port,
80  const std::string& _username,
81  const std::string& _zone,
82  const int _refresh_time)
83  : host_{_host}
84  , port_{_port}
85  , username_{_username}
86  , zone_{_zone}
87  , refresh_time_(_refresh_time)
// NOTE(review): conn_ctxs_ is sized from _size here, before the `_size < 1`
// check in the body runs. For a negative _size the container constructor is
// reached first - confirm what the caller actually observes in that case.
88  , conn_ctxs_(_size)
89 {
90  if (_size < 1) {
91  throw std::runtime_error{"invalid connection pool size"};
92  }
93 
94  // Always initialize the first connection to guarantee that the
95  // network plugin is loaded. This guarantees that asynchronous calls
96  // to rcConnect do not cause a segfault.
// Both callbacks throw, so a failure on slot 0 aborts construction at once.
97  create_connection(0,
98  [] { throw std::runtime_error{"connect error"}; },
99  [] { throw std::runtime_error{"client login error"}; });
100 
101  // If the size of the pool is one, then return immediately.
102  if (_size == 1) {
103  return;
104  }
105 
106  // Initialize the rest of the connection pool asynchronously.
107 
// The worker count is capped at the smaller of _size and the reported
// hardware concurrency.
// NOTE(review): std::thread::hardware_concurrency() is allowed to return 0
// when the value is not computable, which would request a pool of 0 workers
// - confirm how irods::thread_pool behaves for that size.
108  irods::thread_pool thread_pool{std::min<int>(_size, std::thread::hardware_concurrency())};
109 
// Failure flags shared with the worker tasks. The tasks capture them by
// reference, and the join() below runs before the flags leave scope.
110  std::atomic<bool> connect_error{};
111  std::atomic<bool> login_error{};
112 
113  for (int i = 1; i < _size; ++i) {
114  irods::thread_pool::post(thread_pool, [this, i, &connect_error, &login_error] {
// Skip the remaining slots once any task has recorded a failure.
115  if (connect_error.load() || login_error.load()) {
116  return;
117  }
118 
// Unlike slot 0, the callbacks here only set a flag. No exception is thrown
// from inside the worker task by these callbacks.
119  create_connection(i,
120  [&connect_error] { connect_error.store(true); },
121  [&login_error] { login_error.store(true); });
122  });
123  }
124 
125  thread_pool.join();
126 
// Report failures on the constructing thread. A connect failure takes
// precedence over a login failure.
127  if (connect_error.load()) {
128  throw std::runtime_error{"connect error"};
129  }
130 
131  if (login_error.load()) {
132  throw std::runtime_error{"client login error"};
133  }
134 }
135 
// Tail of the parameter list and the body of
// connection_pool::create_connection. The first declarator line (listing
// line 136) is not present in this listing; the cross-reference index gives
// the signature as
//   void create_connection(int _index,
//                          std::function<void()> _on_connect_error,
//                          std::function<void()> _on_login_error)
//
// (Re)creates the connection stored in slot _index and logs the client in.
//   _on_connect_error  invoked when rcConnect yields no connection
//   _on_login_error    invoked when clientLogin returns non-zero
137  std::function<void()> _on_connect_error,
138  std::function<void()> _on_login_error)
139 {
140  auto& ctx = conn_ctxs_[_index];
// The timestamp is taken before connecting. verify_connection() compares it
// with refresh_time_ to decide when the slot is too old.
141  ctx.creation_time = std::time(nullptr);
// Replace whatever the slot held with the result of rcConnect. Any error
// detail goes into ctx.error.
142  ctx.conn.reset(rcConnect(host_.c_str(),
143  port_,
144  username_.c_str(),
145  zone_.c_str(),
146  NO_RECONN,
147  &ctx.error));
148 
149  if (!ctx.conn) {
150  _on_connect_error();
151  return;
152  }
153 
// On a login failure only the callback runs. ctx.conn is not reset in this
// branch, so the slot keeps the connected-but-not-logged-in object.
154  if (clientLogin(ctx.conn.get()) != 0) {
155  _on_login_error();
156  }
157 }
158 
// Body of connection_pool::verify_connection. The declarator (listing line
// 159) is not present in this listing; the cross-reference index gives it
// as bool verify_connection(int _index).
//
// Returns true when the connection in slot _index is considered usable.
// Returns false when the slot is empty, when the probe below throws, or
// when the slot is older than refresh_time_.
160 {
161  auto& ctx = conn_ctxs_[_index];
162 
163  if (!ctx.conn) {
164  return false;
165  }
166 
167  try {
// Probe the connection by constructing a query object on it. Presumably
// this throws if the connection is no longer usable - confirm against
// irods_query.hpp.
168  query<rcComm_t>{ctx.conn.get(), "select ZONE_NAME where ZONE_TYPE = 'local'"};
// Age check: both operands are in the units of std::time(). Note that the
// probe above is issued even when this check then rejects the slot.
169  if (std::time(nullptr) - ctx.creation_time > refresh_time_) {
170  return false;
171  }
172  }
// Any std::exception from the probe is treated as "not usable".
173  catch (const std::exception&) {
174  return false;
175  }
176 
177  return true;
178 }
179 
// Body of connection_pool::refresh_connection. The declarator (listing line
// 180) is not present in this listing; the cross-reference index gives it
// as rcComm_t* refresh_connection(int _index).
//
// Returns the raw connection pointer for slot _index, recreating the
// connection first when the slot fails verify_connection().
// Throws std::runtime_error ("connect error" / "client login error") from
// the callbacks passed to create_connection().
181 {
182  auto& ctx = conn_ctxs_[_index];
// Clear any error information left from a previous use of the slot.
183  ctx.error = {};
184 
// The `refresh` flag is set by release_connection(), which
// connection_proxy::release() calls when it hands the raw pointer to its
// caller. conn.release() here gives up the slot's pointer - presumably
// without destroying the object, assuming ctx.conn is a std::unique_ptr
// (TODO confirm in connection_pool.hpp).
185  if (ctx.refresh) {
186  ctx.refresh = false;
187  ctx.conn.release();
188  }
189 
// An empty slot fails verification, so the branch above always leads to a
// new connection being created here.
190  if (!verify_connection(_index)) {
191  create_connection(_index,
192  [] { throw std::runtime_error{"connect error"}; },
193  [] { throw std::runtime_error{"client login error"}; });
194  }
195 
196  return ctx.conn.get();
197 }
198 
// Body of connection_pool::get_connection. The declarator (listing line
// 199) is not present in this listing; the cross-reference index gives it
// as connection_proxy get_connection().
//
// Scans the slots in round-robin order until one is free, marks it in use
// and returns a proxy bound to it. The loop has no exit other than the
// return, and no sleep or yield is visible between passes.
200 {
// `i = ++i % size` advances i by one and wraps it back to 0 at the end of
// the slot vector.
201  for (int i = 0;; i = ++i % conn_ctxs_.size()) {
// Non-blocking attempt on the slot's mutex: a slot whose mutex is held by
// another thread is skipped instead of waited on.
202  std::unique_lock<std::mutex> lock{conn_ctxs_[i].mutex, std::defer_lock};
203 
204  if (lock.try_lock()) {
205  if (!conn_ctxs_[i].in_use.load()) {
// NOTE(review): in_use is set before refresh_connection() runs. That call
// can throw std::runtime_error through create_connection()'s callbacks, and
// nothing visible here resets in_use on that path - confirm whether the
// slot can become permanently unavailable.
206  conn_ctxs_[i].in_use.store(true);
207  return {*this, *refresh_connection(i), i};
208  }
209  }
210  }
211 }
212 
// Body of connection_pool::return_connection. The declarator (listing line
// 213) is not present in this listing; the cross-reference index gives it
// as void return_connection(int _index).
//
// Clears the slot's in_use flag so that get_connection() can hand the slot
// out again.
214 {
215  conn_ctxs_[_index].in_use.store(false);
216 }
217 
// Body of connection_pool::release_connection. The declarator (listing line
// 218) is not present in this listing; the cross-reference index gives it
// as void release_connection(int _index).
//
// Sets the slot's `refresh` flag. refresh_connection() reads that flag and,
// when it is set, gives up the slot's stored pointer before creating a new
// connection.
219 {
220  conn_ctxs_[_index].refresh = true;
221 }
222 
223 std::shared_ptr<connection_pool> make_connection_pool(int size)
224 {
225  rodsEnv env{};
226  _getRodsEnv(env);
227  return std::make_shared<irods::connection_pool>(
228  size,
229  env.rodsHost,
230  env.rodsPort,
231  env.rodsUserName,
232  env.rodsZone,
233  env.irodsConnectionPoolRefreshTime);
234 }
235 
236 } // namespace irods
237 
rcComm_t
Definition: rcConnect.h:95
irods::connection_pool::refresh_time_
const int refresh_time_
Definition: connection_pool.hpp:89
irods::connection_pool::refresh_connection
rcComm_t * refresh_connection(int _index)
Definition: connection_pool.cpp:180
connection_pool.hpp
env
Definition: restructs.hpp:226
irods::connection_pool::conn_ctxs_
std::vector< connection_context > conn_ctxs_
Definition: connection_pool.hpp:90
irods::connection_pool::host_
const std::string host_
Definition: connection_pool.hpp:85
irods::thread_pool::post
static void post(thread_pool &_pool, Function &&_func)
Definition: thread_pool.hpp:35
irods::connection_pool::connection_proxy::uninitialized_index
static constexpr int uninitialized_index
Definition: connection_pool.hpp:41
irods::connection_pool::username_
const std::string username_
Definition: connection_pool.hpp:87
irods::connection_pool::connection_proxy::index_
int index_
Definition: connection_pool.hpp:45
irods_query.hpp
irods::connection_pool::get_connection
connection_proxy get_connection()
Definition: connection_pool.cpp:199
irods.pypyodbc.lock
lock
Definition: pypyodbc.py:66
irods::connection_pool::connection_proxy::connection_proxy
connection_proxy(connection_proxy &&)
Definition: connection_pool.cpp:18
rcConnect
rcComm_t * rcConnect(const char *rodsHost, int rodsPort, const char *userName, const char *rodsZone, int reconnFlag, rErrMsg_t *errMsg)
Definition: rcConnect.cpp:30
irods
Definition: apiHandler.hpp:35
irods::connection_pool::zone_
const std::string zone_
Definition: connection_pool.hpp:88
irods::connection_pool::connection_proxy::release
rcComm_t * release()
Definition: connection_pool.cpp:60
_getRodsEnv
void _getRodsEnv(rodsEnv &rodsEnvArg)
Definition: getRodsEnv.cpp:121
irods::connection_pool::return_connection
void return_connection(int _index)
Definition: connection_pool.cpp:213
irods::connection_pool::create_connection
void create_connection(int _index, std::function< void()> _on_connect_error, std::function< void()> _on_login_error)
Definition: connection_pool.cpp:136
irods::connection_pool::release_connection
void release_connection(int _index)
Definition: connection_pool.cpp:218
irods::connection_pool::connection_proxy::operator=
connection_proxy & operator=(connection_proxy &&)
Definition: connection_pool.cpp:28
irods::connection_pool::connection_proxy::pool_
connection_pool * pool_
Definition: connection_pool.hpp:43
irods::make_connection_pool
std::shared_ptr< connection_pool > make_connection_pool(int size=1)
Definition: connection_pool.cpp:223
irods::connection_pool::verify_connection
bool verify_connection(int _index)
Definition: connection_pool.cpp:159
irods::connection_pool::connection_proxy
Definition: connection_pool.hpp:22
irods::thread_pool
Definition: thread_pool.hpp:11
size
long long size
Definition: filesystem.cpp:102
irods::query
Definition: irods_query.hpp:25
clientLogin
int clientLogin(rcComm_t *conn, const char *_context, const char *_scheme_override)
Definition: clientLogin.cpp:222
irods::connection_pool::connection_proxy::~connection_proxy
~connection_proxy()
Definition: connection_pool.cpp:11
thread_pool.hpp
rodsEnv
Definition: getRodsEnv.h:8
irods::connection_pool::port_
const int port_
Definition: connection_pool.hpp:86
NO_RECONN
#define NO_RECONN
Definition: rcConnect.h:34
irods::connection_pool
Definition: connection_pool.hpp:16
irods::connection_pool::connection_pool
connection_pool(int _size, const std::string &_host, const int _port, const std::string &_username, const std::string &_zone, const int _refresh_time)
Definition: connection_pool.cpp:77