worker-connection.c (dovecot-2.3.16) | : | worker-connection.c (dovecot-2.3.17) | ||
---|---|---|---|---|
skipping to change at line 28 | skipping to change at line 28 | |||
#define INDEXER_PROTOCOL_MAJOR_VERSION 1 | #define INDEXER_PROTOCOL_MAJOR_VERSION 1 | |||
#define INDEXER_PROTOCOL_MINOR_VERSION 0 | #define INDEXER_PROTOCOL_MINOR_VERSION 0 | |||
#define INDEXER_MASTER_NAME "indexer-master-worker" | #define INDEXER_MASTER_NAME "indexer-master-worker" | |||
#define INDEXER_WORKER_NAME "indexer-worker-master" | #define INDEXER_WORKER_NAME "indexer-worker-master" | |||
struct worker_connection { | struct worker_connection { | |||
struct connection conn; | struct connection conn; | |||
int refcount; | ||||
indexer_status_callback_t *callback; | indexer_status_callback_t *callback; | |||
worker_available_callback_t *avail_callback; | ||||
char *request_username; | char *request_username; | |||
struct indexer_request *request; | struct indexer_request *request; | |||
unsigned int process_limit; | ||||
}; | }; | |||
static unsigned int worker_last_process_limit = 0; | ||||
static void worker_connection_call_callback(struct worker_connection *worker, | static void worker_connection_call_callback(struct worker_connection *worker, | |||
int percentage) | int percentage) | |||
{ | { | |||
if (worker->request != NULL) | if (worker->request != NULL) | |||
worker->callback(percentage, &worker->conn); | worker->callback(percentage, worker->request); | |||
if (percentage < 0 || percentage == 100) | if (percentage < 0 || percentage == 100) | |||
worker->request = NULL; | worker->request = NULL; | |||
} | } | |||
static void worker_connection_destroy(struct connection *conn) | void worker_connection_destroy(struct connection *conn) | |||
{ | { | |||
struct worker_connection *worker = | struct worker_connection *worker = | |||
container_of(conn, struct worker_connection, conn); | container_of(conn, struct worker_connection, conn); | |||
worker->request = NULL; | worker_connection_call_callback(worker, -1); | |||
i_free_and_null(worker->request_username); | i_free_and_null(worker->request_username); | |||
connection_deinit(conn); | connection_deinit(conn); | |||
} | ||||
void worker_connection_unref(struct connection **_conn) | ||||
{ | ||||
struct connection *conn = *_conn; | ||||
struct worker_connection *worker = | ||||
container_of(conn, struct worker_connection, conn); | ||||
i_assert(worker->refcount > 0); | worker->avail_callback(); | |||
if (--worker->refcount > 0) | ||||
return; | ||||
worker_connection_destroy(conn); | ||||
i_free(conn); | i_free(conn); | |||
} | } | |||
static int | static int | |||
worker_connection_handshake_args(struct connection *conn, const char *const *arg s) | worker_connection_handshake_args(struct connection *conn, const char *const *arg s) | |||
{ | { | |||
struct worker_connection *worker = | unsigned int process_limit; | |||
container_of(conn, struct worker_connection, conn); | ||||
int ret; | int ret; | |||
if (!conn->version_received) { | if (!conn->version_received) { | |||
if ((ret = connection_handshake_args_default(conn, args)) < 1) | if ((ret = connection_handshake_args_default(conn, args)) < 1) | |||
return ret; | return ret; | |||
/* we are not done yet */ | /* we are not done yet */ | |||
return 0; | return 0; | |||
} | } | |||
i_assert(worker->process_limit == 0); | if (str_to_uint(args[0], &process_limit) < 0 || | |||
if (str_to_uint(args[0], &worker->process_limit) < 0 || | process_limit == 0) { | |||
worker->process_limit == 0) { | ||||
e_error(conn->event, "Worker sent invalid process limit '%s'", | e_error(conn->event, "Worker sent invalid process limit '%s'", | |||
args[0]); | args[0]); | |||
return -1; | return -1; | |||
} | } | |||
worker_last_process_limit = process_limit; | ||||
return 1; | return 1; | |||
} | } | |||
static int | static int | |||
worker_connection_input_args(struct connection *conn, const char *const *args) | worker_connection_input_args(struct connection *conn, const char *const *args) | |||
{ | { | |||
struct worker_connection *worker = | struct worker_connection *worker = | |||
container_of(conn, struct worker_connection, conn); | container_of(conn, struct worker_connection, conn); | |||
int percentage; | int percentage; | |||
int ret = 1; | int ret = 1; | |||
skipping to change at line 110 | skipping to change at line 98 | |||
if (str_to_int(args[0], &percentage) < 0 || | if (str_to_int(args[0], &percentage) < 0 || | |||
percentage < -1 || percentage > 100) { | percentage < -1 || percentage > 100) { | |||
e_error(conn->event, "Worker sent invalid progress '%s'", args[0] ); | e_error(conn->event, "Worker sent invalid progress '%s'", args[0] ); | |||
return -1; | return -1; | |||
} | } | |||
if (percentage < 0) | if (percentage < 0) | |||
ret = -1; | ret = -1; | |||
worker_connection_call_callback(worker, percentage); | worker_connection_call_callback(worker, percentage); | |||
if (worker->request == NULL) { | ||||
/* disconnect after each request */ | ||||
ret = -1; | ||||
} | ||||
return ret; | return ret; | |||
} | } | |||
bool worker_connection_is_connected(struct connection *conn) | bool worker_connection_is_connected(struct connection *conn) | |||
{ | { | |||
return !conn->disconnected; | return !conn->disconnected; | |||
} | } | |||
bool worker_connection_get_process_limit(struct connection *conn, | unsigned int worker_connections_get_process_limit(void) | |||
unsigned int *limit_r) | ||||
{ | { | |||
struct worker_connection *worker = | return worker_last_process_limit; | |||
container_of(conn, struct worker_connection, conn); | ||||
if (worker->process_limit == 0) | ||||
return FALSE; | ||||
*limit_r = worker->process_limit; | ||||
return TRUE; | ||||
} | } | |||
void worker_connection_request(struct connection *conn, | void worker_connection_request(struct connection *conn, | |||
struct indexer_request *request, | struct indexer_request *request, | |||
void *context) | void *context) | |||
{ | { | |||
struct worker_connection *worker = | struct worker_connection *worker = | |||
container_of(conn, struct worker_connection, conn); | container_of(conn, struct worker_connection, conn); | |||
i_assert(worker_connection_is_connected(conn)); | i_assert(worker_connection_is_connected(conn)); | |||
skipping to change at line 171 | skipping to change at line 155 | |||
str_printfa(str, "\t%u\t", request->max_recent_msgs); | str_printfa(str, "\t%u\t", request->max_recent_msgs); | |||
if (request->index) | if (request->index) | |||
str_append_c(str, 'i'); | str_append_c(str, 'i'); | |||
if (request->optimize) | if (request->optimize) | |||
str_append_c(str, 'o'); | str_append_c(str, 'o'); | |||
str_append_c(str, '\n'); | str_append_c(str, '\n'); | |||
o_stream_nsend(conn->output, str_data(str), str_len(str)); | o_stream_nsend(conn->output, str_data(str), str_len(str)); | |||
} T_END; | } T_END; | |||
} | } | |||
bool worker_connection_is_busy(struct connection *conn) | ||||
{ | ||||
struct worker_connection *worker = | ||||
container_of(conn, struct worker_connection, conn); | ||||
return worker->request != NULL; | ||||
} | ||||
const char *worker_connection_get_username(struct connection *conn) | const char *worker_connection_get_username(struct connection *conn) | |||
{ | { | |||
struct worker_connection *worker = | struct worker_connection *worker = | |||
container_of(conn, struct worker_connection, conn); | container_of(conn, struct worker_connection, conn); | |||
return worker->request_username; | return worker->request_username; | |||
} | } | |||
struct indexer_request * | ||||
worker_connection_get_request(struct connection *conn) | ||||
{ | ||||
struct worker_connection *worker = | ||||
container_of(conn, struct worker_connection, conn); | ||||
return worker->request; | ||||
} | ||||
static const struct connection_vfuncs worker_connection_vfuncs = { | static const struct connection_vfuncs worker_connection_vfuncs = { | |||
.destroy = worker_connection_destroy, | .destroy = worker_connection_destroy, | |||
.input_args = worker_connection_input_args, | .input_args = worker_connection_input_args, | |||
.handshake_args = worker_connection_handshake_args, | .handshake_args = worker_connection_handshake_args, | |||
}; | }; | |||
static const struct connection_settings worker_connection_set = { | static const struct connection_settings worker_connection_set = { | |||
.service_name_in = INDEXER_WORKER_NAME, | .service_name_in = INDEXER_WORKER_NAME, | |||
.service_name_out = INDEXER_MASTER_NAME, | .service_name_out = INDEXER_MASTER_NAME, | |||
.major_version = INDEXER_PROTOCOL_MAJOR_VERSION, | .major_version = INDEXER_PROTOCOL_MAJOR_VERSION, | |||
skipping to change at line 218 | skipping to change at line 187 | |||
struct connection_list *worker_connection_list_create(void) | struct connection_list *worker_connection_list_create(void) | |||
{ | { | |||
return connection_list_init(&worker_connection_set, | return connection_list_init(&worker_connection_set, | |||
&worker_connection_vfuncs); | &worker_connection_vfuncs); | |||
} | } | |||
struct connection * | struct connection * | |||
worker_connection_create(const char *socket_path, | worker_connection_create(const char *socket_path, | |||
indexer_status_callback_t *callback, | indexer_status_callback_t *callback, | |||
worker_available_callback_t *avail_callback, | ||||
struct connection_list *list) | struct connection_list *list) | |||
{ | { | |||
struct worker_connection *conn; | struct worker_connection *conn; | |||
conn = i_new(struct worker_connection, 1); | conn = i_new(struct worker_connection, 1); | |||
conn->refcount = 1; | ||||
conn->callback = callback; | conn->callback = callback; | |||
conn->avail_callback = avail_callback; | ||||
connection_init_client_unix(list, &conn->conn, socket_path); | connection_init_client_unix(list, &conn->conn, socket_path); | |||
return &conn->conn; | return &conn->conn; | |||
} | } | |||
End of changes. 20 change blocks. | ||||
49 lines changed or deleted | 19 lines changed or added |