"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "ports/winnt/ntpd/ntp_iocompletionport.c" between
ntp-4.2.8p14.tar.gz and ntp-4.2.8p15.tar.gz

About: NTP is the the Official Reference Implementation of the Network Time Protocol (NTP) that synchronize the clocks of computers over a network.

ntp_iocompletionport.c  (ntp-4.2.8p14):ntp_iocompletionport.c  (ntp-4.2.8p15)
skipping to change at line 699 skipping to change at line 699
* So, for the time being, we have one request cycling... * So, for the time being, we have one request cycling...
* ------------------------------------------------------------------- * -------------------------------------------------------------------
*/ */
static BOOL __fastcall static BOOL __fastcall
QueueSerialWait( QueueSerialWait(
IoCtx_t * lpo, IoCtx_t * lpo,
recvbuf_t * buff recvbuf_t * buff
) )
{ {
static const char * const msg = static const char * const msgh =
"QueueSerialWait: cannot wait for COM event"; "QueueSerialWait: cannot wait for COM event";
BOOL rc; BOOL rc;
lpo->onIoDone = OnSerialWaitComplete; lpo->onIoDone = OnSerialWaitComplete;
lpo->recv_buf = buff; lpo->recv_buf = buff;
lpo->flRawMem = 0; lpo->flRawMem = 0;
buff->fd = lpo->iopad->riofd; buff->fd = lpo->iopad->riofd;
/* keep receive position for continuation of partial lines! */ /* keep receive position for continuation of partial lines! */
rc = WaitCommEvent(lpo->io.hnd, &lpo->aux.com_events, &lpo->ol); rc = WaitCommEvent(lpo->io.hnd, &lpo->aux.com_events, &lpo->ol);
return rc || IoResultCheck(GetLastError(), lpo, msg); return rc || IoResultCheck(GetLastError(), lpo, msgh);
} }
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */ /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
static void static void
OnSerialWaitComplete( OnSerialWaitComplete(
ULONG_PTR key, ULONG_PTR key,
IoCtx_t * lpo IoCtx_t * lpo
) )
{ {
static const char * const msg = static const char * const msgh =
"OnSerialWaitComplete: wait for COM event failed"; "OnSerialWaitComplete: wait for COM event failed";
DevCtx_t * dev; DevCtx_t * dev;
PPSDataEx_t * ppsbuf; PPSDataEx_t * ppsbuf;
DWORD modem_status; DWORD modem_status;
u_long covc; u_long covc;
/* Make sure this RIO is not closed. */ /* Make sure this RIO is not closed. */
if (NULL == getRioFromIoCtx(lpo, key, msg)) if (NULL == getRioFromIoCtx(lpo, key, msgh))
return; return;
/* start next IO and leave if we hit an error */ /* start next IO and leave if we hit an error */
if (lpo->errCode != ERROR_SUCCESS) { if (lpo->errCode != ERROR_SUCCESS) {
memset(&lpo->aux, 0, sizeof(lpo->aux)); memset(&lpo->aux, 0, sizeof(lpo->aux));
IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf); IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
return; return;
} }
#ifdef DEBUG #ifdef DEBUG
skipping to change at line 838 skipping to change at line 838
* *
* common for both modes * common for both modes
* ------------------------------------------------------------------- * -------------------------------------------------------------------
*/ */
static BOOL __fastcall static BOOL __fastcall
QueueSerialReadCommon( QueueSerialReadCommon(
IoCtx_t * lpo, IoCtx_t * lpo,
recvbuf_t * buff recvbuf_t * buff
) )
{ {
static const char * const msg = static const char * const msgh =
"QueueSerialRead: cannot schedule device read"; "QueueSerialRead: cannot schedule device read";
BOOL rc; BOOL rc;
/* 'lpo->onIoDone' must be set already! */ /* 'lpo->onIoDone' must be set already! */
lpo->recv_buf = buff; lpo->recv_buf = buff;
lpo->flRawMem = 0; lpo->flRawMem = 0;
/* 'buff->recv_length' must be set already! */ /* 'buff->recv_length' must be set already! */
buff->fd = lpo->iopad->riofd; buff->fd = lpo->iopad->riofd;
buff->dstadr = NULL; buff->dstadr = NULL;
buff->receiver = process_refclock_packet; buff->receiver = process_refclock_packet;
buff->recv_peer = lpo->iopad->rsrc.rio->srcclock; buff->recv_peer = lpo->iopad->rsrc.rio->srcclock;
rc = ReadFile(lpo->io.hnd, rc = ReadFile(lpo->io.hnd,
(char*)buff->recv_buffer + buff->recv_length, (char*)buff->recv_buffer + buff->recv_length,
sizeof(buff->recv_buffer) - buff->recv_length, sizeof(buff->recv_buffer) - buff->recv_length,
NULL, &lpo->ol); NULL, &lpo->ol);
return rc || IoResultCheck(GetLastError(), lpo, msg); return rc || IoResultCheck(GetLastError(), lpo, msgh);
} }
/* /*
* ------------------------------------------------------------------- * -------------------------------------------------------------------
* Serial IO stuff * Serial IO stuff
* *
* Part 2 -- line discipline emulation * Part 2 -- line discipline emulation
* *
* Ideally this should *not* be done in the IO completion thread. * Ideally this should *not* be done in the IO completion thread.
* We use a worker pool thread to offload the low-level processing. * We use a worker pool thread to offload the low-level processing.
skipping to change at line 895 skipping to change at line 895
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* IO completion thread callback. Takes a time stamp and offloads the * IO completion thread callback. Takes a time stamp and offloads the
* real work to the worker pool ASAP. * real work to the worker pool ASAP.
*/ */
static void static void
OnSerialReadComplete( OnSerialReadComplete(
ULONG_PTR key, ULONG_PTR key,
IoCtx_t * lpo IoCtx_t * lpo
) )
{ {
static const char * const msg = static const char * const msgh =
"OnSerialReadComplete: read from device failed"; "OnSerialReadComplete: read from device failed";
/* Make sure this RIO is not closed. */ /* Make sure this RIO is not closed. */
if (NULL == getRioFromIoCtx(lpo, key, msg)) if (NULL == getRioFromIoCtx(lpo, key, msgh))
return; return;
/* start next IO and leave if we hit an error */ /* start next IO and leave if we hit an error */
if (lpo->errCode != ERROR_SUCCESS) if (lpo->errCode != ERROR_SUCCESS)
goto wait_again; goto wait_again;
/* Offload to worker pool, if there is data */ /* Offload to worker pool, if there is data */
if (lpo->byteCount == 0) if (lpo->byteCount == 0)
goto wait_again; goto wait_again;
skipping to change at line 1025 skipping to change at line 1025
* implementation is very low-level to gather speed. * implementation is very low-level to gather speed.
*/ */
buff->recv_length += (int)lpo->byteCount; buff->recv_length += (int)lpo->byteCount;
sptr = (char *)buff->recv_buffer; sptr = (char *)buff->recv_buffer;
send = sptr + buff->recv_length; send = sptr + buff->recv_length;
if (sptr == send) if (sptr == send)
goto st_read_fresh; goto st_read_fresh;
st_new_obuf: st_new_obuf:
/* Get new receive buffer to store the line. */ /* Get new receive buffer to store the line. */
obuf = get_free_recv_buffer_alloc(); obuf = get_free_recv_buffer_alloc(TRUE);
if (!obuf) {
++packets_dropped; /* maybe atomic? */
buff->recv_length = 0;
goto st_read_fresh;
}
obuf->fd = buff->fd; obuf->fd = buff->fd;
obuf->receiver = buff->receiver; obuf->receiver = buff->receiver;
obuf->dstadr = NULL; obuf->dstadr = NULL;
obuf->recv_peer = buff->recv_peer; obuf->recv_peer = buff->recv_peer;
set_serial_recv_time(obuf, lpo); set_serial_recv_time(obuf, lpo);
st_copy_start: st_copy_start:
/* Copy data to new buffer, convert CR to LF on the fly. /* Copy data to new buffer, convert CR to LF on the fly.
* Stop after either. * Stop after either.
*/ */
skipping to change at line 1143 skipping to change at line 1148
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* IO completion thread callback. Takes a time stamp and offloads the * IO completion thread callback. Takes a time stamp and offloads the
* real work to the worker pool ASAP. * real work to the worker pool ASAP.
*/ */
static void static void
OnRawSerialReadComplete( OnRawSerialReadComplete(
ULONG_PTR key, ULONG_PTR key,
IoCtx_t * lpo IoCtx_t * lpo
) )
{ {
static const char * const msg = static const char * const msgh =
"OnRawSerialReadComplete: read from device failed"; "OnRawSerialReadComplete: read from device failed";
recvbuf_t * buff = lpo->recv_buf; recvbuf_t * buff = lpo->recv_buf;
RIO_t * rio = getRioFromIoCtx(lpo, key, msg); RIO_t * rio = getRioFromIoCtx(lpo, key, msgh);
/* Make sure this RIO is not closed. */ /* Make sure this RIO is not closed. */
if (rio == NULL) if (rio == NULL)
return; return;
/* start next IO and leave if we hit an error */ /* start next IO and leave if we hit an error */
if (lpo->errCode == ERROR_SUCCESS && lpo->byteCount > 0) { if (lpo->errCode == ERROR_SUCCESS && lpo->byteCount > 0) {
buff->recv_length = (int)lpo->byteCount; buff->recv_length = (int)lpo->byteCount;
set_serial_recv_time(buff, lpo); set_serial_recv_time(buff, lpo);
iohpQueueLocked(lpo->iopad, iohpRefClockOK, buff); lpo->recv_buf = get_free_recv_buffer_alloc(TRUE);
buff = get_free_recv_buffer_alloc(); if (lpo->recv_buf) {
iohpQueueLocked(lpo->iopad, iohpRefClockOK, buff);
} else {
++packets_dropped; /* maybe atomic? */
buff->recv_length = 0;
lpo->recv_buf = buff;
}
} }
IoCtxStartChecked(lpo, QueueSerialWait, buff); IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
} }
static void static void
set_serial_recv_time( set_serial_recv_time(
recvbuf_t * obuf, recvbuf_t * obuf,
IoCtx_t * lpo IoCtx_t * lpo
) )
{ {
/* Time stamp assignment is interesting. If we /* Time stamp assignment is interesting. If we
* have a DCD stamp, we use it, otherwise we use * have a DCD stamp, we use it, otherwise we use
skipping to change at line 1204 skipping to change at line 1215
/* /*
* async_write, clone of write(), used by some reflock drivers * async_write, clone of write(), used by some reflock drivers
*/ */
int int
async_write( async_write(
int fd, int fd,
const void * data, const void * data,
unsigned int count unsigned int count
) )
{ {
static const char * const msg = static const char * const msgh =
"async_write: cannot schedule device write"; "async_write: cannot schedule device write";
static const char * const dmsg = static const char * const dmsg =
"overlapped IO data buffer"; "overlapped IO data buffer";
IoCtx_t * lpo = NULL; IoCtx_t * lpo = NULL;
void * buff = NULL; void * buff = NULL;
HANDLE hnd = NULL; HANDLE hnd = NULL;
BOOL rc; BOOL rc;
hnd = (HANDLE)_get_osfhandle(fd); hnd = (HANDLE)_get_osfhandle(fd);
skipping to change at line 1229 skipping to change at line 1240
if (NULL == (lpo = IoCtxAlloc(NULL, NULL))) if (NULL == (lpo = IoCtxAlloc(NULL, NULL)))
goto fail; goto fail;
lpo->io.hnd = hnd; lpo->io.hnd = hnd;
lpo->onIoDone = OnSerialWriteComplete; lpo->onIoDone = OnSerialWriteComplete;
lpo->trans_buf = buff; lpo->trans_buf = buff;
lpo->flRawMem = 1; lpo->flRawMem = 1;
rc = WriteFile(lpo->io.hnd, lpo->trans_buf, count, rc = WriteFile(lpo->io.hnd, lpo->trans_buf, count,
NULL, &lpo->ol); NULL, &lpo->ol);
if (rc || IoResultCheck(GetLastError(), lpo, msg)) if (rc || IoResultCheck(GetLastError(), lpo, msgh))
return count; /* normal/success return */ return count; /* normal/success return */
errno = EBADF; errno = EBADF;
return -1; return -1;
fail: fail:
IoCtxFree(lpo); IoCtxFree(lpo);
IOCPLPoolFree(buff, dmsg); IOCPLPoolFree(buff, dmsg);
return -1; return -1;
} }
skipping to change at line 1251 skipping to change at line 1262
static void static void
OnSerialWriteComplete( OnSerialWriteComplete(
ULONG_PTR key, ULONG_PTR key,
IoCtx_t * lpo IoCtx_t * lpo
) )
{ {
/* This is really trivial: Let 'getRioFromIoCtx()' do all the /* This is really trivial: Let 'getRioFromIoCtx()' do all the
* error processing, and it returns with a valid RIO, just * error processing, and it returns with a valid RIO, just
* drop the complete context. * drop the complete context.
*/ */
static const char * const msg = static const char * const msgh =
"OnSerialWriteComplete: serial output failed"; "OnSerialWriteComplete: serial output failed";
if (NULL != getRioFromIoCtx(lpo, key, msg)) if (NULL != getRioFromIoCtx(lpo, key, msgh))
IoCtxRelease(lpo); IoCtxRelease(lpo);
} }
/* /*
* ------------------------------------------------------------------- * -------------------------------------------------------------------
* Serial IO stuff * Serial IO stuff
* *
* Part 5 -- read PPS time stamps * Part 5 -- read PPS time stamps
* *
* ------------------------------------------------------------------- * -------------------------------------------------------------------
skipping to change at line 1352 skipping to change at line 1363
io_completion_port_add_clock_io( io_completion_port_add_clock_io(
RIO_t *rio RIO_t *rio
) )
{ {
static const char * const msgh = static const char * const msgh =
"io_completion_port_add_clock_io"; "io_completion_port_add_clock_io";
IoCtx_t * lpo; IoCtx_t * lpo;
HANDLE h; HANDLE h;
IoHndPad_T * iopad = NULL; IoHndPad_T * iopad = NULL;
recvbuf_t * rbuf;
/* preset to clear state for error cleanup:*/ /* preset to clear state for error cleanup:*/
rio->ioreg_ctx = NULL; rio->ioreg_ctx = NULL;
rio->device_ctx = NULL; rio->device_ctx = NULL;
h = (HANDLE)_get_osfhandle(rio->fd); h = (HANDLE)_get_osfhandle(rio->fd);
if (h == INVALID_HANDLE_VALUE) { if (h == INVALID_HANDLE_VALUE) {
msyslog(LOG_ERR, "%s: COM port FD not valid", msyslog(LOG_ERR, "%s: COM port FD not valid",
msgh); msgh);
goto fail; goto fail;
skipping to change at line 1380 skipping to change at line 1392
iopad->riofd = rio->fd; iopad->riofd = rio->fd;
iopad->rsrc.rio = rio; iopad->rsrc.rio = rio;
if (NULL == (rio->device_ctx = DevCtxAttach(serial_devctx(h)))) { if (NULL == (rio->device_ctx = DevCtxAttach(serial_devctx(h)))) {
msyslog(LOG_ERR, "%s: Failed to allocate device context", msyslog(LOG_ERR, "%s: Failed to allocate device context",
msgh); msgh);
goto fail; goto fail;
} }
if (NULL == (lpo = IoCtxAlloc(iopad, rio->device_ctx))) { if (NULL == (lpo = IoCtxAlloc(iopad, rio->device_ctx))) {
msyslog(LOG_ERR, "%: Failed to allocate IO context", msyslog(LOG_ERR, "%: no IO context: %m", msgh);
msgh);
goto fail; goto fail;
} }
if ( ! CreateIoCompletionPort(h, hndIOCPLPort, (ULONG_PTR)rio, 0)) { if ( ! CreateIoCompletionPort(h, hndIOCPLPort, (ULONG_PTR)rio, 0)) {
msyslog(LOG_ERR, "%s: Can't add COM port to i/o completion port: %m", msyslog(LOG_ERR, "%s: Can't add COM port to i/o completion port: %m",
msgh); msgh);
goto fail; goto fail;
} }
lpo->io.hnd = h; lpo->io.hnd = h;
memset(&lpo->aux, 0, sizeof(lpo->aux)); memset(&lpo->aux, 0, sizeof(lpo->aux));
return QueueSerialWait(lpo, get_free_recv_buffer_alloc()); if (NULL == (rbuf = get_free_recv_buffer_alloc(TRUE))) {
msyslog(LOG_ERR, "%s: no receive buffer: %m", msgh);
goto fail;
}
return QueueSerialWait(lpo, rbuf);
fail: fail:
rio->ioreg_ctx = iohpDetach(rio->ioreg_ctx); rio->ioreg_ctx = iohpDetach(rio->ioreg_ctx);
rio->device_ctx = DevCtxDetach(rio->device_ctx); rio->device_ctx = DevCtxDetach(rio->device_ctx);
return FALSE; return FALSE;
} }
/* ----------------------------------------------------------------- */ /* ----------------------------------------------------------------- */
static void static void
OnSerialDetach( OnSerialDetach(
skipping to change at line 1454 skipping to change at line 1469
* ReadFile() is less efficient. Also, WSARecvFrom delivers * ReadFile() is less efficient. Also, WSARecvFrom delivers
* the remote network address. With ReadFile, getting this * the remote network address. With ReadFile, getting this
* becomes a chore. * becomes a chore.
*/ */
static BOOL __fastcall static BOOL __fastcall
QueueSocketRecv( QueueSocketRecv(
IoCtx_t * lpo, IoCtx_t * lpo,
recvbuf_t * buff recvbuf_t * buff
) )
{ {
static const char * const msg = static const char * const msgh =
"QueueSocketRecv: cannot schedule socket receive"; "QueueSocketRecv: cannot schedule socket receive";
WSABUF wsabuf; WSABUF wsabuf;
int rc; int rc;
lpo->onIoDone = OnSocketRecv; lpo->onIoDone = OnSocketRecv;
lpo->recv_buf = buff; lpo->recv_buf = buff;
lpo->flRawMem = 0; lpo->flRawMem = 0;
lpo->ioFlags = 0; lpo->ioFlags = 0;
skipping to change at line 1476 skipping to change at line 1491
buff->recv_srcadr_len = sizeof(buff->recv_srcadr); buff->recv_srcadr_len = sizeof(buff->recv_srcadr);
buff->receiver = receive; buff->receiver = receive;
buff->dstadr = lpo->iopad->rsrc.ept; buff->dstadr = lpo->iopad->rsrc.ept;
wsabuf.buf = (char *)buff->recv_buffer; wsabuf.buf = (char *)buff->recv_buffer;
wsabuf.len = sizeof(buff->recv_buffer); wsabuf.len = sizeof(buff->recv_buffer);
rc = WSARecvFrom(lpo->io.sfd, &wsabuf, 1, NULL, &lpo->ioFlags, rc = WSARecvFrom(lpo->io.sfd, &wsabuf, 1, NULL, &lpo->ioFlags,
&buff->recv_srcadr.sa, &buff->recv_srcadr_len, &buff->recv_srcadr.sa, &buff->recv_srcadr_len,
&lpo->ol, NULL); &lpo->ol, NULL);
return !rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msg); return !rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msgh);
} }
/* ----------------------------------------------------------------- */ /* ----------------------------------------------------------------- */
static void static void
OnSocketRecv( OnSocketRecv(
ULONG_PTR key, ULONG_PTR key,
IoCtx_t * lpo IoCtx_t * lpo
) )
{ {
static const char * const msg = static const char * const msgh =
"OnSocketRecv: receive from socket failed"; "OnSocketRecv: receive from socket failed";
recvbuf_t * buff = NULL; recvbuf_t * buff = NULL;
IoHndPad_T * iopad = NULL; IoHndPad_T * iopad = NULL;
endpt * ep = NULL; endpt * ep = NULL;
int rc; int rc;
/* order is important -- check first, then get endpoint! */ /* order is important -- check first, then get endpoint! */
rc = socketErrorCheck(lpo, msg); rc = socketErrorCheck(lpo, msgh);
ep = getEndptFromIoCtx(lpo, key); ep = getEndptFromIoCtx(lpo, key);
/* Make sure this endpoint is not closed. */ /* Make sure this endpoint is not closed. */
if (ep == NULL) if (ep == NULL)
return; return;
/* We want to start a new read before we process the buffer. /* We want to start a new read before we process the buffer.
* Since we must not use the context object once it is in * Since we must not use the context object once it is in
* another IO, we go through some pains to read everything * another IO, we go through some pains to read everything
* before going out for another read request. * before going out for another read request.
* We also need an extra hold to the IOPAD structure. * We also need an extra hold to the IOPAD structure.
*/ */
iopad = iohpAttach(lpo->iopad); iopad = iohpAttach(lpo->iopad);
if (rc == PKT_OK && lpo->byteCount > 0) { if (rc == PKT_OK && lpo->byteCount > 0) {
/* keep input buffer, create new one for IO */ /* keep input buffer, create new one for IO */
buff = lpo->recv_buf; buff = lpo->recv_buf;
lpo->recv_buf = get_free_recv_buffer_alloc(); lpo->recv_buf = get_free_recv_buffer_alloc(FALSE);
if (lpo->recv_buf) {
buff->recv_time = lpo->aux.RecvTime; buff->recv_time = lpo->aux.RecvTime;
buff->recv_length = (int)lpo->byteCount; buff->recv_length = (int)lpo->byteCount;
} else {
lpo->recv_buf = buff;
buff = NULL;
++packets_dropped; /* maybe atomic? */
}
} /* Note: else we use the current buffer again */ } /* Note: else we use the current buffer again */
if (rc != PKT_SOCKET_ERROR) { if (rc != PKT_SOCKET_ERROR) {
IoCtxStartChecked(lpo, QueueSocketRecv, lpo->recv_buf); IoCtxStartChecked(lpo, QueueSocketRecv, lpo->recv_buf);
} else { } else {
freerecvbuf(lpo->recv_buf); freerecvbuf(lpo->recv_buf);
IoCtxFree(lpo); IoCtxFree(lpo);
} }
/* below this, any usage of 'lpo' is invalid! */ /* below this, any usage of 'lpo' is invalid! */
/* If we have a buffer, do some bookkeeping and other chores, /* If we have a buffer, do some bookkeeping and other chores,
* then feed it to the input queue. And we can be sure we have * then feed it to the input queue. And we can be sure we have
* a packet here, so we can update the stats. * a packet here, so we can update the stats.
*/ */
if (buff != NULL) { if (buff) {
INSIST(buff->recv_srcadr_len <= sizeof(buff->recv_srcadr)); INSIST(buff->recv_srcadr_len <= sizeof(buff->recv_srcadr));
DPRINTF(4, ("%sfd %d %s recv packet mode is %d\n", DPRINTF(4, ("%sfd %d %s recv packet mode is %d\n",
(MODE_BROADCAST == get_packet_mode(buff)) (MODE_BROADCAST == get_packet_mode(buff))
? " **** Broadcast " ? " **** Broadcast "
: "", : "",
(int)buff->fd, stoa(&buff->recv_srcadr), (int)buff->fd, stoa(&buff->recv_srcadr),
get_packet_mode(buff))); get_packet_mode(buff)));
if (iohpEndPointOK(iopad)) { if (iohpEndPointOK(iopad)) {
InterlockedIncrement(&ep->received); InterlockedIncrement(&ep->received);
skipping to change at line 1562 skipping to change at line 1582
} }
/* ----------------------------------------------------------------- */ /* ----------------------------------------------------------------- */
static void static void
OnSocketSend( OnSocketSend(
ULONG_PTR key, ULONG_PTR key,
IoCtx_t * lpo IoCtx_t * lpo
) )
{ {
/* this is somewhat easier: */ /* this is somewhat easier: */
static const char * const msg = static const char * const msgh =
"OnSocketSend: send to socket failed"; "OnSocketSend: send to socket failed";
endpt * ep = NULL; endpt * ep = NULL;
int rc; int rc;
/* order is important -- check first, then get endpoint! */ /* order is important -- check first, then get endpoint! */
rc = socketErrorCheck(lpo, msg); rc = socketErrorCheck(lpo, msgh);
ep = getEndptFromIoCtx(lpo, key); ep = getEndptFromIoCtx(lpo, key);
/* Make sure this endpoint is not closed. */ /* Make sure this endpoint is not closed. */
if (ep == NULL) if (ep == NULL)
return; return;
if (rc != PKT_OK) { if (rc != PKT_OK) {
InterlockedIncrement(&ep->notsent); InterlockedIncrement(&ep->notsent);
InterlockedDecrement(&ep->sent); InterlockedDecrement(&ep->sent);
InterlockedIncrement(&packets_notsent); InterlockedIncrement(&packets_notsent);
skipping to change at line 1670 skipping to change at line 1690
BOOL BOOL
io_completion_port_add_socket( io_completion_port_add_socket(
SOCKET sfd, SOCKET sfd,
endpt * ep, endpt * ep,
BOOL bcast BOOL bcast
) )
{ {
/* Assume the endpoint is already registered. Set the socket /* Assume the endpoint is already registered. Set the socket
* handle into the proper slot, and then start up the IO engine. * handle into the proper slot, and then start up the IO engine.
*/ */
static const char * const msg = static const char * const msgh =
"Can't add socket to i/o completion port"; "Can't add socket to i/o completion port";
IoCtx_t * lpo; IoCtx_t * lpo;
size_t n; size_t n;
ULONG_PTR key; ULONG_PTR key;
IoHndPad_T * iopad = NULL; IoHndPad_T * iopad = NULL;
recvbuf_t * rbuf;
key = ((ULONG_PTR)ep & ~(ULONG_PTR)1u) + !!bcast; key = ((ULONG_PTR)ep & ~(ULONG_PTR)1u) + !!bcast;
if (NULL == (iopad = (IoHndPad_T*)ep->ioreg_ctx)) { if (NULL == (iopad = (IoHndPad_T*)ep->ioreg_ctx)) {
msyslog(LOG_CRIT, "io_completion_port_add_socket: endpt = %p not registered, exiting", msyslog(LOG_CRIT, "io_completion_port_add_socket: endpt = %p not registered, exiting",
ep); ep);
exit(1); exit(1);
} else { } else {
endpt * rep = iopad->rsrc.ept; endpt * rep = iopad->rsrc.ept;
iopad->handles[!!bcast] = (HANDLE)sfd; iopad->handles[!!bcast] = (HANDLE)sfd;
INSIST(rep == ep); INSIST(rep == ep);
} }
if (NULL == CreateIoCompletionPort((HANDLE)sfd, if (NULL == CreateIoCompletionPort((HANDLE)sfd,
hndIOCPLPort, key, 0)) hndIOCPLPort, key, 0))
{ {
msyslog(LOG_ERR, "%s: %m", msg); msyslog(LOG_ERR, "%s: %m", msgh);
goto fail; goto fail;
} }
for (n = s_SockRecvSched; n > 0; --n) { for (n = s_SockRecvSched; n > 0; --n) {
if (NULL == (lpo = IoCtxAlloc(ep->ioreg_ctx, NULL))) { if (NULL == (lpo = IoCtxAlloc(ep->ioreg_ctx, NULL))) {
msyslog(LOG_ERR, "%s: no read buffer: %m", msg); msyslog(LOG_ERR, "%s: no IO context: %m", msgh);
goto fail; goto fail;
} }
lpo->io.sfd = sfd; lpo->io.sfd = sfd;
if (!QueueSocketRecv(lpo, get_free_recv_buffer_alloc())) if (NULL == (rbuf = get_free_recv_buffer_alloc(FALSE))) {
msyslog(LOG_ERR, "%s: no receive buffer: %m", msgh);
goto fail;
}
if (!QueueSocketRecv(lpo, rbuf))
goto fail; goto fail;
} }
return TRUE; return TRUE;
fail: fail:
ep->ioreg_ctx = iohpDetach(ep->ioreg_ctx); ep->ioreg_ctx = iohpDetach(ep->ioreg_ctx);
return FALSE; return FALSE;
} }
/* ----------------------------------------------------------------- */ /* ----------------------------------------------------------------- */
void void
skipping to change at line 1748 skipping to change at line 1773
*/ */
int int
io_completion_port_sendto( io_completion_port_sendto(
endpt * ep, endpt * ep,
SOCKET sfd, SOCKET sfd,
void * pkt, void * pkt,
size_t len, size_t len,
sockaddr_u * dest sockaddr_u * dest
) )
{ {
static const char * const msg = static const char * const msgh =
"sendto: cannot schedule socket send"; "sendto: cannot schedule socket send";
static const char * const dmsg = static const char * const dmsg =
"overlapped IO data buffer"; "overlapped IO data buffer";
IoCtx_t * lpo = NULL; IoCtx_t * lpo = NULL;
void * dbuf = NULL; void * dbuf = NULL;
WSABUF wsabuf; WSABUF wsabuf;
int rc; int rc;
if (len > INT_MAX) if (len > INT_MAX)
skipping to change at line 1781 skipping to change at line 1806
lpo->trans_buf = dbuf; lpo->trans_buf = dbuf;
lpo->flRawMem = 1; lpo->flRawMem = 1;
lpo->io.sfd = sfd; lpo->io.sfd = sfd;
wsabuf.buf = (void*)lpo->trans_buf; wsabuf.buf = (void*)lpo->trans_buf;
wsabuf.len = (DWORD)len; wsabuf.len = (DWORD)len;
rc = WSASendTo(sfd, &wsabuf, 1, NULL, 0, rc = WSASendTo(sfd, &wsabuf, 1, NULL, 0,
&dest->sa, SOCKLEN(dest), &dest->sa, SOCKLEN(dest),
&lpo->ol, NULL); &lpo->ol, NULL);
if (!rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msg)) if (!rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msgh))
return (int)len; /* normal/success return */ return (int)len; /* normal/success return */
errno = EBADF; errno = EBADF;
return -1; return -1;
fail: fail:
IoCtxFree(lpo); IoCtxFree(lpo);
IOCPLPoolFree(dbuf, dmsg); IOCPLPoolFree(dbuf, dmsg);
return -1; return -1;
} }
 End of changes. 35 change blocks. 
38 lines changed or deleted 63 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)