indexer.c (dovecot-2.3.16) | : | indexer.c (dovecot-2.3.17) | ||
---|---|---|---|---|
skipping to change at line 22 | skipping to change at line 22 | |||
#include "worker-connection.h" | #include "worker-connection.h" | |||
struct worker_request { | struct worker_request { | |||
struct connection *conn; | struct connection *conn; | |||
struct indexer_request *request; | struct indexer_request *request; | |||
}; | }; | |||
static const struct master_service_settings *set; | static const struct master_service_settings *set; | |||
static struct indexer_queue *queue; | static struct indexer_queue *queue; | |||
static struct worker_pool *worker_pool; | static struct worker_pool *worker_pool; | |||
static struct timeout *to_send_more; | ||||
void indexer_refresh_proctitle(void) | void indexer_refresh_proctitle(void) | |||
{ | { | |||
if (!set->verbose_proctitle) | if (!set->verbose_proctitle) | |||
return; | return; | |||
process_title_set(t_strdup_printf("[%u clients, %u requests]", | process_title_set(t_strdup_printf("[%u clients, %u requests]", | |||
indexer_clients_get_count(), | indexer_clients_get_count(), | |||
indexer_queue_count(queue))); | indexer_queue_count(queue))); | |||
} | } | |||
static bool idle_die(void) | static bool idle_die(void) | |||
{ | { | |||
return indexer_queue_is_empty(queue) && | return indexer_queue_is_empty(queue) && | |||
!worker_pool_have_busy_connections(worker_pool); | !worker_pool_have_connections(worker_pool); | |||
} | } | |||
static void client_connected(struct master_service_connection *conn) | static void client_connected(struct master_service_connection *conn) | |||
{ | { | |||
master_service_client_connection_accept(conn); | master_service_client_connection_accept(conn); | |||
indexer_client_create(conn, queue); | indexer_client_create(conn, queue); | |||
} | } | |||
static void worker_send_request(struct connection *conn, | static void worker_send_request(struct connection *conn, | |||
struct indexer_request *request) | struct indexer_request *request) | |||
skipping to change at line 64 | skipping to change at line 63 | |||
indexer_queue_request_work(request); | indexer_queue_request_work(request); | |||
worker_connection_request(conn, request, wrequest); | worker_connection_request(conn, request, wrequest); | |||
} | } | |||
static void queue_try_send_more(struct indexer_queue *queue) | static void queue_try_send_more(struct indexer_queue *queue) | |||
{ | { | |||
struct connection *conn; | struct connection *conn; | |||
struct indexer_request *request, *first_moved_request = NULL; | struct indexer_request *request, *first_moved_request = NULL; | |||
timeout_remove(&to_send_more); | ||||
while ((request = indexer_queue_request_peek(queue)) != NULL) { | while ((request = indexer_queue_request_peek(queue)) != NULL) { | |||
conn = worker_pool_find_username_connection(worker_pool, | conn = worker_pool_find_username_connection(worker_pool, | |||
request->username); | request->username); | |||
if (conn != NULL && worker_connection_is_busy(conn)) { | if (conn != NULL) { | |||
/* There is already a connection handling a request | /* There is already a connection handling a request | |||
* for this user. Move the request to the back of the | * for this user. Move the request to the back of the | |||
* queue and handle requests from other users. | * queue and handle requests from other users. | |||
* Terminate if we went through all requests. */ | * Terminate if we went through all requests. */ | |||
if (request == first_moved_request) { | if (request == first_moved_request) { | |||
/* all requests are waiting for existing users | /* all requests are waiting for existing users | |||
to finish. */ | to finish. */ | |||
break; | break; | |||
} | } | |||
if (first_moved_request == NULL) | if (first_moved_request == NULL) | |||
first_moved_request = request; | first_moved_request = request; | |||
indexer_queue_move_head_to_tail(queue); | indexer_queue_move_head_to_tail(queue); | |||
continue; | continue; | |||
} else if (conn == NULL) { | } else { | |||
/* create a new connection to a worker */ | /* create a new connection to a worker */ | |||
if (!worker_pool_get_connection(worker_pool, &conn)) | if (!worker_pool_get_connection(worker_pool, &conn)) | |||
break; | break; | |||
} | } | |||
indexer_queue_request_remove(queue); | indexer_queue_request_remove(queue); | |||
worker_send_request(conn, request); | worker_send_request(conn, request); | |||
} | } | |||
} | } | |||
static void queue_listen_callback(struct indexer_queue *queue) | static void queue_listen_callback(struct indexer_queue *queue) | |||
{ | { | |||
queue_try_send_more(queue); | queue_try_send_more(queue); | |||
} | } | |||
static void worker_status_callback(int percentage, void *context) | static void | |||
worker_status_callback(int percentage, struct indexer_request *request) | ||||
{ | { | |||
struct connection *conn = context; | ||||
struct indexer_request *request = worker_connection_get_request(conn); | ||||
if (percentage >= 0 && percentage < 100) { | if (percentage >= 0 && percentage < 100) { | |||
indexer_queue_request_status(queue, request, | indexer_queue_request_status(queue, request, | |||
percentage); | percentage); | |||
return; | return; | |||
} | } | |||
indexer_queue_request_finish(queue, &request, | indexer_queue_request_finish(queue, &request, | |||
percentage == 100); | percentage == 100); | |||
if (worker_pool != NULL) /* not in deinit */ | } | |||
worker_pool_release_connection(worker_pool, conn); | ||||
/* if this was the last request for the connection, we can send more | static void worker_avail_callback(void) | |||
through it. delay it a bit, since we may be coming here from | { | |||
worker_connection_disconnect() and we want to finish it up. */ | /* A new worker became available. Try to shrink the queue. */ | |||
if (to_send_more == NULL) | queue_try_send_more(queue); | |||
to_send_more = timeout_add_short(0, queue_try_send_more, queue); | ||||
} | } | |||
int main(int argc, char *argv[]) | int main(int argc, char *argv[]) | |||
{ | { | |||
const char *error; | const char *error; | |||
master_service = master_service_init("indexer", 0, &argc, &argv, ""); | master_service = master_service_init("indexer", 0, &argc, &argv, ""); | |||
if (master_getopt(master_service) > 0) | if (master_getopt(master_service) > 0) | |||
return FATAL_DEFAULT; | return FATAL_DEFAULT; | |||
skipping to change at line 142 | skipping to change at line 135 | |||
set = master_service_settings_get(master_service); | set = master_service_settings_get(master_service); | |||
master_service_init_log(master_service); | master_service_init_log(master_service); | |||
restrict_access_by_env(RESTRICT_ACCESS_FLAG_ALLOW_ROOT, NULL); | restrict_access_by_env(RESTRICT_ACCESS_FLAG_ALLOW_ROOT, NULL); | |||
restrict_access_allow_coredumps(TRUE); | restrict_access_allow_coredumps(TRUE); | |||
master_service_set_idle_die_callback(master_service, idle_die); | master_service_set_idle_die_callback(master_service, idle_die); | |||
queue = indexer_queue_init(indexer_client_status_callback); | queue = indexer_queue_init(indexer_client_status_callback); | |||
indexer_queue_set_listen_callback(queue, queue_listen_callback); | indexer_queue_set_listen_callback(queue, queue_listen_callback); | |||
worker_pool = worker_pool_init("indexer-worker", | worker_pool = worker_pool_init("indexer-worker", | |||
worker_status_callback); | worker_status_callback, | |||
worker_avail_callback); | ||||
master_service_init_finish(master_service); | master_service_init_finish(master_service); | |||
master_service_run(master_service, client_connected); | master_service_run(master_service, client_connected); | |||
indexer_queue_cancel_all(queue); | indexer_queue_cancel_all(queue); | |||
indexer_clients_destroy_all(); | indexer_clients_destroy_all(); | |||
worker_pool_deinit(&worker_pool); | worker_pool_deinit(&worker_pool); | |||
indexer_queue_deinit(&queue); | indexer_queue_deinit(&queue); | |||
timeout_remove(&to_send_more); | ||||
master_service_deinit(&master_service); | master_service_deinit(&master_service); | |||
return 0; | return 0; | |||
} | } | |||
End of changes. 11 change blocks. | ||||
19 lines changed or deleted | 12 lines changed or added |