"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "dnstap/dnstap_collector.c" between
nsd-4.3.6.tar.gz and nsd-4.3.7.tar.gz

About: NSD is an authoritative only, high performance, simple name server daemon.

dnstap_collector.c  (nsd-4.3.6):dnstap_collector.c  (nsd-4.3.7)
skipping to change at line 36 skipping to change at line 36
#endif #endif
#include "dnstap/dnstap_collector.h" #include "dnstap/dnstap_collector.h"
#include "dnstap/dnstap.h" #include "dnstap/dnstap.h"
#include "util.h" #include "util.h"
#include "nsd.h" #include "nsd.h"
#include "region-allocator.h" #include "region-allocator.h"
#include "buffer.h" #include "buffer.h"
#include "namedb.h" #include "namedb.h"
#include "options.h" #include "options.h"
#include "udb.h"
#include "rrl.h"
struct dt_collector* dt_collector_create(struct nsd* nsd) struct dt_collector* dt_collector_create(struct nsd* nsd)
{ {
int i, sv[2]; int i, sv[2];
struct dt_collector* dt_col = (struct dt_collector*)xalloc_zero( struct dt_collector* dt_col = (struct dt_collector*)xalloc_zero(
sizeof(*dt_col)); sizeof(*dt_col));
dt_col->count = nsd->child_count; dt_col->count = nsd->child_count * 2;
dt_col->dt_env = NULL; dt_col->dt_env = NULL;
dt_col->region = region_create(xalloc, free); dt_col->region = region_create(xalloc, free);
dt_col->send_buffer = buffer_create(dt_col->region, dt_col->send_buffer = buffer_create(dt_col->region,
/* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + local_addr + addr */ /* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + local_addr + addr */
4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 + 4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 +
#ifdef INET6 #ifdef INET6
sizeof(struct sockaddr_storage) + sizeof(struct sockaddr_storage) sizeof(struct sockaddr_storage) + sizeof(struct sockaddr_storage)
#else #else
sizeof(struct sockaddr_in) + sizeof(struct sockaddr_in) sizeof(struct sockaddr_in) + sizeof(struct sockaddr_in)
#endif #endif
); );
/* open pipes in struct nsd */ /* open communication channels in struct nsd */
nsd->dt_collector_fd_send = (int*)xalloc_array_zero(dt_col->count, nsd->dt_collector_fd_send = (int*)xalloc_array_zero(dt_col->count,
sizeof(int)); sizeof(int));
nsd->dt_collector_fd_recv = (int*)xalloc_array_zero(dt_col->count, nsd->dt_collector_fd_recv = (int*)xalloc_array_zero(dt_col->count,
sizeof(int)); sizeof(int));
for(i=0; i<dt_col->count; i++) { for(i=0; i<dt_col->count; i++) {
int fd[2]; int sv[2];
fd[0] = -1; int bufsz = buffer_capacity(dt_col->send_buffer);
fd[1] = -1; sv[0] = -1; /* For receiving by parent (dnstap-collector) */
if(pipe(fd) < 0) { sv[1] = -1; /* For sending by child (server childs) */
error("dnstap_collector: cannot create pipe: %s", if(socketpair(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0, sv) < 0) {
error("dnstap_collector: cannot create communication chan
nel: %s",
strerror(errno)); strerror(errno));
} }
if(fcntl(fd[0], F_SETFL, O_NONBLOCK) == -1) { if(setsockopt(sv[0], SOL_SOCKET, SO_RCVBUF, &bufsz, sizeof(bufsz)
log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno)); )) {
log_msg(LOG_ERR, "setting dnstap_collector "
"receive buffer size failed: %s", strerror(errno)
);
} }
if(fcntl(fd[1], F_SETFL, O_NONBLOCK) == -1) { if(setsockopt(sv[1], SOL_SOCKET, SO_SNDBUF, &bufsz, sizeof(bufsz)
log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno)); )) {
log_msg(LOG_ERR, "setting dnstap_collector "
"send buffer size failed: %s", strerror(errno));
} }
nsd->dt_collector_fd_recv[i] = fd[0]; nsd->dt_collector_fd_recv[i] = sv[0];
nsd->dt_collector_fd_send[i] = fd[1]; nsd->dt_collector_fd_send[i] = sv[1];
} }
nsd->dt_collector_fd_swap = nsd->dt_collector_fd_send + nsd->child_count;
/* open socketpair */ /* open socketpair */
if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) { if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
error("dnstap_collector: cannot create socketpair: %s", error("dnstap_collector: cannot create socketpair: %s",
strerror(errno)); strerror(errno));
} }
if(fcntl(sv[0], F_SETFL, O_NONBLOCK) == -1) { if(fcntl(sv[0], F_SETFL, O_NONBLOCK) == -1) {
log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno)); log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno));
} }
if(fcntl(sv[1], F_SETFL, O_NONBLOCK) == -1) { if(fcntl(sv[1], F_SETFL, O_NONBLOCK) == -1) {
skipping to change at line 99 skipping to change at line 106
dt_col->cmd_socket_nsd = sv[1]; dt_col->cmd_socket_nsd = sv[1];
return dt_col; return dt_col;
} }
void dt_collector_destroy(struct dt_collector* dt_col, struct nsd* nsd) void dt_collector_destroy(struct dt_collector* dt_col, struct nsd* nsd)
{ {
if(!dt_col) return; if(!dt_col) return;
free(nsd->dt_collector_fd_recv); free(nsd->dt_collector_fd_recv);
nsd->dt_collector_fd_recv = NULL; nsd->dt_collector_fd_recv = NULL;
free(nsd->dt_collector_fd_send); if (nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap)
free(nsd->dt_collector_fd_send);
else
free(nsd->dt_collector_fd_swap);
nsd->dt_collector_fd_send = NULL; nsd->dt_collector_fd_send = NULL;
nsd->dt_collector_fd_swap = NULL;
region_destroy(dt_col->region); region_destroy(dt_col->region);
free(dt_col); free(dt_col);
} }
void dt_collector_close(struct dt_collector* dt_col, struct nsd* nsd) void dt_collector_close(struct dt_collector* dt_col, struct nsd* nsd)
{ {
int i; int i, *fd_send;
if(!dt_col) return; if(!dt_col) return;
if(dt_col->cmd_socket_dt != -1) { if(dt_col->cmd_socket_dt != -1) {
close(dt_col->cmd_socket_dt); close(dt_col->cmd_socket_dt);
dt_col->cmd_socket_dt = -1; dt_col->cmd_socket_dt = -1;
} }
if(dt_col->cmd_socket_nsd != -1) { if(dt_col->cmd_socket_nsd != -1) {
close(dt_col->cmd_socket_nsd); close(dt_col->cmd_socket_nsd);
dt_col->cmd_socket_nsd = -1; dt_col->cmd_socket_nsd = -1;
} }
fd_send = nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap
? nsd->dt_collector_fd_send : nsd->dt_collector_fd_swap;
for(i=0; i<dt_col->count; i++) { for(i=0; i<dt_col->count; i++) {
if(nsd->dt_collector_fd_recv[i] != -1) { if(nsd->dt_collector_fd_recv[i] != -1) {
close(nsd->dt_collector_fd_recv[i]); close(nsd->dt_collector_fd_recv[i]);
nsd->dt_collector_fd_recv[i] = -1; nsd->dt_collector_fd_recv[i] = -1;
} }
if(nsd->dt_collector_fd_send[i] != -1) { if(fd_send[i] != -1) {
close(nsd->dt_collector_fd_send[i]); close(fd_send[i]);
nsd->dt_collector_fd_send[i] = -1; fd_send[i] = -1;
} }
} }
} }
/* handle command from nsd to dt collector. /* handle command from nsd to dt collector.
* mostly, check for fd closed, this means we have to exit */ * mostly, check for fd closed, this means we have to exit */
void void
dt_handle_cmd_from_nsd(int ATTR_UNUSED(fd), short event, void* arg) dt_handle_cmd_from_nsd(int ATTR_UNUSED(fd), short event, void* arg)
{ {
struct dt_collector* dt_col = (struct dt_collector*)arg; struct dt_collector* dt_col = (struct dt_collector*)arg;
if((event&EV_READ) != 0) { if((event&EV_READ) != 0) {
event_base_loopexit(dt_col->event_base, NULL); event_base_loopexit(dt_col->event_base, NULL);
} }
} }
/* read data from fd into buffer, true when message is complete */ /* receive data from fd into buffer, 1 when message received, -1 on error */
static int read_into_buffer(int fd, struct buffer* buf) static int recv_into_buffer(int fd, struct buffer* buf)
{ {
size_t msglen; size_t msglen;
ssize_t r; ssize_t r;
if(buffer_position(buf) < 4) {
/* read the length of the message */
r = read(fd, buffer_current(buf), 4 - buffer_position(buf));
if(r == -1) {
if(errno == EAGAIN || errno == EINTR) {
/* continue to read later */
return 0;
}
log_msg(LOG_ERR, "dnstap collector: read failed: %s",
strerror(errno));
return 0;
}
buffer_skip(buf, r);
if(buffer_position(buf) < 4)
return 0; /* continue to read more msglen later */
}
/* msglen complete */ assert(buffer_position(buf) == 0);
msglen = buffer_read_u32_at(buf, 0); r = recv(fd, buffer_current(buf), buffer_capacity(buf), MSG_DONTWAIT);
/* assert we have enough space, if we don't and we wanted to continue,
* we would have to skip the message somehow, but that should never
* happen because send_buffer and receive_buffer have the same size */
assert(buffer_capacity(buf) >= msglen + 4);
r = read(fd, buffer_current(buf), msglen - (buffer_position(buf) - 4));
if(r == -1) { if(r == -1) {
if(errno == EAGAIN || errno == EINTR) { if(errno == EAGAIN || errno == EINTR || errno == EMSGSIZE) {
/* continue to read later */ /* continue to receive a message later */
return 0; return 0;
} }
log_msg(LOG_ERR, "dnstap collector: read failed: %s", log_msg(LOG_ERR, "dnstap collector: receive failed: %s",
strerror(errno)); strerror(errno));
return -1;
}
if(r == 0) {
/* Remote end closed the connection? */
log_msg(LOG_ERR, "dnstap collector: remote closed connection");
return -1;
}
assert(r > 4);
msglen = buffer_read_u32_at(buf, 0);
if(msglen != (size_t)(r - 4)) {
/* Is this still possible now the communication channel is of
* type SOCK_DGRAM? I think not, but better safe than sorry. */
log_msg(LOG_ERR, "dnstap collector: out of sync (msglen: %u)",
(unsigned int) msglen);
return 0; return 0;
} }
buffer_skip(buf, r); buffer_skip(buf, r);
if(buffer_position(buf) < 4 + msglen)
return 0; /* read more msg later */
/* msg complete */
buffer_flip(buf); buffer_flip(buf);
return 1; return 1;
} }
/* submit the content of the buffer received to dnstap */ /* submit the content of the buffer received to dnstap */
static void static void
dt_submit_content(struct dt_env* dt_env, struct buffer* buf) dt_submit_content(struct dt_env* dt_env, struct buffer* buf)
{ {
uint8_t is_response, is_tcp; uint8_t is_response, is_tcp;
#ifdef INET6 #ifdef INET6
skipping to change at line 245 skipping to change at line 247
zonelen, data, pktlen); zonelen, data, pktlen);
} }
} }
/* handle input from worker for dnstap */ /* handle input from worker for dnstap */
void void
dt_handle_input(int fd, short event, void* arg) dt_handle_input(int fd, short event, void* arg)
{ {
struct dt_collector_input* dt_input = (struct dt_collector_input*)arg; struct dt_collector_input* dt_input = (struct dt_collector_input*)arg;
if((event&EV_READ) != 0) { if((event&EV_READ) != 0) {
/* read */ /* receive */
if(!read_into_buffer(fd, dt_input->buffer)) int r = recv_into_buffer(fd, dt_input->buffer);
if(r == 0)
return; return;
else if(r < 0) {
/* once data is complete, write it to dnstap */ event_base_loopexit(dt_input->dt_collector->event_base, N
ULL);
return;
}
/* once data is complete, send it to dnstap */
VERBOSITY(4, (LOG_INFO, "dnstap collector: received msg len %d", VERBOSITY(4, (LOG_INFO, "dnstap collector: received msg len %d",
(int)buffer_remaining(dt_input->buffer))); (int)buffer_remaining(dt_input->buffer)));
if(dt_input->dt_collector->dt_env) { if(dt_input->dt_collector->dt_env) {
dt_submit_content(dt_input->dt_collector->dt_env, dt_submit_content(dt_input->dt_collector->dt_env,
dt_input->buffer); dt_input->buffer);
} }
/* clear buffer for next message */ /* clear buffer for next message */
buffer_clear(dt_input->buffer); buffer_clear(dt_input->buffer);
} }
skipping to change at line 379 skipping to change at line 385
} }
/* cleanup and done */ /* cleanup and done */
VERBOSITY(1, (LOG_INFO, "dnstap collector stopped")); VERBOSITY(1, (LOG_INFO, "dnstap collector stopped"));
dt_collector_cleanup(dt_col, nsd); dt_collector_cleanup(dt_col, nsd);
exit(0); exit(0);
} }
void dt_collector_start(struct dt_collector* dt_col, struct nsd* nsd) void dt_collector_start(struct dt_collector* dt_col, struct nsd* nsd)
{ {
int i, *fd_send;
/* fork */ /* fork */
dt_col->dt_pid = fork(); dt_col->dt_pid = fork();
if(dt_col->dt_pid == -1) { if(dt_col->dt_pid == -1) {
error("dnstap_collector: fork failed: %s", strerror(errno)); error("dnstap_collector: fork failed: %s", strerror(errno));
} }
if(dt_col->dt_pid == 0) { if(dt_col->dt_pid == 0) {
/* the dt collector process is this */ /* the dt collector process is this */
/* close the nsd side of the command channel */ /* close the nsd side of the command channel */
close(dt_col->cmd_socket_nsd); close(dt_col->cmd_socket_nsd);
dt_col->cmd_socket_nsd = -1; dt_col->cmd_socket_nsd = -1;
/* close the send side of the communication channels */
assert(nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap);
fd_send = nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap
? nsd->dt_collector_fd_send : nsd->dt_collector_fd_swap;
for(i=0; i<dt_col->count; i++) {
if(fd_send[i] != -1) {
close(fd_send[i]);
fd_send[i] = -1;
}
}
#ifdef HAVE_SETPROCTITLE
setproctitle("dnstap_collector");
#endif
/* Free serve process specific memory pages */
#ifdef RATELIMIT
rrl_mmap_deinit_keep_mmap();
#endif
udb_base_free_keep_mmap(nsd->task[0]);
udb_base_free_keep_mmap(nsd->task[1]);
namedb_close_udb(nsd->db); /* keeps mmap */
namedb_close(nsd->db);
dt_collector_run(dt_col, nsd); dt_collector_run(dt_col, nsd);
/* NOTREACH */ /* NOTREACH */
exit(0); exit(0);
} else { } else {
/* the parent continues on, with starting NSD */ /* the parent continues on, with starting NSD */
/* close the dt side of the command channel */ /* close the dt side of the command channel */
close(dt_col->cmd_socket_dt); close(dt_col->cmd_socket_dt);
dt_col->cmd_socket_dt = -1; dt_col->cmd_socket_dt = -1;
/* close the receive side of the communication channels */
for(i=0; i<dt_col->count; i++) {
if(nsd->dt_collector_fd_recv[i] != -1) {
close(nsd->dt_collector_fd_recv[i]);
nsd->dt_collector_fd_recv[i] = -1;
}
}
} }
} }
/* put data for sending to the collector process into the buffer */ /* put data for sending to the collector process into the buffer */
static int static int
prep_send_data(struct buffer* buf, uint8_t is_response, prep_send_data(struct buffer* buf, uint8_t is_response,
#ifdef INET6 #ifdef INET6
struct sockaddr_storage* local_addr, struct sockaddr_storage* local_addr,
struct sockaddr_storage* addr, struct sockaddr_storage* addr,
#else #else
skipping to change at line 449 skipping to change at line 487
return 0; return 0;
buffer_write_u32(buf, 0); buffer_write_u32(buf, 0);
} }
buffer_flip(buf); buffer_flip(buf);
/* write length of message */ /* write length of message */
buffer_write_u32_at(buf, 0, buffer_remaining(buf)-4); buffer_write_u32_at(buf, 0, buffer_remaining(buf)-4);
return 1; return 1;
} }
/* attempt to write buffer to socket, if it blocks do not write it. */ /* attempt to send buffer to socket, if it blocks do not send it.
static void attempt_to_write(int s, uint8_t* data, size_t len) * return 0 on success, -1 on error */
static int attempt_to_send(int s, uint8_t* data, size_t len)
{ {
size_t total = 0;
ssize_t r; ssize_t r;
while(total < len) { if(len == 0)
r = write(s, data+total, len-total); return 0;
if(r == -1) { r = send(s, data, len, MSG_DONTWAIT | MSG_NOSIGNAL);
if(errno == EAGAIN && total == 0) { if(r == -1) {
/* on first write part, check if pipe is full, if(errno == EAGAIN || errno == EINTR ||
* if the nonblocking fd blocks, then drop errno == ENOBUFS || errno == EMSGSIZE) {
* the message */ /* check if pipe is full, if the nonblocking fd blocks,
return; * then drop the message */
} return 0;
if(errno != EAGAIN && errno != EINTR) {
/* some sort of error, print it and drop it */
log_msg(LOG_ERR,
"dnstap collector: write failed: %s",
strerror(errno));
return;
}
/* continue and write this again */
/* for EINTR, we have to do this,
* for EAGAIN, if the first part succeeded, we have
* to continue to write the remainder of the message,
* because otherwise partial messages confuse the
* receiver. */
continue;
} }
total += r; /* some sort of error, print it */
log_msg(LOG_ERR, "dnstap collector: send failed: %s",
strerror(errno));
return -1;
} }
assert(r > 0);
if(r > 0) {
assert((size_t)r == len);
return 0;
}
/* Other end closed the channel? */
log_msg(LOG_ERR, "dnstap collector: server child closed the channel");
return -1;
} }
void dt_collector_submit_auth_query(struct nsd* nsd, void dt_collector_submit_auth_query(struct nsd* nsd,
#ifdef INET6 #ifdef INET6
struct sockaddr_storage* local_addr, struct sockaddr_storage* local_addr,
struct sockaddr_storage* addr, struct sockaddr_storage* addr,
#else #else
struct sockaddr_in* local_addr, struct sockaddr_in* local_addr,
struct sockaddr_in* addr, struct sockaddr_in* addr,
#endif #endif
socklen_t addrlen, int is_tcp, struct buffer* packet) socklen_t addrlen, int is_tcp, struct buffer* packet)
{ {
if(!nsd->dt_collector) return; if(!nsd->dt_collector) return;
if(!nsd->options->dnstap_log_auth_query_messages) return; if(!nsd->options->dnstap_log_auth_query_messages) return;
if(nsd->dt_collector_fd_send[nsd->this_child->child_num] == -1) return;
VERBOSITY(4, (LOG_INFO, "dnstap submit auth query")); VERBOSITY(4, (LOG_INFO, "dnstap submit auth query"));
/* marshal data into send buffer */ /* marshal data into send buffer */
if(!prep_send_data(nsd->dt_collector->send_buffer, 0, local_addr, addr, a ddrlen, if(!prep_send_data(nsd->dt_collector->send_buffer, 0, local_addr, addr, a ddrlen,
is_tcp, packet, NULL)) is_tcp, packet, NULL))
return; /* probably did not fit in buffer */ return; /* probably did not fit in buffer */
/* attempt to send data; do not block */ /* attempt to send data; do not block */
attempt_to_write(nsd->dt_collector_fd_send[nsd->this_child->child_num], if(attempt_to_send(nsd->dt_collector_fd_send[nsd->this_child->child_num],
buffer_begin(nsd->dt_collector->send_buffer), buffer_begin(nsd->dt_collector->send_buffer),
buffer_remaining(nsd->dt_collector->send_buffer)); buffer_remaining(nsd->dt_collector->send_buffer))) {
/* Something went wrong sending to the socket. Don't send to
* this socket again. */
close(nsd->dt_collector_fd_send[nsd->this_child->child_num]);
nsd->dt_collector_fd_send[nsd->this_child->child_num] = -1;
}
} }
void dt_collector_submit_auth_response(struct nsd* nsd, void dt_collector_submit_auth_response(struct nsd* nsd,
#ifdef INET6 #ifdef INET6
struct sockaddr_storage* local_addr, struct sockaddr_storage* local_addr,
struct sockaddr_storage* addr, struct sockaddr_storage* addr,
#else #else
struct sockaddr_in* local_addr, struct sockaddr_in* local_addr,
struct sockaddr_in* addr, struct sockaddr_in* addr,
#endif #endif
socklen_t addrlen, int is_tcp, struct buffer* packet, socklen_t addrlen, int is_tcp, struct buffer* packet,
struct zone* zone) struct zone* zone)
{ {
if(!nsd->dt_collector) return; if(!nsd->dt_collector) return;
if(!nsd->options->dnstap_log_auth_response_messages) return; if(!nsd->options->dnstap_log_auth_response_messages) return;
if(nsd->dt_collector_fd_send[nsd->this_child->child_num] == -1) return;
VERBOSITY(4, (LOG_INFO, "dnstap submit auth response")); VERBOSITY(4, (LOG_INFO, "dnstap submit auth response"));
/* marshal data into send buffer */ /* marshal data into send buffer */
if(!prep_send_data(nsd->dt_collector->send_buffer, 1, local_addr, addr, a ddrlen, if(!prep_send_data(nsd->dt_collector->send_buffer, 1, local_addr, addr, a ddrlen,
is_tcp, packet, zone)) is_tcp, packet, zone))
return; /* probably did not fit in buffer */ return; /* probably did not fit in buffer */
/* attempt to send data; do not block */ /* attempt to send data; do not block */
attempt_to_write(nsd->dt_collector_fd_send[nsd->this_child->child_num], if(attempt_to_send(nsd->dt_collector_fd_send[nsd->this_child->child_num],
buffer_begin(nsd->dt_collector->send_buffer), buffer_begin(nsd->dt_collector->send_buffer),
buffer_remaining(nsd->dt_collector->send_buffer)); buffer_remaining(nsd->dt_collector->send_buffer))) {
/* Something went wrong sending to the socket. Don't send to
* this socket again. */
close(nsd->dt_collector_fd_send[nsd->this_child->child_num]);
nsd->dt_collector_fd_send[nsd->this_child->child_num] = -1;
}
} }
 End of changes. 34 change blocks. 
87 lines changed or deleted 139 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)