ntp_iocompletionport.c (ntp-4.2.8p14) | : | ntp_iocompletionport.c (ntp-4.2.8p15) | ||
---|---|---|---|---|
skipping to change at line 699 | skipping to change at line 699 | |||
* So, for the time being, we have one request cycling... | * So, for the time being, we have one request cycling... | |||
* ------------------------------------------------------------------- | * ------------------------------------------------------------------- | |||
*/ | */ | |||
static BOOL __fastcall | static BOOL __fastcall | |||
QueueSerialWait( | QueueSerialWait( | |||
IoCtx_t * lpo, | IoCtx_t * lpo, | |||
recvbuf_t * buff | recvbuf_t * buff | |||
) | ) | |||
{ | { | |||
static const char * const msg = | static const char * const msgh = | |||
"QueueSerialWait: cannot wait for COM event"; | "QueueSerialWait: cannot wait for COM event"; | |||
BOOL rc; | BOOL rc; | |||
lpo->onIoDone = OnSerialWaitComplete; | lpo->onIoDone = OnSerialWaitComplete; | |||
lpo->recv_buf = buff; | lpo->recv_buf = buff; | |||
lpo->flRawMem = 0; | lpo->flRawMem = 0; | |||
buff->fd = lpo->iopad->riofd; | buff->fd = lpo->iopad->riofd; | |||
/* keep receive position for continuation of partial lines! */ | /* keep receive position for continuation of partial lines! */ | |||
rc = WaitCommEvent(lpo->io.hnd, &lpo->aux.com_events, &lpo->ol); | rc = WaitCommEvent(lpo->io.hnd, &lpo->aux.com_events, &lpo->ol); | |||
return rc || IoResultCheck(GetLastError(), lpo, msg); | return rc || IoResultCheck(GetLastError(), lpo, msgh); | |||
} | } | |||
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */ | /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */ | |||
static void | static void | |||
OnSerialWaitComplete( | OnSerialWaitComplete( | |||
ULONG_PTR key, | ULONG_PTR key, | |||
IoCtx_t * lpo | IoCtx_t * lpo | |||
) | ) | |||
{ | { | |||
static const char * const msg = | static const char * const msgh = | |||
"OnSerialWaitComplete: wait for COM event failed"; | "OnSerialWaitComplete: wait for COM event failed"; | |||
DevCtx_t * dev; | DevCtx_t * dev; | |||
PPSDataEx_t * ppsbuf; | PPSDataEx_t * ppsbuf; | |||
DWORD modem_status; | DWORD modem_status; | |||
u_long covc; | u_long covc; | |||
/* Make sure this RIO is not closed. */ | /* Make sure this RIO is not closed. */ | |||
if (NULL == getRioFromIoCtx(lpo, key, msg)) | if (NULL == getRioFromIoCtx(lpo, key, msgh)) | |||
return; | return; | |||
/* start next IO and leave if we hit an error */ | /* start next IO and leave if we hit an error */ | |||
if (lpo->errCode != ERROR_SUCCESS) { | if (lpo->errCode != ERROR_SUCCESS) { | |||
memset(&lpo->aux, 0, sizeof(lpo->aux)); | memset(&lpo->aux, 0, sizeof(lpo->aux)); | |||
IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf); | IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf); | |||
return; | return; | |||
} | } | |||
#ifdef DEBUG | #ifdef DEBUG | |||
skipping to change at line 838 | skipping to change at line 838 | |||
* | * | |||
* common for both modes | * common for both modes | |||
* ------------------------------------------------------------------- | * ------------------------------------------------------------------- | |||
*/ | */ | |||
static BOOL __fastcall | static BOOL __fastcall | |||
QueueSerialReadCommon( | QueueSerialReadCommon( | |||
IoCtx_t * lpo, | IoCtx_t * lpo, | |||
recvbuf_t * buff | recvbuf_t * buff | |||
) | ) | |||
{ | { | |||
static const char * const msg = | static const char * const msgh = | |||
"QueueSerialRead: cannot schedule device read"; | "QueueSerialRead: cannot schedule device read"; | |||
BOOL rc; | BOOL rc; | |||
/* 'lpo->onIoDone' must be set already! */ | /* 'lpo->onIoDone' must be set already! */ | |||
lpo->recv_buf = buff; | lpo->recv_buf = buff; | |||
lpo->flRawMem = 0; | lpo->flRawMem = 0; | |||
/* 'buff->recv_length' must be set already! */ | /* 'buff->recv_length' must be set already! */ | |||
buff->fd = lpo->iopad->riofd; | buff->fd = lpo->iopad->riofd; | |||
buff->dstadr = NULL; | buff->dstadr = NULL; | |||
buff->receiver = process_refclock_packet; | buff->receiver = process_refclock_packet; | |||
buff->recv_peer = lpo->iopad->rsrc.rio->srcclock; | buff->recv_peer = lpo->iopad->rsrc.rio->srcclock; | |||
rc = ReadFile(lpo->io.hnd, | rc = ReadFile(lpo->io.hnd, | |||
(char*)buff->recv_buffer + buff->recv_length, | (char*)buff->recv_buffer + buff->recv_length, | |||
sizeof(buff->recv_buffer) - buff->recv_length, | sizeof(buff->recv_buffer) - buff->recv_length, | |||
NULL, &lpo->ol); | NULL, &lpo->ol); | |||
return rc || IoResultCheck(GetLastError(), lpo, msg); | return rc || IoResultCheck(GetLastError(), lpo, msgh); | |||
} | } | |||
/* | /* | |||
* ------------------------------------------------------------------- | * ------------------------------------------------------------------- | |||
* Serial IO stuff | * Serial IO stuff | |||
* | * | |||
* Part 2 -- line discipline emulation | * Part 2 -- line discipline emulation | |||
* | * | |||
* Ideally this should *not* be done in the IO completion thread. | * Ideally this should *not* be done in the IO completion thread. | |||
* We use a worker pool thread to offload the low-level processing. | * We use a worker pool thread to offload the low-level processing. | |||
skipping to change at line 895 | skipping to change at line 895 | |||
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - | /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - | |||
* IO completion thread callback. Takes a time stamp and offloads the | * IO completion thread callback. Takes a time stamp and offloads the | |||
* real work to the worker pool ASAP. | * real work to the worker pool ASAP. | |||
*/ | */ | |||
static void | static void | |||
OnSerialReadComplete( | OnSerialReadComplete( | |||
ULONG_PTR key, | ULONG_PTR key, | |||
IoCtx_t * lpo | IoCtx_t * lpo | |||
) | ) | |||
{ | { | |||
static const char * const msg = | static const char * const msgh = | |||
"OnSerialReadComplete: read from device failed"; | "OnSerialReadComplete: read from device failed"; | |||
/* Make sure this RIO is not closed. */ | /* Make sure this RIO is not closed. */ | |||
if (NULL == getRioFromIoCtx(lpo, key, msg)) | if (NULL == getRioFromIoCtx(lpo, key, msgh)) | |||
return; | return; | |||
/* start next IO and leave if we hit an error */ | /* start next IO and leave if we hit an error */ | |||
if (lpo->errCode != ERROR_SUCCESS) | if (lpo->errCode != ERROR_SUCCESS) | |||
goto wait_again; | goto wait_again; | |||
/* Offload to worker pool, if there is data */ | /* Offload to worker pool, if there is data */ | |||
if (lpo->byteCount == 0) | if (lpo->byteCount == 0) | |||
goto wait_again; | goto wait_again; | |||
skipping to change at line 1025 | skipping to change at line 1025 | |||
* implementation is very low-level to gather speed. | * implementation is very low-level to gather speed. | |||
*/ | */ | |||
buff->recv_length += (int)lpo->byteCount; | buff->recv_length += (int)lpo->byteCount; | |||
sptr = (char *)buff->recv_buffer; | sptr = (char *)buff->recv_buffer; | |||
send = sptr + buff->recv_length; | send = sptr + buff->recv_length; | |||
if (sptr == send) | if (sptr == send) | |||
goto st_read_fresh; | goto st_read_fresh; | |||
st_new_obuf: | st_new_obuf: | |||
/* Get new receive buffer to store the line. */ | /* Get new receive buffer to store the line. */ | |||
obuf = get_free_recv_buffer_alloc(); | obuf = get_free_recv_buffer_alloc(TRUE); | |||
if (!obuf) { | ||||
++packets_dropped; /* maybe atomic? */ | ||||
buff->recv_length = 0; | ||||
goto st_read_fresh; | ||||
} | ||||
obuf->fd = buff->fd; | obuf->fd = buff->fd; | |||
obuf->receiver = buff->receiver; | obuf->receiver = buff->receiver; | |||
obuf->dstadr = NULL; | obuf->dstadr = NULL; | |||
obuf->recv_peer = buff->recv_peer; | obuf->recv_peer = buff->recv_peer; | |||
set_serial_recv_time(obuf, lpo); | set_serial_recv_time(obuf, lpo); | |||
st_copy_start: | st_copy_start: | |||
/* Copy data to new buffer, convert CR to LF on the fly. | /* Copy data to new buffer, convert CR to LF on the fly. | |||
* Stop after either. | * Stop after either. | |||
*/ | */ | |||
skipping to change at line 1143 | skipping to change at line 1148 | |||
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - | /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - | |||
* IO completion thread callback. Takes a time stamp and offloads the | * IO completion thread callback. Takes a time stamp and offloads the | |||
* real work to the worker pool ASAP. | * real work to the worker pool ASAP. | |||
*/ | */ | |||
static void | static void | |||
OnRawSerialReadComplete( | OnRawSerialReadComplete( | |||
ULONG_PTR key, | ULONG_PTR key, | |||
IoCtx_t * lpo | IoCtx_t * lpo | |||
) | ) | |||
{ | { | |||
static const char * const msg = | static const char * const msgh = | |||
"OnRawSerialReadComplete: read from device failed"; | "OnRawSerialReadComplete: read from device failed"; | |||
recvbuf_t * buff = lpo->recv_buf; | recvbuf_t * buff = lpo->recv_buf; | |||
RIO_t * rio = getRioFromIoCtx(lpo, key, msg); | RIO_t * rio = getRioFromIoCtx(lpo, key, msgh); | |||
/* Make sure this RIO is not closed. */ | /* Make sure this RIO is not closed. */ | |||
if (rio == NULL) | if (rio == NULL) | |||
return; | return; | |||
/* start next IO and leave if we hit an error */ | /* start next IO and leave if we hit an error */ | |||
if (lpo->errCode == ERROR_SUCCESS && lpo->byteCount > 0) { | if (lpo->errCode == ERROR_SUCCESS && lpo->byteCount > 0) { | |||
buff->recv_length = (int)lpo->byteCount; | buff->recv_length = (int)lpo->byteCount; | |||
set_serial_recv_time(buff, lpo); | set_serial_recv_time(buff, lpo); | |||
iohpQueueLocked(lpo->iopad, iohpRefClockOK, buff); | lpo->recv_buf = get_free_recv_buffer_alloc(TRUE); | |||
buff = get_free_recv_buffer_alloc(); | if (lpo->recv_buf) { | |||
iohpQueueLocked(lpo->iopad, iohpRefClockOK, buff); | ||||
} else { | ||||
++packets_dropped; /* maybe atomic? */ | ||||
buff->recv_length = 0; | ||||
lpo->recv_buf = buff; | ||||
} | ||||
} | } | |||
IoCtxStartChecked(lpo, QueueSerialWait, buff); | IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf); | |||
} | } | |||
static void | static void | |||
set_serial_recv_time( | set_serial_recv_time( | |||
recvbuf_t * obuf, | recvbuf_t * obuf, | |||
IoCtx_t * lpo | IoCtx_t * lpo | |||
) | ) | |||
{ | { | |||
/* Time stamp assignment is interesting. If we | /* Time stamp assignment is interesting. If we | |||
* have a DCD stamp, we use it, otherwise we use | * have a DCD stamp, we use it, otherwise we use | |||
skipping to change at line 1204 | skipping to change at line 1215 | |||
/* | /* | |||
* async_write, clone of write(), used by some reflock drivers | * async_write, clone of write(), used by some reflock drivers | |||
*/ | */ | |||
int | int | |||
async_write( | async_write( | |||
int fd, | int fd, | |||
const void * data, | const void * data, | |||
unsigned int count | unsigned int count | |||
) | ) | |||
{ | { | |||
static const char * const msg = | static const char * const msgh = | |||
"async_write: cannot schedule device write"; | "async_write: cannot schedule device write"; | |||
static const char * const dmsg = | static const char * const dmsg = | |||
"overlapped IO data buffer"; | "overlapped IO data buffer"; | |||
IoCtx_t * lpo = NULL; | IoCtx_t * lpo = NULL; | |||
void * buff = NULL; | void * buff = NULL; | |||
HANDLE hnd = NULL; | HANDLE hnd = NULL; | |||
BOOL rc; | BOOL rc; | |||
hnd = (HANDLE)_get_osfhandle(fd); | hnd = (HANDLE)_get_osfhandle(fd); | |||
skipping to change at line 1229 | skipping to change at line 1240 | |||
if (NULL == (lpo = IoCtxAlloc(NULL, NULL))) | if (NULL == (lpo = IoCtxAlloc(NULL, NULL))) | |||
goto fail; | goto fail; | |||
lpo->io.hnd = hnd; | lpo->io.hnd = hnd; | |||
lpo->onIoDone = OnSerialWriteComplete; | lpo->onIoDone = OnSerialWriteComplete; | |||
lpo->trans_buf = buff; | lpo->trans_buf = buff; | |||
lpo->flRawMem = 1; | lpo->flRawMem = 1; | |||
rc = WriteFile(lpo->io.hnd, lpo->trans_buf, count, | rc = WriteFile(lpo->io.hnd, lpo->trans_buf, count, | |||
NULL, &lpo->ol); | NULL, &lpo->ol); | |||
if (rc || IoResultCheck(GetLastError(), lpo, msg)) | if (rc || IoResultCheck(GetLastError(), lpo, msgh)) | |||
return count; /* normal/success return */ | return count; /* normal/success return */ | |||
errno = EBADF; | errno = EBADF; | |||
return -1; | return -1; | |||
fail: | fail: | |||
IoCtxFree(lpo); | IoCtxFree(lpo); | |||
IOCPLPoolFree(buff, dmsg); | IOCPLPoolFree(buff, dmsg); | |||
return -1; | return -1; | |||
} | } | |||
skipping to change at line 1251 | skipping to change at line 1262 | |||
static void | static void | |||
OnSerialWriteComplete( | OnSerialWriteComplete( | |||
ULONG_PTR key, | ULONG_PTR key, | |||
IoCtx_t * lpo | IoCtx_t * lpo | |||
) | ) | |||
{ | { | |||
/* This is really trivial: Let 'getRioFromIoCtx()' do all the | /* This is really trivial: Let 'getRioFromIoCtx()' do all the | |||
* error processing, and it returns with a valid RIO, just | * error processing, and it returns with a valid RIO, just | |||
* drop the complete context. | * drop the complete context. | |||
*/ | */ | |||
static const char * const msg = | static const char * const msgh = | |||
"OnSerialWriteComplete: serial output failed"; | "OnSerialWriteComplete: serial output failed"; | |||
if (NULL != getRioFromIoCtx(lpo, key, msg)) | if (NULL != getRioFromIoCtx(lpo, key, msgh)) | |||
IoCtxRelease(lpo); | IoCtxRelease(lpo); | |||
} | } | |||
/* | /* | |||
* ------------------------------------------------------------------- | * ------------------------------------------------------------------- | |||
* Serial IO stuff | * Serial IO stuff | |||
* | * | |||
* Part 5 -- read PPS time stamps | * Part 5 -- read PPS time stamps | |||
* | * | |||
* ------------------------------------------------------------------- | * ------------------------------------------------------------------- | |||
skipping to change at line 1352 | skipping to change at line 1363 | |||
io_completion_port_add_clock_io( | io_completion_port_add_clock_io( | |||
RIO_t *rio | RIO_t *rio | |||
) | ) | |||
{ | { | |||
static const char * const msgh = | static const char * const msgh = | |||
"io_completion_port_add_clock_io"; | "io_completion_port_add_clock_io"; | |||
IoCtx_t * lpo; | IoCtx_t * lpo; | |||
HANDLE h; | HANDLE h; | |||
IoHndPad_T * iopad = NULL; | IoHndPad_T * iopad = NULL; | |||
recvbuf_t * rbuf; | ||||
/* preset to clear state for error cleanup:*/ | /* preset to clear state for error cleanup:*/ | |||
rio->ioreg_ctx = NULL; | rio->ioreg_ctx = NULL; | |||
rio->device_ctx = NULL; | rio->device_ctx = NULL; | |||
h = (HANDLE)_get_osfhandle(rio->fd); | h = (HANDLE)_get_osfhandle(rio->fd); | |||
if (h == INVALID_HANDLE_VALUE) { | if (h == INVALID_HANDLE_VALUE) { | |||
msyslog(LOG_ERR, "%s: COM port FD not valid", | msyslog(LOG_ERR, "%s: COM port FD not valid", | |||
msgh); | msgh); | |||
goto fail; | goto fail; | |||
skipping to change at line 1380 | skipping to change at line 1392 | |||
iopad->riofd = rio->fd; | iopad->riofd = rio->fd; | |||
iopad->rsrc.rio = rio; | iopad->rsrc.rio = rio; | |||
if (NULL == (rio->device_ctx = DevCtxAttach(serial_devctx(h)))) { | if (NULL == (rio->device_ctx = DevCtxAttach(serial_devctx(h)))) { | |||
msyslog(LOG_ERR, "%s: Failed to allocate device context", | msyslog(LOG_ERR, "%s: Failed to allocate device context", | |||
msgh); | msgh); | |||
goto fail; | goto fail; | |||
} | } | |||
if (NULL == (lpo = IoCtxAlloc(iopad, rio->device_ctx))) { | if (NULL == (lpo = IoCtxAlloc(iopad, rio->device_ctx))) { | |||
msyslog(LOG_ERR, "%: Failed to allocate IO context", | msyslog(LOG_ERR, "%: no IO context: %m", msgh); | |||
msgh); | ||||
goto fail; | goto fail; | |||
} | } | |||
if ( ! CreateIoCompletionPort(h, hndIOCPLPort, (ULONG_PTR)rio, 0)) { | if ( ! CreateIoCompletionPort(h, hndIOCPLPort, (ULONG_PTR)rio, 0)) { | |||
msyslog(LOG_ERR, "%s: Can't add COM port to i/o completion port: %m", | msyslog(LOG_ERR, "%s: Can't add COM port to i/o completion port: %m", | |||
msgh); | msgh); | |||
goto fail; | goto fail; | |||
} | } | |||
lpo->io.hnd = h; | lpo->io.hnd = h; | |||
memset(&lpo->aux, 0, sizeof(lpo->aux)); | memset(&lpo->aux, 0, sizeof(lpo->aux)); | |||
return QueueSerialWait(lpo, get_free_recv_buffer_alloc()); | if (NULL == (rbuf = get_free_recv_buffer_alloc(TRUE))) { | |||
msyslog(LOG_ERR, "%s: no receive buffer: %m", msgh); | ||||
goto fail; | ||||
} | ||||
return QueueSerialWait(lpo, rbuf); | ||||
fail: | fail: | |||
rio->ioreg_ctx = iohpDetach(rio->ioreg_ctx); | rio->ioreg_ctx = iohpDetach(rio->ioreg_ctx); | |||
rio->device_ctx = DevCtxDetach(rio->device_ctx); | rio->device_ctx = DevCtxDetach(rio->device_ctx); | |||
return FALSE; | return FALSE; | |||
} | } | |||
/* ----------------------------------------------------------------- */ | /* ----------------------------------------------------------------- */ | |||
static void | static void | |||
OnSerialDetach( | OnSerialDetach( | |||
skipping to change at line 1454 | skipping to change at line 1469 | |||
* ReadFile() is less efficient. Also, WSARecvFrom delivers | * ReadFile() is less efficient. Also, WSARecvFrom delivers | |||
* the remote network address. With ReadFile, getting this | * the remote network address. With ReadFile, getting this | |||
* becomes a chore. | * becomes a chore. | |||
*/ | */ | |||
static BOOL __fastcall | static BOOL __fastcall | |||
QueueSocketRecv( | QueueSocketRecv( | |||
IoCtx_t * lpo, | IoCtx_t * lpo, | |||
recvbuf_t * buff | recvbuf_t * buff | |||
) | ) | |||
{ | { | |||
static const char * const msg = | static const char * const msgh = | |||
"QueueSocketRecv: cannot schedule socket receive"; | "QueueSocketRecv: cannot schedule socket receive"; | |||
WSABUF wsabuf; | WSABUF wsabuf; | |||
int rc; | int rc; | |||
lpo->onIoDone = OnSocketRecv; | lpo->onIoDone = OnSocketRecv; | |||
lpo->recv_buf = buff; | lpo->recv_buf = buff; | |||
lpo->flRawMem = 0; | lpo->flRawMem = 0; | |||
lpo->ioFlags = 0; | lpo->ioFlags = 0; | |||
skipping to change at line 1476 | skipping to change at line 1491 | |||
buff->recv_srcadr_len = sizeof(buff->recv_srcadr); | buff->recv_srcadr_len = sizeof(buff->recv_srcadr); | |||
buff->receiver = receive; | buff->receiver = receive; | |||
buff->dstadr = lpo->iopad->rsrc.ept; | buff->dstadr = lpo->iopad->rsrc.ept; | |||
wsabuf.buf = (char *)buff->recv_buffer; | wsabuf.buf = (char *)buff->recv_buffer; | |||
wsabuf.len = sizeof(buff->recv_buffer); | wsabuf.len = sizeof(buff->recv_buffer); | |||
rc = WSARecvFrom(lpo->io.sfd, &wsabuf, 1, NULL, &lpo->ioFlags, | rc = WSARecvFrom(lpo->io.sfd, &wsabuf, 1, NULL, &lpo->ioFlags, | |||
&buff->recv_srcadr.sa, &buff->recv_srcadr_len, | &buff->recv_srcadr.sa, &buff->recv_srcadr_len, | |||
&lpo->ol, NULL); | &lpo->ol, NULL); | |||
return !rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msg); | return !rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msgh); | |||
} | } | |||
/* ----------------------------------------------------------------- */ | /* ----------------------------------------------------------------- */ | |||
static void | static void | |||
OnSocketRecv( | OnSocketRecv( | |||
ULONG_PTR key, | ULONG_PTR key, | |||
IoCtx_t * lpo | IoCtx_t * lpo | |||
) | ) | |||
{ | { | |||
static const char * const msg = | static const char * const msgh = | |||
"OnSocketRecv: receive from socket failed"; | "OnSocketRecv: receive from socket failed"; | |||
recvbuf_t * buff = NULL; | recvbuf_t * buff = NULL; | |||
IoHndPad_T * iopad = NULL; | IoHndPad_T * iopad = NULL; | |||
endpt * ep = NULL; | endpt * ep = NULL; | |||
int rc; | int rc; | |||
/* order is important -- check first, then get endpoint! */ | /* order is important -- check first, then get endpoint! */ | |||
rc = socketErrorCheck(lpo, msg); | rc = socketErrorCheck(lpo, msgh); | |||
ep = getEndptFromIoCtx(lpo, key); | ep = getEndptFromIoCtx(lpo, key); | |||
/* Make sure this endpoint is not closed. */ | /* Make sure this endpoint is not closed. */ | |||
if (ep == NULL) | if (ep == NULL) | |||
return; | return; | |||
/* We want to start a new read before we process the buffer. | /* We want to start a new read before we process the buffer. | |||
* Since we must not use the context object once it is in | * Since we must not use the context object once it is in | |||
* another IO, we go through some pains to read everything | * another IO, we go through some pains to read everything | |||
* before going out for another read request. | * before going out for another read request. | |||
* We also need an extra hold to the IOPAD structure. | * We also need an extra hold to the IOPAD structure. | |||
*/ | */ | |||
iopad = iohpAttach(lpo->iopad); | iopad = iohpAttach(lpo->iopad); | |||
if (rc == PKT_OK && lpo->byteCount > 0) { | if (rc == PKT_OK && lpo->byteCount > 0) { | |||
/* keep input buffer, create new one for IO */ | /* keep input buffer, create new one for IO */ | |||
buff = lpo->recv_buf; | buff = lpo->recv_buf; | |||
lpo->recv_buf = get_free_recv_buffer_alloc(); | lpo->recv_buf = get_free_recv_buffer_alloc(FALSE); | |||
if (lpo->recv_buf) { | ||||
buff->recv_time = lpo->aux.RecvTime; | buff->recv_time = lpo->aux.RecvTime; | |||
buff->recv_length = (int)lpo->byteCount; | buff->recv_length = (int)lpo->byteCount; | |||
} else { | ||||
lpo->recv_buf = buff; | ||||
buff = NULL; | ||||
++packets_dropped; /* maybe atomic? */ | ||||
} | ||||
} /* Note: else we use the current buffer again */ | } /* Note: else we use the current buffer again */ | |||
if (rc != PKT_SOCKET_ERROR) { | if (rc != PKT_SOCKET_ERROR) { | |||
IoCtxStartChecked(lpo, QueueSocketRecv, lpo->recv_buf); | IoCtxStartChecked(lpo, QueueSocketRecv, lpo->recv_buf); | |||
} else { | } else { | |||
freerecvbuf(lpo->recv_buf); | freerecvbuf(lpo->recv_buf); | |||
IoCtxFree(lpo); | IoCtxFree(lpo); | |||
} | } | |||
/* below this, any usage of 'lpo' is invalid! */ | /* below this, any usage of 'lpo' is invalid! */ | |||
/* If we have a buffer, do some bookkeeping and other chores, | /* If we have a buffer, do some bookkeeping and other chores, | |||
* then feed it to the input queue. And we can be sure we have | * then feed it to the input queue. And we can be sure we have | |||
* a packet here, so we can update the stats. | * a packet here, so we can update the stats. | |||
*/ | */ | |||
if (buff != NULL) { | if (buff) { | |||
INSIST(buff->recv_srcadr_len <= sizeof(buff->recv_srcadr)); | INSIST(buff->recv_srcadr_len <= sizeof(buff->recv_srcadr)); | |||
DPRINTF(4, ("%sfd %d %s recv packet mode is %d\n", | DPRINTF(4, ("%sfd %d %s recv packet mode is %d\n", | |||
(MODE_BROADCAST == get_packet_mode(buff)) | (MODE_BROADCAST == get_packet_mode(buff)) | |||
? " **** Broadcast " | ? " **** Broadcast " | |||
: "", | : "", | |||
(int)buff->fd, stoa(&buff->recv_srcadr), | (int)buff->fd, stoa(&buff->recv_srcadr), | |||
get_packet_mode(buff))); | get_packet_mode(buff))); | |||
if (iohpEndPointOK(iopad)) { | if (iohpEndPointOK(iopad)) { | |||
InterlockedIncrement(&ep->received); | InterlockedIncrement(&ep->received); | |||
skipping to change at line 1562 | skipping to change at line 1582 | |||
} | } | |||
/* ----------------------------------------------------------------- */ | /* ----------------------------------------------------------------- */ | |||
static void | static void | |||
OnSocketSend( | OnSocketSend( | |||
ULONG_PTR key, | ULONG_PTR key, | |||
IoCtx_t * lpo | IoCtx_t * lpo | |||
) | ) | |||
{ | { | |||
/* this is somewhat easier: */ | /* this is somewhat easier: */ | |||
static const char * const msg = | static const char * const msgh = | |||
"OnSocketSend: send to socket failed"; | "OnSocketSend: send to socket failed"; | |||
endpt * ep = NULL; | endpt * ep = NULL; | |||
int rc; | int rc; | |||
/* order is important -- check first, then get endpoint! */ | /* order is important -- check first, then get endpoint! */ | |||
rc = socketErrorCheck(lpo, msg); | rc = socketErrorCheck(lpo, msgh); | |||
ep = getEndptFromIoCtx(lpo, key); | ep = getEndptFromIoCtx(lpo, key); | |||
/* Make sure this endpoint is not closed. */ | /* Make sure this endpoint is not closed. */ | |||
if (ep == NULL) | if (ep == NULL) | |||
return; | return; | |||
if (rc != PKT_OK) { | if (rc != PKT_OK) { | |||
InterlockedIncrement(&ep->notsent); | InterlockedIncrement(&ep->notsent); | |||
InterlockedDecrement(&ep->sent); | InterlockedDecrement(&ep->sent); | |||
InterlockedIncrement(&packets_notsent); | InterlockedIncrement(&packets_notsent); | |||
skipping to change at line 1670 | skipping to change at line 1690 | |||
BOOL | BOOL | |||
io_completion_port_add_socket( | io_completion_port_add_socket( | |||
SOCKET sfd, | SOCKET sfd, | |||
endpt * ep, | endpt * ep, | |||
BOOL bcast | BOOL bcast | |||
) | ) | |||
{ | { | |||
/* Assume the endpoint is already registered. Set the socket | /* Assume the endpoint is already registered. Set the socket | |||
* handle into the proper slot, and then start up the IO engine. | * handle into the proper slot, and then start up the IO engine. | |||
*/ | */ | |||
static const char * const msg = | static const char * const msgh = | |||
"Can't add socket to i/o completion port"; | "Can't add socket to i/o completion port"; | |||
IoCtx_t * lpo; | IoCtx_t * lpo; | |||
size_t n; | size_t n; | |||
ULONG_PTR key; | ULONG_PTR key; | |||
IoHndPad_T * iopad = NULL; | IoHndPad_T * iopad = NULL; | |||
recvbuf_t * rbuf; | ||||
key = ((ULONG_PTR)ep & ~(ULONG_PTR)1u) + !!bcast; | key = ((ULONG_PTR)ep & ~(ULONG_PTR)1u) + !!bcast; | |||
if (NULL == (iopad = (IoHndPad_T*)ep->ioreg_ctx)) { | if (NULL == (iopad = (IoHndPad_T*)ep->ioreg_ctx)) { | |||
msyslog(LOG_CRIT, "io_completion_port_add_socket: endpt = %p not registered, exiting", | msyslog(LOG_CRIT, "io_completion_port_add_socket: endpt = %p not registered, exiting", | |||
ep); | ep); | |||
exit(1); | exit(1); | |||
} else { | } else { | |||
endpt * rep = iopad->rsrc.ept; | endpt * rep = iopad->rsrc.ept; | |||
iopad->handles[!!bcast] = (HANDLE)sfd; | iopad->handles[!!bcast] = (HANDLE)sfd; | |||
INSIST(rep == ep); | INSIST(rep == ep); | |||
} | } | |||
if (NULL == CreateIoCompletionPort((HANDLE)sfd, | if (NULL == CreateIoCompletionPort((HANDLE)sfd, | |||
hndIOCPLPort, key, 0)) | hndIOCPLPort, key, 0)) | |||
{ | { | |||
msyslog(LOG_ERR, "%s: %m", msg); | msyslog(LOG_ERR, "%s: %m", msgh); | |||
goto fail; | goto fail; | |||
} | } | |||
for (n = s_SockRecvSched; n > 0; --n) { | for (n = s_SockRecvSched; n > 0; --n) { | |||
if (NULL == (lpo = IoCtxAlloc(ep->ioreg_ctx, NULL))) { | if (NULL == (lpo = IoCtxAlloc(ep->ioreg_ctx, NULL))) { | |||
msyslog(LOG_ERR, "%s: no read buffer: %m", msg); | msyslog(LOG_ERR, "%s: no IO context: %m", msgh); | |||
goto fail; | goto fail; | |||
} | } | |||
lpo->io.sfd = sfd; | lpo->io.sfd = sfd; | |||
if (!QueueSocketRecv(lpo, get_free_recv_buffer_alloc())) | if (NULL == (rbuf = get_free_recv_buffer_alloc(FALSE))) { | |||
msyslog(LOG_ERR, "%s: no receive buffer: %m", msgh); | ||||
goto fail; | ||||
} | ||||
if (!QueueSocketRecv(lpo, rbuf)) | ||||
goto fail; | goto fail; | |||
} | } | |||
return TRUE; | return TRUE; | |||
fail: | fail: | |||
ep->ioreg_ctx = iohpDetach(ep->ioreg_ctx); | ep->ioreg_ctx = iohpDetach(ep->ioreg_ctx); | |||
return FALSE; | return FALSE; | |||
} | } | |||
/* ----------------------------------------------------------------- */ | /* ----------------------------------------------------------------- */ | |||
void | void | |||
skipping to change at line 1748 | skipping to change at line 1773 | |||
*/ | */ | |||
int | int | |||
io_completion_port_sendto( | io_completion_port_sendto( | |||
endpt * ep, | endpt * ep, | |||
SOCKET sfd, | SOCKET sfd, | |||
void * pkt, | void * pkt, | |||
size_t len, | size_t len, | |||
sockaddr_u * dest | sockaddr_u * dest | |||
) | ) | |||
{ | { | |||
static const char * const msg = | static const char * const msgh = | |||
"sendto: cannot schedule socket send"; | "sendto: cannot schedule socket send"; | |||
static const char * const dmsg = | static const char * const dmsg = | |||
"overlapped IO data buffer"; | "overlapped IO data buffer"; | |||
IoCtx_t * lpo = NULL; | IoCtx_t * lpo = NULL; | |||
void * dbuf = NULL; | void * dbuf = NULL; | |||
WSABUF wsabuf; | WSABUF wsabuf; | |||
int rc; | int rc; | |||
if (len > INT_MAX) | if (len > INT_MAX) | |||
skipping to change at line 1781 | skipping to change at line 1806 | |||
lpo->trans_buf = dbuf; | lpo->trans_buf = dbuf; | |||
lpo->flRawMem = 1; | lpo->flRawMem = 1; | |||
lpo->io.sfd = sfd; | lpo->io.sfd = sfd; | |||
wsabuf.buf = (void*)lpo->trans_buf; | wsabuf.buf = (void*)lpo->trans_buf; | |||
wsabuf.len = (DWORD)len; | wsabuf.len = (DWORD)len; | |||
rc = WSASendTo(sfd, &wsabuf, 1, NULL, 0, | rc = WSASendTo(sfd, &wsabuf, 1, NULL, 0, | |||
&dest->sa, SOCKLEN(dest), | &dest->sa, SOCKLEN(dest), | |||
&lpo->ol, NULL); | &lpo->ol, NULL); | |||
if (!rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msg)) | if (!rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msgh)) | |||
return (int)len; /* normal/success return */ | return (int)len; /* normal/success return */ | |||
errno = EBADF; | errno = EBADF; | |||
return -1; | return -1; | |||
fail: | fail: | |||
IoCtxFree(lpo); | IoCtxFree(lpo); | |||
IOCPLPoolFree(dbuf, dmsg); | IOCPLPoolFree(dbuf, dmsg); | |||
return -1; | return -1; | |||
} | } | |||
End of changes. 35 change blocks. | ||||
38 lines changed or deleted | 63 lines changed or added |