"Fossies" - the Fresh Open Source Software Archive 
Member "ntp-4.2.8p15/ports/winnt/ntpd/ntp_iocompletionport.c" (23 Jun 2020, 52159 Bytes) of package /linux/misc/ntp-4.2.8p15.tar.gz:
1 /*
2 -----------------------------------------------------------------------
3 This is the IO completion port handling for async/overlapped IO on
4 Windows >= Win2000.
5
6 Some notes on the implementation:
7
8 + Only one thread is used to serve the IO completion port, for several
9 reasons:
10
11 * First, there seems to be (have been?) trouble that locked up NTPD
12 when more than one thread was used for IOCPL.
13
14 * Second, for the sake of the time stamp interpolation the threads
15 must run on the same CPU as the time interpolation thread. This
16 makes using more than one thread useless, as they would compete for
17 the same core and create contention.
18
19 + Some IO operations need a possibly lengthy post-processing. Emulating
20 the UN*X line discipline is currently the only but prominent example.
21 To avoid the processing in the time-critical IOCPL thread, longer
22 processing is offloaded to the worker thread pool.
23
24 + A fact that seems not as well-known as it should be is that all
25 resources passed to an overlapped IO operation must be considered
26 owned by the OS until the result has been fetched/dequeued. This
27 includes all overlapped structures and buffers involved, so cleaning
28 up on shutdown must be carefully constructed. (This includes closing
29 all the IO handles and waiting for the results to be dequeued.
30 'CancelIo()' cannot be used since it's broken beyond repair; a drain sketch follows this front matter.)
31
32 If this is not possible, then all resources should be dropped into
33 oblivion -- otherwise "bad things (tm)" are bound to happen.
34
35 Using a private heap that is silently dropped but not deleted is a
36 good way to avoid cluttering memory stats with IO context related
37 objects. Leak tracing becomes more interesting, though.
38
39
40 The current implementation is based on the work of Danny Mayer who improved
41 the original implementation and Dave Hart who improved on the serial I/O
42 routines. The true roots of this file seem to be shrouded by the mist of time...
43
44
45 This version still provides the 'user space PPS' emulation
46 feature.
47
48 Juergen Perlinger (perlinger@ntp.org) Feb 2012
49
50 -----------------------------------------------------------------------
51 */
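/*
 * A minimal drain sketch for the shutdown rule above -- illustrative only
 * and not compiled into the module. 'cPending' (the number of overlapped
 * operations still owned by the OS) is a hypothetical bookkeeping value;
 * this file does not actually keep such a counter.
 */
#if 0
static void
drain_completion_port(
	HANDLE	hPort,		/* the IO completion port */
	HANDLE	hIo,		/* handle with overlapped IO in flight */
	size_t	cPending	/* hypothetical count of pending operations */
	)
{
	DWORD		octets;
	ULONG_PTR	key;
	OVERLAPPED *	pol;

	/* Closing the handle makes the pending operations complete
	 * (with an error)...
	 */
	CloseHandle(hIo);
	/* ...but the OVERLAPPED blocks and buffers stay owned by the OS
	 * until each result has been dequeued. Only then may they be
	 * freed or reused.
	 */
	while (cPending > 0) {
		GetQueuedCompletionStatus(hPort, &octets, &key,
					  &pol, INFINITE);
		if (pol != NULL)
			--cPending;
	}
}
#endif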
52 #ifdef HAVE_CONFIG_H
53 # include <config.h>
54 #endif
55
56 #ifdef HAVE_IO_COMPLETION_PORT
57
58 #include <stddef.h>
59 #include <stdio.h>
60 #include <process.h>
61 #include <syslog.h>
62
63 #include "ntpd.h"
64 #include "ntp_request.h"
65
66 #include "ntp_iocompletionport.h"
67 #include "ntp_iocplmem.h"
68 #include "ntp_iocpltypes.h"
69
70
71 #define CONTAINEROF(p, type, member) \
72 ((type *)((char *)(p) - offsetof(type, member)))
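/* Usage sketch: given the OVERLAPPED pointer handed back by
 * GetQueuedCompletionStatus(), the owning context is recovered with
 *	IoCtx_t * ctx = CONTAINEROF(pol, IoCtx_t, ol);
 * as done in the completion thread below.
 */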
73
74 enum io_packet_handling {
75 PKT_OK,
76 PKT_DROP,
77 PKT_SOCKET_ERROR
78 };
79
80 static const char * const st_packet_handling[3] = {
81 "accepted",
82 "dropped"
83 "error"
84 };
85
86 /*
87 * local function definitions
88 */
89 static void ntpd_addremove_semaphore(HANDLE, int);
90 static void set_serial_recv_time (recvbuf_t *, IoCtx_t *);
91
92 /* Initiate/Request async IO operations */
93 static BOOL __fastcall QueueSerialWait (IoCtx_t *, recvbuf_t *);
94 static BOOL __fastcall QueueSerialRead(IoCtx_t *, recvbuf_t *);
95 static BOOL __fastcall QueueRawSerialRead(IoCtx_t *, recvbuf_t *);
96 static BOOL __fastcall QueueSocketRecv(IoCtx_t *, recvbuf_t *);
97
98
99 /* High-level IO callback functions */
100 static void OnSocketRecv (ULONG_PTR, IoCtx_t *);
101 static void OnSocketSend (ULONG_PTR, IoCtx_t *);
102 static void OnSerialWaitComplete (ULONG_PTR, IoCtx_t *);
103 static void OnSerialReadComplete (ULONG_PTR, IoCtx_t *);
104 static void OnRawSerialReadComplete(ULONG_PTR, IoCtx_t *);
105 static void OnSerialWriteComplete (ULONG_PTR, IoCtx_t *);
106
107 /* worker pool offload functions */
108 static DWORD WINAPI OnSerialReadWorker(void * ctx);
109
110
111 /* keep a list to traverse to free memory on debug builds */
112 #ifdef DEBUG
113 static void free_io_completion_port_mem(void);
114 #endif
115
116
117 HANDLE WaitableExitEventHandle;
118 HANDLE WaitableIoEventHandle;
119 static HANDLE hndIOCPLPort;
120 static HANDLE hMainThread;
121 static HANDLE hMainRpcDone;
122
123 DWORD ActiveWaitHandles;
124 HANDLE WaitHandles[4];
125
126
127 /*
128 * -------------------------------------------------------------------
129 * Windows 2000 bluescreens with bugcheck 0x76 PROCESS_HAS_LOCKED_PAGES
130 * at ntpd process termination when using more than one pending
131 * receive per socket. A runtime version test during startup will
132 * allow using more on newer versions of Windows.
133 *
134 * perlinger@ntp.org: Considering the quirks fixed in the overlapped
135 * IO handling in recent years, it could even be that this is no longer
136 * an issue. Testing this might be tricky -- who runs a Win2k system
137 * in the year 2016?
138 */
139 static size_t s_SockRecvSched = 1; /* possibly adjusted later */
140
141
142 /*
143 * -------------------------------------------------------------------
144 * The IO completion thread and support functions
145 *
146 * There is only one completion thread, because it is locked to the same
147 * core as the time interpolation. Having more than one causes core
148 * contention and is not useful.
149 * -------------------------------------------------------------------
150 */
151 static HANDLE hIoCompletionThread;
152 static UINT tidCompletionThread;
153
154 /*
155 * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
156 * The IO completion worker thread
157 *
158 * Note that this thread does not enter an alertable wait state and that
159 * the only waiting point is the IO completion port. If stopping this
160 * thread with a special queued result packet does not work,
161 * 'TerminateThread()' is the only remaining weapon in the arsenal. A
162 * dangerous weapon -- it's like SIGKILL.
163 */
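/* The 'special queued result packet' mentioned above is, in essence, what
 * uninit_io_completion_port() posts:
 *	PostQueuedCompletionStatus(hndIOCPLPort, 0, 0, NULL);
 * It makes GetQueuedCompletionStatus() hand back a NULL OVERLAPPED
 * pointer, which the loop below takes as its exit signal.
 */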
164 static unsigned WINAPI
165 iocompletionthread(
166 void *NotUsed
167 )
168 {
169 DWORD err;
170 DWORD octets;
171 ULONG_PTR key;
172 OVERLAPPED * pol;
173 IoCtx_t * lpo;
174
175 UNUSED_ARG(NotUsed);
176
177 /* Socket and refclock receive call gettimeofday() so the I/O
178 * thread needs to be on the same processor as the main and
179 * timing threads to ensure consistent QueryPerformanceCounter()
180 * results.
181 *
182 * This gets seriously in the way of efficient thread pooling
183 * on multi-core systems.
184 */
185 lock_thread_to_processor(GetCurrentThread());
186
187 /* Set the thread priority high enough so I/O will pre-empt
188 * normal recv packet processing, but not higher than the timer
189 * sync thread.
190 */
191 if (!SetThreadPriority(GetCurrentThread(),
192 THREAD_PRIORITY_ABOVE_NORMAL))
193 msyslog(LOG_ERR, "Can't set thread priority: %m");
194
195 for(;;) {
196 if (GetQueuedCompletionStatus(
197 hndIOCPLPort,
198 &octets,
199 &key,
200 &pol,
201 INFINITE)) {
202 err = ERROR_SUCCESS;
203 } else {
204 err = GetLastError();
205 }
206 if (pol == NULL) {
207 DPRINTF(2, ("Overlapped IO Thread Exiting\n"));
208 break; /* fail */
209 }
210 lpo = CONTAINEROF(pol, IoCtx_t, ol);
211 get_systime(&lpo->aux.RecvTime);
212 lpo->byteCount = octets;
213 lpo->errCode = err;
214 handler_calls++;
215 (*lpo->onIoDone)(key, lpo);
216 }
217
218 return 0;
219 }
220
221 /*
222 * -------------------------------------------------------------------
223 * Create/initialise the I/O completion port
224 */
225 void
226 init_io_completion_port(void)
227 {
228 OSVERSIONINFO vi;
229
230 # ifdef DEBUG
231 atexit(&free_io_completion_port_mem);
232 # endif
233
234 memset(&vi, 0, sizeof(vi));
235 vi.dwOSVersionInfoSize = sizeof(vi);
236
237 /* For Windows Vista (6.0) and above, schedule more than one receive */
238 if (GetVersionEx(&vi) && vi.dwMajorVersion >= 6)
239 s_SockRecvSched = 4;
240
241 /* Create the context pool first. */
242 IOCPLPoolInit(20);
243
244 /* Create the event used to signal an IO event */
245 WaitableIoEventHandle = CreateEvent(NULL, FALSE, FALSE, NULL);
246 if (WaitableIoEventHandle == NULL) {
247 msyslog(LOG_ERR, "Can't create I/O event handle: %m");
248 exit(1);
249 }
250 /* Create the event used to signal an exit event */
251 WaitableExitEventHandle = CreateEvent(NULL, FALSE, FALSE, NULL);
252 if (WaitableExitEventHandle == NULL) {
253 msyslog(LOG_ERR, "Can't create exit event handle: %m");
254 exit(1);
255 }
256 hMainRpcDone = CreateEvent(NULL, FALSE, FALSE, NULL);
257 if (hMainRpcDone == NULL) {
258 msyslog(LOG_ERR, "Can't create RPC sync handle: %m");
259 exit(1);
260 }
261
262 /* Create the IO completion port */
263 hndIOCPLPort = CreateIoCompletionPort(
264 INVALID_HANDLE_VALUE, NULL, 0, 0);
265 if (hndIOCPLPort == NULL) {
266 msyslog(LOG_ERR, "Can't create I/O completion port: %m");
267 exit(1);
268 }
269
270 /* Initialize the Wait Handles table */
271 WaitHandles[0] = WaitableIoEventHandle;
272 WaitHandles[1] = WaitableExitEventHandle; /* exit request */
273 WaitHandles[2] = WaitableTimerHandle;
274 ActiveWaitHandles = 3;
275
276 /* Supply ntp_worker.c with a function to add or remove a
277 * semaphore to the ntpd I/O loop which is signalled by a worker
278 * when a response is ready. The callback is invoked in the
279 * parent.
280 */
281 addremove_io_semaphore = &ntpd_addremove_semaphore;
282
283 /* Create a true handle for the main thread (APC processing) */
284 DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
285 GetCurrentProcess(), &hMainThread,
286 0, FALSE, DUPLICATE_SAME_ACCESS);
287
288 /* Have one thread servicing I/O. See rationale in front matter. */
289 hIoCompletionThread = (HANDLE)_beginthreadex(
290 NULL,
291 0,
292 iocompletionthread,
293 NULL,
294 0,
295 &tidCompletionThread);
296 }
297
298
299 /*
300 * -------------------------------------------------------------------
301 * completion port teardown
302 */
303 void
304 uninit_io_completion_port(
305 void
306 )
307 {
308 DWORD rc;
309
310 /* do nothing if the completion port is already gone. */
311 if (hndIOCPLPort == NULL)
312 return;
313
314 /* The service thread seems to be running. Try to terminate it
315 * gracefully first, by force later...
316 */
317 if (tidCompletionThread != GetCurrentThreadId()) {
318 PostQueuedCompletionStatus(hndIOCPLPort, 0, 0, 0);
319 rc = WaitForSingleObject(hIoCompletionThread, 5000);
320 if (rc == WAIT_TIMEOUT) {
321 /* Thread lost. Kill off with TerminateThread. */
322 msyslog(LOG_ERR,
323 "IO completion thread refuses to terminate");
324 TerminateThread(hIoCompletionThread, ~0UL);
325 }
326 }
327
328 /* close the additional main thread handle */
329 if (hMainThread) {
330 CloseHandle(hMainThread);
331 hMainThread = NULL;
332 }
333
334 /* stop using the memory pool */
335 IOCPLPoolDone();
336
337 /* now reap all handles... */
338 CloseHandle(hIoCompletionThread);
339 hIoCompletionThread = NULL;
340 CloseHandle(hndIOCPLPort);
341 hndIOCPLPort = NULL;
342 CloseHandle(hMainRpcDone);
343 hMainRpcDone = NULL;
344 }
345
346
347 /*
348 * -------------------------------------------------------------------
349 * external worker thread support (wait handle stuff)
350 *
351 * !Attention!
352 *
353 * - This function must only be called from the main thread. Changing
354 * a set of wait handles while someone is waiting on it creates
355 * undefined behaviour. Also there's no provision for mutual
356 * exclusion when accessing global values.
357 *
358 * - It's not possible to register a handle that is already in the table.
359 */
360 static void
361 ntpd_addremove_semaphore(
362 HANDLE sem,
363 int remove
364 )
365 {
366 DWORD hi;
367
368 /* search for a matching entry first. */
369 for (hi = 3; hi < ActiveWaitHandles; hi++)
370 if (sem == WaitHandles[hi])
371 break;
372
373 if (remove) {
374 /* If found, swap with the last entry if necessary to keep
375 * the table dense.
376 */
377 if (hi < ActiveWaitHandles) {
378 ActiveWaitHandles--;
379 if (hi < ActiveWaitHandles)
380 WaitHandles[hi] =
381 WaitHandles[ActiveWaitHandles];
382 WaitHandles[ActiveWaitHandles] = NULL;
383 }
384 } else {
385 /* Make sure the entry is not found and there is enough
386 * room, then append to the table array.
387 */
388 if (hi >= ActiveWaitHandles) {
389 INSIST(ActiveWaitHandles < COUNTOF(WaitHandles));
390 WaitHandles[ActiveWaitHandles] = sem;
391 ActiveWaitHandles++;
392 }
393 }
394 }
395
396
397 #ifdef DEBUG
398 static void
399 free_io_completion_port_mem(void)
400 {
401 /* At the moment, do absolutely nothing. Returning memory here
402 * requires NO PENDING OVERLAPPED OPERATIONS AT ALL at this
403 * point in time, and as long as we cannot be reasonably sure about
404 * that, the simple advice is:
405 *
406 * HANDS OFF!
407 */
408 }
409 #endif /* DEBUG */
410
411 void
412 iocpl_notify(
413 IoHndPad_T * iopad,
414 void (*pfunc)(ULONG_PTR, IoCtx_t *),
415 UINT_PTR fdn
416 )
417 {
418 IoCtx_t xf;
419
420 memset(&xf, 0, sizeof(xf));
421 xf.iopad = iopad;
422 xf.ppswake = hMainRpcDone;
423 xf.onIoDone = pfunc;
424 xf.io.sfd = fdn;
425 PostQueuedCompletionStatus(hndIOCPLPort, 1, 0, &xf.ol);
426 WaitForSingleObject(xf.ppswake, INFINITE);
427 }
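/* Contract for callbacks passed to iocpl_notify(): they run on the IOCPL
 * thread and must end by signalling 'ppswake' so the blocked caller above
 * can continue. A minimal sketch of the assumed shape (see
 * OnSerialDetach() / OnInterfaceDetach() below for real instances):
 *
 *	static void
 *	OnSomeDetach(ULONG_PTR key, IoCtx_t * lpo)
 *	{
 *		// ...manipulate lpo->iopad, safe on the IOCPL thread...
 *		SetEvent(lpo->ppswake);
 *	}
 */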
428
429 /*
430 * -------------------------------------------------------------------
431 * APC callback for scheduling interface scans.
432 *
433 * We get an error when trying to send if the network interface is
434 * gone or has lost link. Rescan interfaces to catch on sooner, but no
435 * more often than once per minute. Once ntpd is able to detect
436 * changes without polling this should be unnecessary.
437 */
438 static void WINAPI
439 apcOnUnexpectedNetworkError(
440 ULONG_PTR arg
441 )
442 {
443 static u_long time_next_ifscan_after_error;
444
445 UNUSED_ARG(arg);
446
447 if (time_next_ifscan_after_error < current_time) {
448 time_next_ifscan_after_error = current_time + 60;
449 timer_interfacetimeout(current_time);
450 }
451 DPRINTF(4, ("UnexpectedNetworkError: interface may be down\n"));
452 }
453
454 /* -------------------------------------------------------------------
455 *
456 * Prelude to madness -- common error checking code
457 *
458 * -------------------------------------------------------------------
459 */
460 extern char * NTstrerror(int err, BOOL *bfreebuf);
461
462 static void
463 LogIoError(
464 const char * msg,
465 HANDLE hnd,
466 DWORD err
467 )
468 {
469 static const char * const rmsg =
470 "LogIoError (unknown source)";
471
472 /* -*- format & print the error message -*-
473 * We have to resort to the low level error formatting functions
474 * here, since the error code can come from an overlapped result.
475 * Relying on the value being the same as the 'GetLastError()'
476 * result at this point of execution is shaky at best, and using
477 * 'SetLastError()' to force it seems too nasty.
478 */
479 BOOL dynbuf = FALSE;
480 char * msgbuf = NTstrerror(err, &dynbuf);
481 msyslog(LOG_ERR, "%s: hnd=%p, err=%u, '%s'",
482 (msg ? msg : rmsg), hnd, err, msgbuf);
483 if (dynbuf)
484 LocalFree(msgbuf);
485 }
486
487 /* -------------------------------------------------------------------
488 * synchronous IO request result check (network & serial)
489 * -------------------------------------------------------------------
490 */
491 static BOOL
492 IoResultCheck(
493 DWORD err,
494 IoCtx_t * ctx,
495 const char * msg
496 )
497 {
498 DPRINTF(6, ("in IoResultCheck err = %d\n", err));
499
500 switch (err) {
501 /* The first ones are no real errors. */
502 case ERROR_SUCCESS: /* all is good */
503 case ERROR_IO_PENDING: /* callback pending */
504 break;
505
506 /* this defers the error processing to the main thread
507 * and continues silently.
508 */
509 case ERROR_UNEXP_NET_ERR:
510 if (hMainThread) {
511 QueueUserAPC(apcOnUnexpectedNetworkError,
512 hMainThread, ctx->io.sfd);
513 }
514 IoCtxRelease(ctx);
515 return FALSE;
516
517 default:
518 LogIoError(msg, ctx->io.hnd, err);
519 /* the next ones go silently -- only clean-up is done */
520 case ERROR_INVALID_PARAMETER: /* handle already closed (clock)*/
521 case WSAENOTSOCK : /* handle already closed (socket)*/
522 IoCtxRelease(ctx);
523 return FALSE;
524 }
525 return TRUE;
526 }
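/* The function above is used with a recurring idiom when starting
 * overlapped IO, e.g. (taken from QueueSerialWait() below):
 *
 *	rc = WaitCommEvent(lpo->io.hnd, &lpo->aux.com_events, &lpo->ol);
 *	return rc || IoResultCheck(GetLastError(), lpo, msgh);
 *
 * TRUE means the request completed or is still pending; FALSE means the
 * context has already been released by IoResultCheck().
 */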
527
528 /* -------------------------------------------------------------------
529 * IO callback context check -- serial (non-network) data streams
530 *
531 * Attention: deletes the IO context when the clock is dead!
532 * -------------------------------------------------------------------
533 */
534 static RIO_t*
535 getRioFromIoCtx(
536 IoCtx_t * ctx,
537 ULONG_PTR key,
538 const char * msg
539 )
540 {
541 /* Make sure the key matches the context info in the shared
542 * lock, then check for errors. If the error indicates the
543 * operation was cancelled, let the operation fail silently.
544 */
545 RIO_t * rio = NULL;
546 IoHndPad_T * iopad = ctx->iopad;
547 if (NULL != iopad) {
548 rio = iopad->rsrc.rio;
549 if (key != iopad->rsrc.key)
550 rio = NULL;
551 else if (ctx->io.hnd != iopad->handles[0])
552 rio = NULL;
553 }
554 if (rio != NULL) switch (ctx->errCode) {
555 /* When we got cancelled, don't spill messages */
556 case ERROR_INVALID_PARAMETER: /* handle already closed (clock) */
557 case ERROR_OPERATION_ABORTED: /* handle closed while wait */
558 case WSAENOTSOCK: /* handle already closed (sock?) */
559 ctx->errCode = ERROR_SUCCESS;
560 rio = NULL;
561 case ERROR_SUCCESS: /* all is good */
562 break;
563 default:
564 /* log error, but return -- caller has to handle this! */
565 LogIoError(msg, ctx->io.hnd, ctx->errCode);
566 break;
567 }
568 if (rio == NULL)
569 IoCtxRelease(ctx);
570 return rio;
571 }
572
573 /* -------------------------------------------------------------------
574 * IO callback context check -- network sockets
575 *
576 * Attention: deletes the IO context when the endpoint is dead!
577 * -------------------------------------------------------------------
578 */
579 static endpt*
580 getEndptFromIoCtx(
581 IoCtx_t * ctx,
582 ULONG_PTR key
583 )
584 {
585 /* Make sure the key matches the context info in the shared
586 * lock, then check for errors. If the error indicates the
587 * operation was cancelled, let the operation fail silently.
588 *
589 * !Note! Since we use the lowest bit of the key to distinguish
590 * between regular and broadcast socket, we must make sure the
591 * LSB is not used in the reverse-link check. Hence we shift
592 * it out in both the input key and the registered source.
593 */
594 endpt * ep = NULL;
595 IoHndPad_T * iopad = ctx->iopad;
596 if (iopad != NULL) {
597 ep = iopad->rsrc.ept;
598 if ((key >> 1) != (iopad->rsrc.key >> 1))
599 ep = NULL;
600 else if (ctx->io.hnd != iopad->handles[key & 1])
601 ep = NULL;
602 }
603 if (ep == NULL)
604 IoCtxRelease(ctx);
605 return ep;
606 }
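/* The key layout checked above is produced in io_completion_port_add_socket()
 * as
 *	key = ((ULONG_PTR)ep & ~(ULONG_PTR)1u) + !!bcast;
 * so bit 0 tags the broadcast socket and '(key >> 1)' yields the same value
 * for both sockets of an endpoint.
 */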
607
608
609 static int
610 socketErrorCheck(
611 IoCtx_t * ctx,
612 const char * msg
613 )
614 {
615 int oval, olen; /* getsockopt params */
616 int retCode;
617
618 switch (ctx->errCode) {
619 case ERROR_SUCCESS: /* all is good */
620 retCode = PKT_OK;
621 break;
622 case ERROR_UNEXP_NET_ERR:
623 if (hMainThread)
624 QueueUserAPC(apcOnUnexpectedNetworkError,
625 hMainThread, ctx->io.sfd);
626 case ERROR_INVALID_PARAMETER: /* handle already closed (clock?)*/
627 case ERROR_OPERATION_ABORTED: /* handle closed while wait */
628 case WSAENOTSOCK : /* handle already closed (sock) */
629 retCode = PKT_SOCKET_ERROR;
630 break;
631
632 /* [Bug 3019] is hard to squash.
633 * We should not get this, but we do, unfortunately. Obviously
634 * Windows insists on terminating one overlapped I/O request
635 * when it receives a TTL-expired ICMP message, and since the
636 * write that caused it is long finished, this unfortunately
637 * hits the pending receive.
638 *
639 * The only way out seems to be to silently ignore this error
640 * and restart another round, in the hope this condition does
641 * not prevail. Clear any pending socket level errors, too.
642 */
643 case ERROR_HOST_UNREACHABLE:
644 oval = 0;
645 olen = sizeof(oval);
646 getsockopt(ctx->io.sfd, SOL_SOCKET, SO_ERROR, (void *)&oval, &olen);
647 retCode = PKT_DROP;
648 break;
649
650 /* [Bug 3110] On POSIX systems, reading UDP data into too small
651 * a buffer silently truncates the message. Under Windows the
652 * data is also truncated, but it blarts loudly about that.
653 * Just pretend all is well, and all will be well.
654 *
655 * Note: We accept the truncated packet -- this is consistent with the
656 * POSIX / UNIX case where we have no notification about this at all.
657 */
658 case ERROR_MORE_DATA: /* Too Much data for Buffer */
659 case WSAEMSGSIZE:
660 retCode = PKT_OK; /* or PKT_DROP ??? */
661 break;
662
663 /* For any other error, log the error, clear the byte count, but
664 * return the endpoint. This prevents processing the packet and
665 * keeps the read-chain running -- otherwise NTPD will play
666 * dead duck!
667 */
668 default:
669 LogIoError(msg, ctx->io.hnd, ctx->errCode);
670 retCode = PKT_DROP;
671 break;
672 }
673 return retCode;
674 }
675
676 /*
677 * -------------------------------------------------------------------
678 * Serial IO stuff
679 *
680 * Part 1 -- COMM event handling
681 *
682 * This is the initial step for serial line input: wait for COM event.
683 * We always check for DCD changes (for user-mode PPS time stamps) and
684 * either a flag char (line feed, for line mode emulation) or any
685 * input character (raw mode). In the callback we decide if we just
686 * have to go on with waiting, or if there is data we must read.
687 * Depending on the mode, we either queue a raw read or a 'regular'
688 * read request.
689 *
690 * !Note! Currently one single IO context circles through the WAIT,
691 * READ and PROCESS stages. For better performance, it might make
692 * sense to have one cycle for the wait, spinning off new read requests
693 * when there is data. There are actually two problems that must be
694 * solved:
695 * - We would need a queue for post-processing.
696 * - We have to take care of the order of read results. While the
697 * IOCPL queue guarantees delivery in the order of enqueue, the
698 * order of enqueue is not guaranteed once multiple reads are in
699 * flight.
700 *
701 * So, for the time being, we have one request cycling...
702 * -------------------------------------------------------------------
703 */
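/* The COM event mask itself is configured when the device is opened, not in
 * this file. A hedged sketch of what the wait below relies on -- DCD changes
 * plus either the flag character (line mode) or any character (raw mode):
 *
 *	SetCommMask(hnd, EV_RLSD | EV_RXFLAG);	// line discipline emulation
 *	SetCommMask(hnd, EV_RLSD | EV_RXCHAR);	// raw mode
 */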
704
705 static BOOL __fastcall
706 QueueSerialWait(
707 IoCtx_t * lpo,
708 recvbuf_t * buff
709 )
710 {
711 static const char * const msgh =
712 "QueueSerialWait: cannot wait for COM event";
713
714 BOOL rc;
715
716 lpo->onIoDone = OnSerialWaitComplete;
717 lpo->recv_buf = buff;
718 lpo->flRawMem = 0;
719
720 buff->fd = lpo->iopad->riofd;
721 /* keep receive position for continuation of partial lines! */
722 rc = WaitCommEvent(lpo->io.hnd, &lpo->aux.com_events, &lpo->ol);
723 return rc || IoResultCheck(GetLastError(), lpo, msgh);
724 }
725
726 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
727
728 static void
729 OnSerialWaitComplete(
730 ULONG_PTR key,
731 IoCtx_t * lpo
732 )
733 {
734 static const char * const msgh =
735 "OnSerialWaitComplete: wait for COM event failed";
736
737 DevCtx_t * dev;
738 PPSDataEx_t * ppsbuf;
739 DWORD modem_status;
740 u_long covc;
741
742 /* Make sure this RIO is not closed. */
743 if (NULL == getRioFromIoCtx(lpo, key, msgh))
744 return;
745
746 /* start next IO and leave if we hit an error */
747 if (lpo->errCode != ERROR_SUCCESS) {
748 memset(&lpo->aux, 0, sizeof(lpo->aux));
749 IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
750 return;
751 }
752
753 #ifdef DEBUG
754 if (~(EV_RXFLAG | EV_RLSD | EV_RXCHAR) & lpo->aux.com_events) {
755 msyslog(LOG_ERR, "WaitCommEvent returned unexpected mask %x",
756 lpo->aux.com_events);
757 exit(-1);
758 }
759 #endif
760 /* Take note of changes on DCD; 'user mode PPS hack'.
761 * perlinger@ntp.org suggested a way of solving several problems
762 * with this code that makes a lot of sense: move to a putative
763 * dcdpps-ppsapi-provider.dll.
764 *
765 * perlinger@ntp.org: It came out as loopback-ppsapi-provider
766 * (because it loops back into NTPD), but I had to maintain the
767 * old hack for backward compatibility.
768 */
769 if (EV_RLSD & lpo->aux.com_events) {
770 modem_status = 0;
771 GetCommModemStatus(lpo->io.hnd, &modem_status);
772 if (NULL != (dev = lpo->devCtx)) {
773 /* PPS-context available -- use it! */
774 if (MS_RLSD_ON & modem_status) {
775 dev->pps_data.cc_assert++;
776 dev->pps_data.ts_assert = lpo->aux.RecvTime;
777 DPRINTF(2, ("upps-real: fd %d DCD PPS Rise at %s\n",
778 lpo->iopad->rsrc.rio->fd,
779 ulfptoa(&lpo->aux.RecvTime, 6)));
780 } else {
781 dev->pps_data.cc_clear++;
782 dev->pps_data.ts_clear = lpo->aux.RecvTime;
783 DPRINTF(2, ("upps-real: fd %d DCD PPS Fall at %s\n",
784 lpo->iopad->rsrc.rio->fd,
785 ulfptoa(&lpo->aux.RecvTime, 6)));
786 }
787 /* Update PPS buffer, writing from low to high, with index
788 * update as last action. We use interlocked ops and a
789 * volatile data destination to avoid reordering on compiler
790 * and CPU level. The interlocked instruction act as full
791 * barriers -- we need only release semantics, but we don't
792 * have them before VS2010.
793 */
794 covc = dev->cov_count + 1u;
795 ppsbuf = dev->pps_buff + (covc & PPS_QUEUE_MSK);
796 InterlockedExchange((PLONG)&ppsbuf->cov_count, covc);
797 ppsbuf->data = dev->pps_data;
798 InterlockedExchange((PLONG)&dev->cov_count, covc);
799 }
800 /* perlinger@ntp.org, 2012-11-19
801 * It can be argued that once you have the PPS API active, you can
802 * disable the old pps hack. This would give a behaviour that's much
803 * more like the behaviour under a UN*Xish OS. On the other hand, it
804 * will give a nasty surprise for people who have until now happily
805 * taken the pps hack for granted, and after the first complaint, I have
806 * decided to keep the old implementation.
807 *
808 * perlinger@ntp.org, 2017-03-04
809 * If the loopback PPS API provider is active on this channel, the
810 * PPS hack will be *disabled*.
811 *
812 * backward compat: 'usermode-pps-hack'
813 */
814 if ((MS_RLSD_ON & modem_status) && !(dev && dev->pps_active)) {
815 lpo->aux.DCDSTime = lpo->aux.RecvTime;
816 lpo->aux.flTsDCDS = 1;
817 DPRINTF(2, ("upps-hack: fd %d DCD PPS Rise at %s\n",
818 lpo->iopad->rsrc.rio->fd,
819 ulfptoa(&lpo->aux.RecvTime, 6)));
820 }
821 }
822
823 /* If IO ready, read data. Go back waiting else. */
824 if (EV_RXFLAG & lpo->aux.com_events) { /* line discipline */
825 lpo->aux.FlagTime = lpo->aux.RecvTime;
826 lpo->aux.flTsFlag = 1;
827 IoCtxStartChecked(lpo, QueueSerialRead, lpo->recv_buf);
828 } else if (EV_RXCHAR & lpo->aux.com_events) { /* raw discipline */
829 lpo->aux.FlagTime = lpo->aux.RecvTime;
830 lpo->aux.flTsFlag = 1;
831 IoCtxStartChecked(lpo, QueueRawSerialRead, lpo->recv_buf);
832 } else { /* idle... */
833 IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
834 }
835 }
836
837 /*
838 * -------------------------------------------------------------------
839 * Serial IO stuff
840 *
841 * common for both modes
842 * -------------------------------------------------------------------
843 */
844 static BOOL __fastcall
845 QueueSerialReadCommon(
846 IoCtx_t * lpo,
847 recvbuf_t * buff
848 )
849 {
850 static const char * const msgh =
851 "QueueSerialRead: cannot schedule device read";
852
853 BOOL rc;
854
855 /* 'lpo->onIoDone' must be set already! */
856 lpo->recv_buf = buff;
857 lpo->flRawMem = 0;
858
859 /* 'buff->recv_length' must be set already! */
860 buff->fd = lpo->iopad->riofd;
861 buff->dstadr = NULL;
862 buff->receiver = process_refclock_packet;
863 buff->recv_peer = lpo->iopad->rsrc.rio->srcclock;
864
865 rc = ReadFile(lpo->io.hnd,
866 (char*)buff->recv_buffer + buff->recv_length,
867 sizeof(buff->recv_buffer) - buff->recv_length,
868 NULL, &lpo->ol);
869 return rc || IoResultCheck(GetLastError(), lpo, msgh);
870 }
871
872 /*
873 * -------------------------------------------------------------------
874 * Serial IO stuff
875 *
876 * Part 2 -- line discipline emulation
877 *
878 * Ideally this should *not* be done in the IO completion thread.
879 * We use a worker pool thread to offload the low-level processing.
880 * -------------------------------------------------------------------
881 */
882
883 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
884 * Start & Queue a serial read for line discipline emulation.
885 */
886 static BOOL __fastcall
887 QueueSerialRead(
888 IoCtx_t * lpo,
889 recvbuf_t * buff
890 )
891 {
892 lpo->onIoDone = &OnSerialReadComplete;
893 /* keep 'buff->recv_length' for line continuation! */
894 return QueueSerialReadCommon(lpo, buff);
895 }
896
897 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
898 * IO completion thread callback. Takes a time stamp and offloads the
899 * real work to the worker pool ASAP.
900 */
901 static void
902 OnSerialReadComplete(
903 ULONG_PTR key,
904 IoCtx_t * lpo
905 )
906 {
907 static const char * const msgh =
908 "OnSerialReadComplete: read from device failed";
909
910 /* Make sure this RIO is not closed. */
911 if (NULL == getRioFromIoCtx(lpo, key, msgh))
912 return;
913
914 /* start next IO and leave if we hit an error */
915 if (lpo->errCode != ERROR_SUCCESS)
916 goto wait_again;
917
918 /* Offload to worker pool, if there is data */
919 if (lpo->byteCount == 0)
920 goto wait_again;
921
922 if (QueueUserWorkItem(&OnSerialReadWorker, lpo, WT_EXECUTEDEFAULT))
923 return; /* successful regular exit! */
924
925 /* croak as we're throwing away data */
926 msyslog(LOG_ERR,
927 "Can't offload to worker thread, will skip data: %m");
928
929 wait_again:
930 /* make sure the read is issued again */
931 memset(&lpo->aux, 0, sizeof(lpo->aux));
932 IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
933 }
934
935
936 /*
937 * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
938 * Worker pool offload function -- avoid lengthy operations in the IO
939 * completion thread (affects timing...)
940 *
941 * This function does the real work of emulating the UN*X line
942 * discipline. Since this involves allocation of additional buffers and
943 * string parsing/copying, it is offloaded to the worker thread pool so
944 * the IO completion thread can resume faster.
945 *
946 * !!ATTENTION!!
947 * This function runs on an arbitrary worker thread. The resource
948 * management with regard to IO is synchronised only between the main
949 * thread and the IO worker thread, so decisions about queueing and
950 * starting new IO must be made by either of them.
951 *
952 * Since the IO thread sticks in the IOCPL queue and is not alertable,
953 * we could either use the APC queue to the main thread or the IOCPL
954 * queue to the IO thread.
955 *
956 * We separate the effort -- filtering based on the RIO state is done
957 * by the main thread, restarting the IO by the IO thread to reduce
958 * delays.
959 */
960
961 /* -------------------------------------------------------------------
962 * IOCPL deferred bouncer -- start a new serial wait from IOCPL thread
963 */
964 static void
965 OnDeferredStartWait(
966 ULONG_PTR key,
967 IoCtx_t * lpo
968 )
969 {
970 IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
971 }
972
973 /* -------------------------------------------------------------------
974 * APC deferred bouncer -- put the buffer into the receive queue or
975 * discard it if the source is already disabled. Runs in the context
976 * of the main thread exclusively.
977 */
978 static void WINAPI
979 OnEnqueAPC(
980 ULONG_PTR arg
981 )
982 {
983 recvbuf_t * buff = (recvbuf_t*)arg;
984 IoHndPad_T * iopad = (IoHndPad_T*)buff->recv_peer;
985 RIO_t * rio = iopad->rsrc.rio;
986
987 /* Down below we make a nasty hack to transport the iopad
988 * pointer in the buffer so we can avoid another temporary
989 * allocation. We must undo this here.
990 */
991 if (NULL != rio) {
992 /* OK, refclock still attached */
993 buff->recv_peer = rio->srcclock;
994 if (iohpQueueLocked(iopad, iohpRefClockOK, buff))
995 ++rio->srcclock->received;
996 } else {
997 /* refclock detached while in flight... */
998 freerecvbuf(buff);
999 }
1000 iohpDetach(iopad); /* one unit owned by this callback! */
1001 }
1002
1003 /* -------------------------------------------------------------------
1004 * worker pool thread worker doing the string processing
1005 */
1006 static DWORD WINAPI
1007 OnSerialReadWorker(
1008 void * ctx
1009 )
1010 {
1011 IoCtx_t * lpo = (IoCtx_t*)ctx;
1012 IoHndPad_T * iop = lpo->iopad;
1013 recvbuf_t * buff = lpo->recv_buf;
1014 recvbuf_t * obuf = NULL;
1015 char *sptr, *send, *dptr;
1016 BOOL eol;
1017 int ch;
1018
1019 /* We should never get a zero-byte read here. If we do, nothing
1020 * really bad happens, just a useless rescan of data we have
1021 * already processed. But something's not quite right in the logic
1022 * and we croak loudly in debug builds.
1023 */
1024 DEBUG_INSIST(lpo->byteCount > 0);
1025
1026 /* Account for additional input and then mimic the UNIX line
1027 * discipline. This is an implicit state machine -- the
1028 * implementation is very low-level to gather speed.
1029 */
1030 buff->recv_length += (int)lpo->byteCount;
1031 sptr = (char *)buff->recv_buffer;
1032 send = sptr + buff->recv_length;
1033 if (sptr == send)
1034 goto st_read_fresh;
1035
1036 st_new_obuf:
1037 /* Get new receive buffer to store the line. */
1038 obuf = get_free_recv_buffer_alloc(TRUE);
1039 if (!obuf) {
1040 ++packets_dropped; /* maybe atomic? */
1041 buff->recv_length = 0;
1042 goto st_read_fresh;
1043 }
1044 obuf->fd = buff->fd;
1045 obuf->receiver = buff->receiver;
1046 obuf->dstadr = NULL;
1047 obuf->recv_peer = buff->recv_peer;
1048 set_serial_recv_time(obuf, lpo);
1049
1050 st_copy_start:
1051 /* Copy data to new buffer, convert CR to LF on the fly.
1052 * Stop at end of line or when the input is exhausted.
1053 */
1054 dptr = (char *)obuf->recv_buffer;
1055 do {
1056 ch = *sptr++;
1057 if ('\r' == ch)
1058 ch = '\n';
1059 *dptr++ = ch;
1060 eol = ('\n' == ch);
1061 } while (!(eol || sptr == send));
1062 obuf->recv_length = (int)(dptr - (char *)obuf->recv_buffer);
1063
1064 /* If we're not at EOL, we need more data to continue the line.
1065 * But this can only be done if there's more room in the buffer;
1066 * if we have already reached the maximum size, treat the whole
1067 * buffer as part of a mega-line and pass it on.
1068 */
1069 if (!eol) {
1070 if (obuf->recv_length < sizeof(obuf->recv_buffer))
1071 goto st_read_more;
1072 else
1073 goto st_pass_buffer;
1074 }
1075
1076 /* if we should drop empty lines, do it here. */
1077 if (obuf->recv_length < 2 && iop->flDropEmpty) {
1078 obuf->recv_length = 0;
1079 if (sptr != send)
1080 goto st_copy_start;
1081 else
1082 goto st_read_more;
1083 }
1084
1085 if ( ! iop->flFirstSeen) {
1086 iop->flFirstSeen = 1;
1087 obuf->recv_length = 0;
1088 if (sptr != send)
1089 goto st_copy_start;
1090 else
1091 goto st_read_more;
1092 }
1093
1094 st_pass_buffer:
1095 /* if we arrive here, we can spin off another text line to the
1096 * receive queue. We use a hack to supplant the RIO pointer in
1097 * the receive buffer with the IOPAD to save us a temporary
1098 * workspace allocation. Note the callback owns one refcount
1099 * unit to keep the IOPAD alive! Also checking that the RIO in
1100 * the IOPAD matches the RIO in the buffer is dangerous: That
1101 * pointer is manipulated by the other threads!
1102 */
1103 obuf->recv_peer = (struct peer*)iohpAttach(lpo->iopad);
1104 QueueUserAPC(OnEnqueAPC, hMainThread, (ULONG_PTR)obuf);
1105 if (sptr != send)
1106 goto st_new_obuf;
1107 buff->recv_length = 0;
1108 goto st_read_fresh;
1109
1110 st_read_more:
1111 /* read more data into current OBUF, which is valid and will
1112 * replace BUFF.
1113 */
1114 lpo->recv_buf = obuf;
1115 freerecvbuf(buff);
1116
1117 st_read_fresh:
1118 /* Start next round. This is deferred to the IOCPL thread, as
1119 * read access to the IOPAD is unsafe from a worker thread
1120 * for anything but the flags. If the IOCPL handle is gone,
1121 * just mop up the pieces.
1122 */
1123 lpo->onIoDone = OnDeferredStartWait;
1124 if (!(hndIOCPLPort && PostQueuedCompletionStatus(hndIOCPLPort, 1, 0, &lpo->ol)))
1125 IoCtxRelease(lpo);
1126 return 0;
1127 }
1128
1129
1130 /*
1131 * -------------------------------------------------------------------
1132 * Serial IO stuff
1133 *
1134 * Part 3 -- raw data input
1135 *
1136 * Raw data processing is fast enough to do without offloading to the
1137 * worker pool, so this is rather short'n sweet...
1138 * -------------------------------------------------------------------
1139 */
1140
1141 static BOOL __fastcall
1142 QueueRawSerialRead(
1143 IoCtx_t * lpo,
1144 recvbuf_t * buff
1145 )
1146 {
1147 lpo->onIoDone = OnRawSerialReadComplete;
1148 buff->recv_length = 0;
1149 return QueueSerialReadCommon(lpo, buff);
1150 }
1151
1152 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1153 * IO completion thread callback. Takes a time stamp and queues the
1154 * raw data directly; no worker pool offload is needed here.
1155 */
1156 static void
1157 OnRawSerialReadComplete(
1158 ULONG_PTR key,
1159 IoCtx_t * lpo
1160 )
1161 {
1162 static const char * const msgh =
1163 "OnRawSerialReadComplete: read from device failed";
1164
1165 recvbuf_t * buff = lpo->recv_buf;
1166 RIO_t * rio = getRioFromIoCtx(lpo, key, msgh);
1167 /* Make sure this RIO is not closed. */
1168 if (rio == NULL)
1169 return;
1170
1171 /* start next IO and leave if we hit an error */
1172 if (lpo->errCode == ERROR_SUCCESS && lpo->byteCount > 0) {
1173 buff->recv_length = (int)lpo->byteCount;
1174 set_serial_recv_time(buff, lpo);
1175 lpo->recv_buf = get_free_recv_buffer_alloc(TRUE);
1176 if (lpo->recv_buf) {
1177 iohpQueueLocked(lpo->iopad, iohpRefClockOK, buff);
1178 } else {
1179 ++packets_dropped; /* maybe atomic? */
1180 buff->recv_length = 0;
1181 lpo->recv_buf = buff;
1182 }
1183 }
1184 IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
1185 }
1186
1187
1188 static void
1189 set_serial_recv_time(
1190 recvbuf_t * obuf,
1191 IoCtx_t * lpo
1192 )
1193 {
1194 /* Time stamp assignment is interesting. If we
1195 * have a DCD stamp, we use it, otherwise we use
1196 * the FLAG char event time, and if that is also
1197 * not / no longer available we use the arrival
1198 * time.
1199 */
1200 if (lpo->aux.flTsDCDS)
1201 obuf->recv_time = lpo->aux.DCDSTime;
1202 else if (lpo->aux.flTsFlag)
1203 obuf->recv_time = lpo->aux.FlagTime;
1204 else
1205 obuf->recv_time = lpo->aux.RecvTime;
1206
1207 lpo->aux.flTsDCDS = lpo->aux.flTsFlag = 0; /* use only once! */
1208 }
1209
1210
1211 /*
1212 * -------------------------------------------------------------------
1213 * Serial IO stuff
1214 *
1215 * Part 4 -- Overlapped serial output
1216 *
1217 * Again, no need to offload any work.
1218 * -------------------------------------------------------------------
1219 */
1220
1221 /*
1222 * async_write, clone of write(), used by some refclock drivers
1223 */
1224 int
1225 async_write(
1226 int fd,
1227 const void * data,
1228 unsigned int count
1229 )
1230 {
1231 static const char * const msgh =
1232 "async_write: cannot schedule device write";
1233 static const char * const dmsg =
1234 "overlapped IO data buffer";
1235
1236 IoCtx_t * lpo = NULL;
1237 void * buff = NULL;
1238 HANDLE hnd = NULL;
1239 BOOL rc;
1240
1241 hnd = (HANDLE)_get_osfhandle(fd);
1242 if (hnd == INVALID_HANDLE_VALUE)
1243 goto fail;
1244 if (NULL == (buff = IOCPLPoolMemDup(data, count, dmsg)))
1245 goto fail;
1246 if (NULL == (lpo = IoCtxAlloc(NULL, NULL)))
1247 goto fail;
1248
1249 lpo->io.hnd = hnd;
1250 lpo->onIoDone = OnSerialWriteComplete;
1251 lpo->trans_buf = buff;
1252 lpo->flRawMem = 1;
1253
1254 rc = WriteFile(lpo->io.hnd, lpo->trans_buf, count,
1255 NULL, &lpo->ol);
1256 if (rc || IoResultCheck(GetLastError(), lpo, msgh))
1257 return count; /* normal/success return */
1258
1259 errno = EBADF;
1260 return -1;
1261
1262 fail:
1263 IoCtxFree(lpo);
1264 IOCPLPoolFree(buff, dmsg);
1265 return -1;
1266 }
1267
1268 static void
1269 OnSerialWriteComplete(
1270 ULONG_PTR key,
1271 IoCtx_t * lpo
1272 )
1273 {
1274 /* This is really trivial: Let 'getRioFromIoCtx()' do all the
1275 * error processing, and if it returns with a valid RIO, just
1276 * drop the complete context.
1277 */
1278 static const char * const msgh =
1279 "OnSerialWriteComplete: serial output failed";
1280
1281 if (NULL != getRioFromIoCtx(lpo, key, msgh))
1282 IoCtxRelease(lpo);
1283 }
1284
1285
1286 /*
1287 * -------------------------------------------------------------------
1288 * Serial IO stuff
1289 *
1290 * Part 5 -- read PPS time stamps
1291 *
1292 * -------------------------------------------------------------------
1293 */
1294
1295 __declspec(dllexport) void* __stdcall
1296 ntp_pps_attach_device(
1297 HANDLE hndIo
1298 )
1299 {
1300 DevCtx_t * dev = NULL;
1301
1302 dev = DevCtxAttach(serial_devctx(hndIo));
1303 if (NULL == dev)
1304 SetLastError(ERROR_INVALID_HANDLE);
1305 return dev;
1306 }
1307
1308 __declspec(dllexport) void __stdcall
1309 ntp_pps_detach_device(
1310 DevCtx_t * dev
1311 )
1312 {
1313 DevCtxDetach(dev);
1314 }
1315
1316 __declspec(dllexport) BOOL __stdcall
1317 ntp_pps_read(
1318 DevCtx_t * dev,
1319 PPSData_t * data,
1320 size_t dlen
1321 )
1322 {
1323 u_long guard, covc;
1324 int repc;
1325 PPSDataEx_t * ppsbuf;
1326
1327
1328 if (dev == NULL) {
1329 SetLastError(ERROR_INVALID_HANDLE);
1330 return FALSE;
1331 }
1332 if (data == NULL || dlen != sizeof(PPSData_t)) {
1333 SetLastError(ERROR_INVALID_PARAMETER);
1334 return FALSE;
1335 }
1336 /* Reading from shared memory in a lock-free fashion can be
1337 * a bit tricky, since we have to read the components in the
1338 * opposite direction from the write, and the compiler must
1339 * not reorder the read sequence.
1340 * We use interlocked ops and a volatile data source to avoid
1341 * reordering on compiler and CPU level. The interlocked
1342 * instruction act as full barriers -- we need only acquire
1343 * semantics, but we don't have them before VS2010.
1344 */
1345 repc = 3;
1346 do {
1347 covc = InterlockedExchangeAdd((PLONG)&dev->cov_count, 0);
1348 ppsbuf = dev->pps_buff + (covc & PPS_QUEUE_MSK);
1349 *data = ppsbuf->data;
1350 guard = InterlockedExchangeAdd((PLONG)&ppsbuf->cov_count, 0);
1351 guard ^= covc;
1352 } while (guard && ~guard && --repc);
1353
1354 if (guard) {
1355 SetLastError(ERROR_INVALID_DATA);
1356 return FALSE;
1357 }
1358 return TRUE;
1359 }
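/*
 * A hedged consumer-side sketch of the exported PPS API above. The real
 * consumer (e.g. the loopback PPSAPI provider) lives outside this file;
 * 'hndCom', 'example_poll_pps' and the printed fields are illustrative
 * only, and the block is not compiled.
 */
#if 0
static void
example_poll_pps(
	HANDLE	hndCom		/* open COM port handle */
	)
{
	PPSData_t	pps;
	DevCtx_t *	dev;

	dev = ntp_pps_attach_device(hndCom);
	if (NULL != dev) {
		if (ntp_pps_read(dev, &pps, sizeof(pps)))
			printf("assert #%lu at %s\n",
			       (u_long)pps.cc_assert,
			       ulfptoa(&pps.ts_assert, 6));
		ntp_pps_detach_device(dev);
	}
}
#endif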
1360
1361 /* --------------------------------------------------------------------
1362 * register and unregister refclock IOs with the IO engine
1363 * --------------------------------------------------------------------
1364 */
1365
1366 /* Add a reference clock data structure's I/O handles to
1367 * the I/O completion port. Return FALSE if any error,
1368 * TRUE on success
1369 */
1370 BOOL
1371 io_completion_port_add_clock_io(
1372 RIO_t *rio
1373 )
1374 {
1375 static const char * const msgh =
1376 "io_completion_port_add_clock_io";
1377
1378 IoCtx_t * lpo;
1379 HANDLE h;
1380 IoHndPad_T * iopad = NULL;
1381 recvbuf_t * rbuf;
1382
1383 /* preset to clear state for error cleanup:*/
1384 rio->ioreg_ctx = NULL;
1385 rio->device_ctx = NULL;
1386
1387 h = (HANDLE)_get_osfhandle(rio->fd);
1388 if (h == INVALID_HANDLE_VALUE) {
1389 msyslog(LOG_ERR, "%s: COM port FD not valid",
1390 msgh);
1391 goto fail;
1392 }
1393
1394 if (NULL == (rio->ioreg_ctx = iopad = iohpCreate(rio))) {
1395 msyslog(LOG_ERR, "%s: Failed to create shared lock",
1396 msgh);
1397 goto fail;
1398 }
1399 iopad->handles[0] = h;
1400 iopad->riofd = rio->fd;
1401 iopad->rsrc.rio = rio;
1402
1403 if (NULL == (rio->device_ctx = DevCtxAttach(serial_devctx(h)))) {
1404 msyslog(LOG_ERR, "%s: Failed to allocate device context",
1405 msgh);
1406 goto fail;
1407 }
1408
1409 if (NULL == (lpo = IoCtxAlloc(iopad, rio->device_ctx))) {
1410 msyslog(LOG_ERR, "%: no IO context: %m", msgh);
1411 goto fail;
1412 }
1413
1414 if ( ! CreateIoCompletionPort(h, hndIOCPLPort, (ULONG_PTR)rio, 0)) {
1415 msyslog(LOG_ERR, "%s: Can't add COM port to i/o completion port: %m",
1416 msgh);
1417 goto fail;
1418 }
1419 lpo->io.hnd = h;
1420 memset(&lpo->aux, 0, sizeof(lpo->aux));
1421 if (NULL == (rbuf = get_free_recv_buffer_alloc(TRUE))) {
1422 msyslog(LOG_ERR, "%s: no receive buffer: %m", msgh);
1423 goto fail;
1424 }
1425 return QueueSerialWait(lpo, rbuf);
1426
1427 fail:
1428 rio->ioreg_ctx = iohpDetach(rio->ioreg_ctx);
1429 rio->device_ctx = DevCtxDetach(rio->device_ctx);
1430 return FALSE;
1431 }
1432
1433 /* ----------------------------------------------------------------- */
1434 static void
1435 OnSerialDetach(
1436 ULONG_PTR key,
1437 IoCtx_t * lpo
1438 )
1439 {
1440 /* Make sure the key matches the context info in the shared
1441 * lock, then check for errors. If the error indicates the
1442 * operation was cancelled, let the operation fail silently.
1443 */
1444 IoHndPad_T * iopad = lpo->iopad;
1445
1446 INSIST(NULL != iopad);
1447 if (iopad->handles[0] == lpo->io.hnd) {
1448 iopad->handles[0] = INVALID_HANDLE_VALUE;
1449 iopad->handles[1] = INVALID_HANDLE_VALUE;
1450 iopad->rsrc.rio = NULL;
1451 iopad->riofd = -1;
1452 }
1453 SetEvent(lpo->ppswake);
1454 }
1455
1456
1457 void
1458 io_completion_port_remove_clock_io(
1459 RIO_t *rio
1460 )
1461 {
1462 IoHndPad_T * iopad = (IoHndPad_T*)rio->ioreg_ctx;
1463
1464 INSIST(hndIOCPLPort && hMainRpcDone);
1465 if (iopad)
1466 iocpl_notify(iopad, OnSerialDetach, _get_osfhandle(rio->fd));
1467 }
1468
1469 /*
1470 * -------------------------------------------------------------------
1471 * Socket IO stuff
1472 * -------------------------------------------------------------------
1473 */
1474
1475 /* Queue a receiver on a socket. Returns 0 if no buffer can be queued
1476 *
1477 * Note: As per the WINSOCK documentation, we use WSARecvFrom. Using
1478 * ReadFile() is less efficient. Also, WSARecvFrom delivers
1479 * the remote network address. With ReadFile, getting this
1480 * becomes a chore.
1481 */
1482 static BOOL __fastcall
1483 QueueSocketRecv(
1484 IoCtx_t * lpo,
1485 recvbuf_t * buff
1486 )
1487 {
1488 static const char * const msgh =
1489 "QueueSocketRecv: cannot schedule socket receive";
1490
1491 WSABUF wsabuf;
1492 int rc;
1493
1494 lpo->onIoDone = OnSocketRecv;
1495 lpo->recv_buf = buff;
1496 lpo->flRawMem = 0;
1497 lpo->ioFlags = 0;
1498
1499 buff->fd = lpo->io.sfd;
1500 buff->recv_srcadr_len = sizeof(buff->recv_srcadr);
1501 buff->receiver = receive;
1502 buff->dstadr = lpo->iopad->rsrc.ept;
1503
1504 wsabuf.buf = (char *)buff->recv_buffer;
1505 wsabuf.len = sizeof(buff->recv_buffer);
1506
1507 rc = WSARecvFrom(lpo->io.sfd, &wsabuf, 1, NULL, &lpo->ioFlags,
1508 &buff->recv_srcadr.sa, &buff->recv_srcadr_len,
1509 &lpo->ol, NULL);
1510 return !rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msgh);
1511 }
1512
1513 /* ----------------------------------------------------------------- */
1514 static void
1515 OnSocketRecv(
1516 ULONG_PTR key,
1517 IoCtx_t * lpo
1518 )
1519 {
1520 static const char * const msgh =
1521 "OnSocketRecv: receive from socket failed";
1522
1523 recvbuf_t * buff = NULL;
1524 IoHndPad_T * iopad = NULL;
1525 endpt * ep = NULL;
1526 int rc;
1527
1528 /* order is important -- check first, then get endpoint! */
1529 rc = socketErrorCheck(lpo, msgh);
1530 ep = getEndptFromIoCtx(lpo, key);
1531
1532 /* Make sure this endpoint is not closed. */
1533 if (ep == NULL)
1534 return;
1535
1536 /* We want to start a new read before we process the buffer.
1537 * Since we must not use the context object once it is in
1538 * another IO, we go through some pains to read everything
1539 * before going out for another read request.
1540 * We also need an extra hold to the IOPAD structure.
1541 */
1542 iopad = iohpAttach(lpo->iopad);
1543 if (rc == PKT_OK && lpo->byteCount > 0) {
1544 /* keep input buffer, create new one for IO */
1545 buff = lpo->recv_buf;
1546 lpo->recv_buf = get_free_recv_buffer_alloc(FALSE);
1547 if (lpo->recv_buf) {
1548 buff->recv_time = lpo->aux.RecvTime;
1549 buff->recv_length = (int)lpo->byteCount;
1550 } else {
1551 lpo->recv_buf = buff;
1552 buff = NULL;
1553 ++packets_dropped; /* maybe atomic? */
1554 }
1555
1556 } /* Note: else we use the current buffer again */
1557
1558 if (rc != PKT_SOCKET_ERROR) {
1559 IoCtxStartChecked(lpo, QueueSocketRecv, lpo->recv_buf);
1560 } else {
1561 freerecvbuf(lpo->recv_buf);
1562 IoCtxFree(lpo);
1563 }
1564 /* below this, any usage of 'lpo' is invalid! */
1565
1566 /* If we have a buffer, do some bookkeeping and other chores,
1567 * then feed it to the input queue. And we can be sure we have
1568 * a packet here, so we can update the stats.
1569 */
1570 if (buff) {
1571 INSIST(buff->recv_srcadr_len <= sizeof(buff->recv_srcadr));
1572 DPRINTF(4, ("%sfd %d %s recv packet mode is %d\n",
1573 (MODE_BROADCAST == get_packet_mode(buff))
1574 ? " **** Broadcast "
1575 : "",
1576 (int)buff->fd, stoa(&buff->recv_srcadr),
1577 get_packet_mode(buff)));
1578
1579 if (iohpEndPointOK(iopad)) {
1580 InterlockedIncrement(&ep->received);
1581 InterlockedIncrement(&packets_received);
1582 InterlockedIncrement(&handler_pkts);
1583 }
1584
1585 DPRINTF(2, ("Received %d bytes fd %d in buffer %p from %s, state = %s\n",
1586 buff->recv_length, (int)buff->fd, buff,
1587 stoa(&buff->recv_srcadr), st_packet_handling[rc]));
1588 iohpQueueLocked(iopad, iohpEndPointOK, buff);
1589 }
1590 iohpDetach(iopad);
1591 }
1592
1593 /* ----------------------------------------------------------------- */
1594 static void
1595 OnSocketSend(
1596 ULONG_PTR key,
1597 IoCtx_t * lpo
1598 )
1599 {
1600 /* this is somewhat easier: */
1601 static const char * const msgh =
1602 "OnSocketSend: send to socket failed";
1603
1604 endpt * ep = NULL;
1605 int rc;
1606
1607 /* order is important -- check first, then get endpoint! */
1608 rc = socketErrorCheck(lpo, msgh);
1609 ep = getEndptFromIoCtx(lpo, key);
1610
1611 /* Make sure this endpoint is not closed. */
1612 if (ep == NULL)
1613 return;
1614
1615 if (rc != PKT_OK) {
1616 InterlockedIncrement(&ep->notsent);
1617 InterlockedDecrement(&ep->sent);
1618 InterlockedIncrement(&packets_notsent);
1619 InterlockedDecrement(&packets_sent);
1620 }
1621 IoCtxRelease(lpo);
1622 }
1623
1624 /* --------------------------------------------------------------------
1625 * register and de-register interface endpoints with the IO engine
1626 * --------------------------------------------------------------------
1627 */
1628 static void
1629 OnInterfaceDetach(
1630 ULONG_PTR key,
1631 IoCtx_t * lpo
1632 )
1633 {
1634 IoHndPad_T * iopad = lpo->iopad;
1635
1636 INSIST(NULL != iopad);
1637 iopad->handles[0] = INVALID_HANDLE_VALUE;
1638 iopad->handles[1] = INVALID_HANDLE_VALUE;
1639 iopad->rsrc.ept = NULL;
1640
1641 SetEvent(lpo->ppswake);
1642 }
1643
1644 /* ----------------------------------------------------------------- */
1645 BOOL
1646 io_completion_port_add_interface(
1647 endpt * ep
1648 )
1649 {
1650 /* Registering an endpoint is simple: allocate a shared lock for
1651 * the endpoint and return whether the allocation was successful.
1652 */
1653 ep->ioreg_ctx = iohpCreate(ep);
1654 return ep->ioreg_ctx != NULL;
1655 }
1656 /* ----------------------------------------------------------------- */
1657 void
1658 io_completion_port_remove_interface(
1659 endpt * ep
1660 )
1661 {
1662 /* Removing an endpoint is simple, too: Lock the shared lock
1663 * for write access, then invalidate the handles and the
1664 * endpoint pointer. Do an additional detach and leave the
1665 * write lock.
1666 */
1667 IoHndPad_T * iopad = (IoHndPad_T*)ep->ioreg_ctx;
1668
1669 INSIST(hndIOCPLPort && hMainRpcDone);
1670 if (iopad)
1671 iocpl_notify(iopad, OnInterfaceDetach, (UINT_PTR)-1);
1672 }
1673
1674 /* --------------------------------------------------------------------
1675 * register and de-register sockets for an endpoint
1676 * --------------------------------------------------------------------
1677 */
1678
1679 static void
1680 OnSocketDetach(
1681 ULONG_PTR key,
1682 IoCtx_t * lpo
1683 )
1684 {
1685 IoHndPad_T * iopad = lpo->iopad;
1686
1687 INSIST(NULL != iopad);
1688 if (iopad->handles[0] == lpo->io.hnd)
1689 iopad->handles[0] = INVALID_HANDLE_VALUE;
1690 if (iopad->handles[1] == lpo->io.hnd)
1691 iopad->handles[1] = INVALID_HANDLE_VALUE;
1692
1693 SetEvent(lpo->ppswake);
1694 }
1695
1696 /* Add a socket handle to the I/O completion port, and send
1697 * NTP_RECVS_PER_SOCKET receive requests to the kernel.
1698 */
1699 BOOL
1700 io_completion_port_add_socket(
1701 SOCKET sfd,
1702 endpt * ep,
1703 BOOL bcast
1704 )
1705 {
1706 /* Assume the endpoint is already registered. Set the socket
1707 * handle into the proper slot, and then start up the IO engine.
1708 */
1709 static const char * const msgh =
1710 "Can't add socket to i/o completion port";
1711
1712 IoCtx_t * lpo;
1713 size_t n;
1714 ULONG_PTR key;
1715 IoHndPad_T * iopad = NULL;
1716 recvbuf_t * rbuf;
1717
1718 key = ((ULONG_PTR)ep & ~(ULONG_PTR)1u) + !!bcast;
1719
1720 if (NULL == (iopad = (IoHndPad_T*)ep->ioreg_ctx)) {
1721 msyslog(LOG_CRIT, "io_completion_port_add_socket: endpt = %p not registered, exiting",
1722 ep);
1723 exit(1);
1724 } else {
1725 endpt * rep = iopad->rsrc.ept;
1726 iopad->handles[!!bcast] = (HANDLE)sfd;
1727 INSIST(rep == ep);
1728 }
1729
1730 if (NULL == CreateIoCompletionPort((HANDLE)sfd,
1731 hndIOCPLPort, key, 0))
1732 {
1733 msyslog(LOG_ERR, "%s: %m", msgh);
1734 goto fail;
1735 }
1736 for (n = s_SockRecvSched; n > 0; --n) {
1737 if (NULL == (lpo = IoCtxAlloc(ep->ioreg_ctx, NULL))) {
1738 msyslog(LOG_ERR, "%s: no IO context: %m", msgh);
1739 goto fail;
1740 }
1741 lpo->io.sfd = sfd;
1742 if (NULL == (rbuf = get_free_recv_buffer_alloc(FALSE))) {
1743 msyslog(LOG_ERR, "%s: no receive buffer: %m", msgh);
1744 goto fail;
1745 }
1746 if (!QueueSocketRecv(lpo, rbuf))
1747 goto fail;
1748 }
1749 return TRUE;
1750
1751 fail:
1752 ep->ioreg_ctx = iohpDetach(ep->ioreg_ctx);
1753 return FALSE;
1754 }
1755 /* ----------------------------------------------------------------- */
1756 void
1757 io_completion_port_remove_socket(
1758 SOCKET fd,
1759 endpt * ep
1760 )
1761 {
1762 /* Lock the shared lock for write, then search the given
1763 * socket handle and replace it with an invalid handle value.
1764 */
1765 IoHndPad_T * iopad = (IoHndPad_T*)ep->ioreg_ctx;
1766
1767 INSIST(hndIOCPLPort && hMainRpcDone);
1768 if (iopad)
1769 iocpl_notify(iopad, OnSocketDetach, fd);
1770 }
1771
1772
1773 /* --------------------------------------------------------------------
1774 * I/O API functions for endpoints / interfaces
1775 * --------------------------------------------------------------------
1776 */
1777
1778 /* io_completion_port_sendto() -- sendto() replacement for Windows
1779 *
1780 * Returns len after successful send.
1781 * Returns -1 for any error, with the error code available via
1782 * msyslog() %m, or GetLastError().
1783 */
1784 int
1785 io_completion_port_sendto(
1786 endpt * ep,
1787 SOCKET sfd,
1788 void * pkt,
1789 size_t len,
1790 sockaddr_u * dest
1791 )
1792 {
1793 static const char * const msgh =
1794 "sendto: cannot schedule socket send";
1795 static const char * const dmsg =
1796 "overlapped IO data buffer";
1797
1798 IoCtx_t * lpo = NULL;
1799 void * dbuf = NULL;
1800 WSABUF wsabuf;
1801 int rc;
1802
1803 if (len > INT_MAX)
1804 len = INT_MAX;
1805
1806 if (NULL == (dbuf = IOCPLPoolMemDup(pkt, len, dmsg)))
1807 goto fail;
1808 /* We register the IO operation against the shared lock here.
1809 * This is not strictly necessary, since the callback does not
1810 * access the endpoint structure in any way...
1811 */
1812 if (NULL == (lpo = IoCtxAlloc(ep->ioreg_ctx, NULL)))
1813 goto fail;
1814
1815 lpo->onIoDone = OnSocketSend;
1816 lpo->trans_buf = dbuf;
1817 lpo->flRawMem = 1;
1818 lpo->io.sfd = sfd;
1819
1820 wsabuf.buf = (void*)lpo->trans_buf;
1821 wsabuf.len = (DWORD)len;
1822
1823 rc = WSASendTo(sfd, &wsabuf, 1, NULL, 0,
1824 &dest->sa, SOCKLEN(dest),
1825 &lpo->ol, NULL);
1826 if (!rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msgh))
1827 return (int)len; /* normal/success return */
1828
1829 errno = EBADF;
1830 return -1;
1831
1832 fail:
1833 IoCtxFree(lpo);
1834 IOCPLPoolFree(dbuf, dmsg);
1835 return -1;
1836 }
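/* A hedged caller-side sketch; the real callers live in the portable
 * transmit path, and 'xpkt', 'len' and 'dst' are illustrative names only:
 *
 *	if (io_completion_port_sendto(ep, ep->fd, &xpkt, len, &dst) != (int)len)
 *		msyslog(LOG_ERR, "sendto(%s) failed: %m", stoa(&dst));
 */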
1837
1838 /* --------------------------------------------------------------------
1839 * GetReceivedBuffers
1840 * Note that this is in effect the main loop for processing both
1841 * send and receive requests. This should be reimplemented.
1842 */
1843 int
1844 GetReceivedBuffers(void)
1845 {
1846 DWORD index;
1847 HANDLE ready;
1848 int errcode;
1849 BOOL dynbuf;
1850 BOOL have_packet;
1851 char * msgbuf;
1852
1853 have_packet = FALSE;
1854 while (!have_packet) {
1855 index = WaitForMultipleObjectsEx(
1856 ActiveWaitHandles, WaitHandles,
1857 FALSE, INFINITE, TRUE);
1858 switch (index) {
1859
1860 case WAIT_OBJECT_0 + 0: /* Io event */
1861 DPRINTF(4, ("IoEvent occurred\n"));
1862 have_packet = TRUE;
1863 break;
1864
1865 case WAIT_OBJECT_0 + 1: /* exit request */
1866 exit(0);
1867 break;
1868
1869 case WAIT_OBJECT_0 + 2: /* timer */
1870 timer();
1871 break;
1872
1873 case WAIT_IO_COMPLETION: /* there might be something after APC */
1874 have_packet = !!full_recvbuffs();
1875 break;
1876
1877 case WAIT_TIMEOUT:
1878 msyslog(LOG_ERR,
1879 "WaitForMultipleObjectsEx INFINITE timed out.");
1880 break;
1881
1882 case WAIT_FAILED:
1883 dynbuf = FALSE;
1884 errcode = GetLastError();
1885 msgbuf = NTstrerror(errcode, &dynbuf);
1886 msyslog(LOG_ERR,
1887 "WaitForMultipleObjectsEx Failed: Errcode = %n, msg = %s", errcode, msgbuf);
1888 if (dynbuf)
1889 LocalFree(msgbuf);
1890 exit(1);
1891 break;
1892
1893 default:
1894 DEBUG_INSIST((index - WAIT_OBJECT_0) <
1895 ActiveWaitHandles);
1896 ready = WaitHandles[index - WAIT_OBJECT_0];
1897 handle_blocking_resp_sem(ready);
1898 break;
1899
1900 } /* switch */
1901 }
1902
1903 return (full_recvbuffs()); /* get received buffers */
1904 }
1905
1906 #else /*defined(HAVE_IO_COMPLETION_PORT) */
1907 static int NonEmptyCompilationUnit;
1908 #endif /*!defined(HAVE_IO_COMPLETION_PORT) */