"Fossies" - the Fresh Open Source Software Archive

Member "gstreamer-1.16.1/libs/gst/net/gstptpclock.c" (19 Apr 2019, 84492 Bytes) of package /linux/misc/gstreamer-1.16.1.tar.xz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "gstptpclock.c" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 1.14.4_vs_1.16.0.

    1 /* GStreamer
    2  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
    3  *
    4  *
    5  * This library is free software; you can redistribute it and/or
    6  * modify it under the terms of the GNU Library General Public
    7  * License as published by the Free Software Foundation; either
    8  * version 2 of the License, or (at your option) any later version.
    9  *
   10  * This library is distributed in the hope that it will be useful,
   11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
   12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   13  * Library General Public License for more details.
   14  *
   15  * You should have received a copy of the GNU Library General Public
   16  * License along with this library; if not, write to the
   17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
   18  * Boston, MA 02110-1301, USA.
   19  */
   20 /**
   21  * SECTION:gstptpclock
   22  * @title: GstPtpClock
   23  * @short_description: Special clock that synchronizes to a remote time
   24  *                     provider via PTP (IEEE1588:2008).
   25  * @see_also: #GstClock, #GstNetClientClock, #GstPipeline
   26  *
   27  * GstPtpClock implements a PTP (IEEE1588:2008) ordinary clock in slave-only
   28  * mode, that allows a GStreamer pipeline to synchronize to a PTP network
   29  * clock in some specific domain.
   30  *
   31  * The PTP subsystem can be initialized with gst_ptp_init(), which then starts
   32  * a helper process to do the actual communication via the PTP ports. This is
   33  * required as PTP listens on ports < 1024 and thus requires special
   34  * privileges. Once this helper process is started, the main process will
   35  * synchronize to all PTP domains that are detected on the selected
   36  * interfaces.
   37  *
   38  * gst_ptp_clock_new() then allows creating a GstClock that provides the PTP
   39  * time from a master clock inside a specific PTP domain. This clock will only
   40  * return valid timestamps once the timestamps in the PTP domain are known. To
   41  * check this, you can use gst_clock_wait_for_sync(), the GstClock::synced
   42  * signal and gst_clock_is_synced().
   43  *
   44  * To gather statistics about the PTP clock synchronization,
   45  * gst_ptp_statistics_callback_add() can be used. This gives the application
   46  * the possibility to collect all kinds of statistics from the clock
   47  * synchronization.
   48  *
   49  * Since: 1.6
   50  *
   51  */
   52 #ifdef HAVE_CONFIG_H
   53 #include "config.h"
   54 #endif
   55 
   56 #include "gstptpclock.h"
   57 
   58 #include "gstptp_private.h"
   59 
   60 #ifdef HAVE_SYS_WAIT_H
   61 #include <sys/wait.h>
   62 #endif
   63 #ifdef G_OS_WIN32
   64 #define WIN32_LEAN_AND_MEAN
   65 #include <windows.h>
   66 #endif
   67 #include <sys/types.h>
   68 
   69 #ifdef HAVE_UNISTD_H
   70 #include <unistd.h>
   71 #elif defined(G_OS_WIN32)
   72 #include <io.h>
   73 #endif
   74 
   75 #include <gst/base/base.h>
   76 
   /* Debug category of this file. Defining GST_CAT_DEFAULT makes it the
    * category used by the GST_TRACE/GST_DEBUG/GST_WARNING calls below. */
   77 GST_DEBUG_CATEGORY_STATIC (ptp_debug);
   78 #define GST_CAT_DEFAULT (ptp_debug)
   79 
   80 /* IEEE 1588 7.7.3.1 */
   81 #define PTP_ANNOUNCE_RECEIPT_TIMEOUT 4 /* NOTE(review): presumably counted in announce intervals; its user is outside this part of the file - confirm */
   82 
   83 /* Use a running average for calculating the mean path delay instead
   84  * of just using the last measurement. Enabling this helps in unreliable
   85  * networks, like wifi, with often changing delays
   86  *
   87  * Undef for following IEEE1588-2008 by the letter
   88  */
   89 #define USE_RUNNING_AVERAGE_DELAY 1
   90 
   91 /* Filter out any measurements that are above a certain threshold compared to
   92  * previous measurements. Enabling this helps filtering out outliers that
   93  * happen fairly often in unreliable networks, like wifi.
   94  *
   95  * Undef for following IEEE1588-2008 by the letter
   96  */
   97 #define USE_MEASUREMENT_FILTERING 1
   98 
   99 /* Select the first clock from which we capture a SYNC message as the master
  100  * clock of the domain until we are ready to run the best master clock
  101  * algorithm. This allows faster syncing but might mean a change of the master
  102  * clock in the beginning. As all clocks in a domain are supposed to use the
  103  * same time, this shouldn't be much of a problem.
  104  *
  105  * Undef for following IEEE1588-2008 by the letter
  106  */
  107 #define USE_OPPORTUNISTIC_CLOCK_SELECTION 1
  108 
  109 /* Only consider SYNC messages for which we are allowed to send a DELAY_REQ
  110  * afterwards. This allows better synchronization in networks with varying
  111  * delays, as for every other SYNC message we would have to assume that it's
  112  * the average of what we saw before. But that might be completely off
  113  */
  114 #define USE_ONLY_SYNC_WITH_DELAY 1
  115 
  116 /* Filter out delay measurements that are too far away from the median of the
  117  * last delay measurements, currently those that are more than 2 times as big.
  118  * This increases accuracy a lot on wifi.
  119  */
  120 #define USE_MEDIAN_PRE_FILTERING 1
  121 #define MEDIAN_PRE_FILTERING_WINDOW 9 /* number of entries in PtpDomainData.last_path_delays */
  122 
  123 /* How many updates should be skipped at maximum when using USE_MEASUREMENT_FILTERING */
  124 #define MAX_SKIPPED_UPDATES 5 /* NOTE(review): presumably the limit for PtpDomainData.skipped_updates - confirm */
  125 
  /* PTP message type values. parse_ptp_message_header() takes the type from
   * the low 4 bits of the first header byte. parse_ptp_message() only parses
   * the bodies of SYNC, DELAY_REQ, FOLLOW_UP, DELAY_RESP and ANNOUNCE and
   * returns FALSE for every other type. */
  126 typedef enum
  127 {
  128   PTP_MESSAGE_TYPE_SYNC = 0x0,
  129   PTP_MESSAGE_TYPE_DELAY_REQ = 0x1,
  130   PTP_MESSAGE_TYPE_PDELAY_REQ = 0x2,
  131   PTP_MESSAGE_TYPE_PDELAY_RESP = 0x3,
  132   PTP_MESSAGE_TYPE_FOLLOW_UP = 0x8,
  133   PTP_MESSAGE_TYPE_DELAY_RESP = 0x9,
  134   PTP_MESSAGE_TYPE_PDELAY_RESP_FOLLOW_UP = 0xA,
  135   PTP_MESSAGE_TYPE_ANNOUNCE = 0xB,
  136   PTP_MESSAGE_TYPE_SIGNALING = 0xC,
  137   PTP_MESSAGE_TYPE_MANAGEMENT = 0xD
  138 } PtpMessageType;
  139 
  /* PTP timestamp as filled in by parse_ptp_timestamp(): a seconds count read
   * from 6 bytes and a nanoseconds count read from 4 bytes. The parser rejects
   * nanosecond values of 1000000000 and above. */
  140 typedef struct
  141 {
  142   guint64 seconds_field;        /* 48 bits valid */
  143   guint32 nanoseconds_field;    /* below 1000000000 after successful parsing */
  144 } PtpTimestamp;
  145 
  /* Conversions between a PtpTimestamp and a GstClockTime.
   *
   * PTP_TIMESTAMP_TO_GST_CLOCK_TIME(ptp): @ptp is a PtpTimestamp lvalue or
   *   expression (not a pointer); yields seconds * GST_SECOND + nanoseconds.
   *   Note that @ptp is evaluated twice.
   * GST_CLOCK_TIME_TO_PTP_TIMESTAMP_SECONDS(gst) / _NANOSECONDS(gst): split a
   *   clock time into the whole seconds and the remaining nanoseconds.
   *
   * All macro arguments are parenthesized so that expressions such as `*p` or
   * `a + b` can be passed safely. */
  #define PTP_TIMESTAMP_TO_GST_CLOCK_TIME(ptp) \
      ((ptp).seconds_field * GST_SECOND + (ptp).nanoseconds_field)
  #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_SECONDS(gst) \
      (((GstClockTime) (gst)) / GST_SECOND)
  #define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_NANOSECONDS(gst) \
      (((GstClockTime) (gst)) % GST_SECOND)
  149 
  /* Identity of a PTP port: the clock identity plus the port number on that
   * clock. parse_ptp_message_header() reads it as 8 + 2 bytes, and
   * compare_clock_identity() orders values by clock_identity first and
   * port_number second. */
  150 typedef struct
  151 {
  152   guint64 clock_identity;
  153   guint16 port_number;
  154 } PtpClockIdentity;
  155 
  156 static gint
  157 compare_clock_identity (const PtpClockIdentity * a, const PtpClockIdentity * b)
  158 {
  159   if (a->clock_identity < b->clock_identity)
  160     return -1;
  161   else if (a->clock_identity > b->clock_identity)
  162     return 1;
  163 
  164   if (a->port_number < b->port_number)
  165     return -1;
  166   else if (a->port_number > b->port_number)
  167     return 1;
  168 
  169   return 0;
  170 }
  171 
  /* Clock quality triple carried in ANNOUNCE messages. It is read by
   * parse_ptp_message_announce() and compared field by field, in declaration
   * order, by compare_announce_message(), where lower values sort first. */
  172 typedef struct
  173 {
  174   guint8 clock_class;
  175   guint8 clock_accuracy;
  176   guint16 offset_scaled_log_variance;
  177 } PtpClockQuality;
  178 
  /* Parsed form of a PTP message: the common header fields filled in by
   * parse_ptp_message_header(), followed by a union holding the body of the
   * message types that parse_ptp_message() knows how to parse. message_type
   * tells which union member (if any) is valid. */
  179 typedef struct
  180 {
  181   guint8 transport_specific;    /* high 4 bits of the first header byte */
  182   PtpMessageType message_type;  /* low 4 bits of the first header byte */
  183   /* guint8 reserved; */
  184   guint8 version_ptp;           /* low 4 bits of the second byte; the parser only accepts 2 */
  185   guint16 message_length;
  186   guint8 domain_number;
  187   /* guint8 reserved; */
  188   guint16 flag_field;           /* handle_announce_message() tests 0x0100 as the alternate master flag */
  189   gint64 correction_field;      /* 48.16 fixed point nanoseconds */
  190   /* guint32 reserved; */
  191   PtpClockIdentity source_port_identity;
  192   guint16 sequence_id;
  193   guint8 control_field;
  194   gint8 log_message_interval;   /* log2 of seconds, see log2_to_clock_time() */
  195 
  196   union
  197   {
  198     struct
  199     {
  200       PtpTimestamp origin_timestamp;
  201       gint16 current_utc_offset;
  202       /* guint8 reserved; */
  203       guint8 grandmaster_priority_1;
  204       PtpClockQuality grandmaster_clock_quality;
  205       guint8 grandmaster_priority_2;
  206       guint64 grandmaster_identity;
  207       guint16 steps_removed;
  208       guint8 time_source;
  209     } announce;
  210 
  211     struct
  212     {
  213       PtpTimestamp origin_timestamp;
  214     } sync;
  215 
  216     struct
  217     {
  218       PtpTimestamp precise_origin_timestamp;
  219     } follow_up;
  220 
  221     struct
  222     {
  223       PtpTimestamp origin_timestamp;
  224     } delay_req;
  225 
  226     struct
  227     {
  228       PtpTimestamp receive_timestamp;
  229       PtpClockIdentity requesting_port_identity;
  230     } delay_resp;
  231 
  232   } message_specific;
  233 } PtpMessage;
  234 
  /* Global state of the PTP subsystem.
   * NOTE(review): the code that initializes and uses these variables is
   * outside this part of the file; the remarks below that are not directly
   * visible here are assumptions based on names and types - confirm. */
  235 static GMutex ptp_lock;
  236 static GCond ptp_cond;
  237 static gboolean initted = FALSE;
  238 #ifdef HAVE_PTP
  239 static gboolean supported = TRUE;
  240 #else
  241 static gboolean supported = FALSE;      /* built without HAVE_PTP */
  242 #endif
  243 static GPid ptp_helper_pid;     /* presumably the helper process mentioned in the file documentation */
  244 static GThread *ptp_helper_thread;
  245 static GMainContext *main_context;
  246 static GMainLoop *main_loop;
  247 static GIOChannel *stdin_channel, *stdout_channel;
  248 static GRand *delay_req_rand;
  249 static GstClock *observation_system_clock;
  250 static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 };
  251 
  /* One stored ANNOUNCE message of a sender. Instances are the elements of
   * PtpAnnounceSender.announce_messages and are ranked against each other by
   * compare_announce_message(). */
  252 typedef struct
  253 {
  254   GstClockTime receive_time;    /* compared against "now" in select_best_master_clock() */
  255 
  256   PtpClockIdentity master_clock_identity;
  257 
  258   guint8 grandmaster_priority_1;
  259   PtpClockQuality grandmaster_clock_quality;
  260   guint8 grandmaster_priority_2;
  261   guint64 grandmaster_identity;
  262   guint16 steps_removed;
  263   guint8 time_source;
  264 
  265   guint16 sequence_id;
  266 } PtpAnnounceMessage;
  267 
  /* Per-sender record of received ANNOUNCE messages. Instances are the
   * elements of PtpDomainData.announce_senders. */
  268 typedef struct
  269 {
  270   PtpClockIdentity master_clock_identity;
  271 
  272   GstClockTime announce_interval;       /* last interval we received */
  273   GQueue announce_messages;     /* of PtpAnnounceMessage; select_best_master_clock() treats the tail as the newest */
  274 } PtpAnnounceSender;
  275 
  /* State of one SYNC / DELAY_REQ exchange. Instances are the elements of
   * PtpDomainData.pending_syncs and are released with
   * ptp_pending_sync_free(), which also destroys timeout_source. */
  276 typedef struct
  277 {
  278   guint domain;
  279   PtpClockIdentity master_clock_identity;
  280 
  281   guint16 sync_seqnum;
  282   GstClockTime sync_recv_time_local;    /* t2 */
  283   GstClockTime sync_send_time_remote;   /* t1, might be -1 if FOLLOW_UP pending */
  284   GstClockTime follow_up_recv_time_local;
  285 
  286   GSource *timeout_source;      /* may be NULL; destroyed and unreffed on free */
  287   guint16 delay_req_seqnum;
  288   GstClockTime delay_req_send_time_local;       /* t3, -1 if we wait for FOLLOW_UP */
  289   GstClockTime delay_req_recv_time_remote;      /* t4, -1 if we wait */
  290   GstClockTime delay_resp_recv_time_local;
  291 
  292   gint64 correction_field_sync; /* sum of the correction fields of SYNC/FOLLOW_UP */
  293   gint64 correction_field_delay;        /* sum of the correction fields of DELAY_RESP */
  294 } PtpPendingSync;
  295 
  296 static void
  297 ptp_pending_sync_free (PtpPendingSync * sync)
  298 {
  299   if (sync->timeout_source) {
  300     g_source_destroy (sync->timeout_source);
  301     g_source_unref (sync->timeout_source);
  302   }
  303   g_free (sync);
  304 }
  305 
  /* Everything tracked for one PTP domain. An instance is created by
   * handle_announce_message() the first time a domain number is seen and is
   * kept in the global domain_data list. */
  306 typedef struct
  307 {
  308   guint domain;                 /* PTP domain number, copied from the message header */
  309 
  310   GstClockTime last_ptp_time;
  311   GstClockTime last_local_time;
  312   gint skipped_updates;         /* reset to 0 when the master clock changes */
  313 
  314   /* Used for selecting the master/grandmaster */
  315   GList *announce_senders;      /* of PtpAnnounceSender */
  316 
  317   /* Last selected master clock */
  318   gboolean have_master_clock;
  319   PtpClockIdentity master_clock_identity;
  320   guint64 grandmaster_identity;
  321 
  322   /* Last SYNC or FOLLOW_UP timestamp we received */
  323   GstClockTime last_ptp_sync_time;
  324   GstClockTime sync_interval;
  325 
  326   GstClockTime mean_path_delay;
  327   GstClockTime last_delay_req, min_delay_req_interval;
  328   guint16 last_delay_req_seqnum;
  329 
  330   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
  331   gint last_path_delays_missing;        /* set to 9 for a new domain and when the master clock changes */
  332 
  333   GQueue pending_syncs;         /* of PtpPendingSync, freed with ptp_pending_sync_free() */
  334 
  335   GstClock *domain_clock;       /* system clock object named "ptp-clock-<domain>" */
  336 } PtpDomainData;
  337 
  /* List of PtpDomainData, searched by domain number in
   * handle_announce_message(). New domains are also prepended to
   * domain_clocks while holding domain_clocks_lock. */
  338 static GList *domain_data;
  339 static GMutex domain_clocks_lock;
  340 static GList *domain_clocks;
  341 
  342 /* Protected by PTP lock */
  343 static void emit_ptp_statistics (guint8 domain, const GstStructure * stats);
  344 static GHookList domain_stats_hooks;
  345 static gint domain_stats_n_hooks;       /* read with g_atomic_int_get() before statistics structures are built */
  346 static gboolean domain_stats_hooks_initted = FALSE;
  347 
  348 /* Converts log2 seconds to GstClockTime */
  349 static GstClockTime
  350 log2_to_clock_time (gint l)
  351 {
  352   if (l < 0)
  353     return GST_SECOND >> (-l);
  354   else
  355     return GST_SECOND << l;
  356 }
  357 
  358 static void
  359 dump_ptp_message (PtpMessage * msg)
  360 {
  361   GST_TRACE ("PTP message:");
  362   GST_TRACE ("\ttransport_specific: %u", msg->transport_specific);
  363   GST_TRACE ("\tmessage_type: 0x%01x", msg->message_type);
  364   GST_TRACE ("\tversion_ptp: %u", msg->version_ptp);
  365   GST_TRACE ("\tmessage_length: %u", msg->message_length);
  366   GST_TRACE ("\tdomain_number: %u", msg->domain_number);
  367   GST_TRACE ("\tflag_field: 0x%04x", msg->flag_field);
  368   GST_TRACE ("\tcorrection_field: %" G_GINT64_FORMAT ".%03u",
  369       (msg->correction_field / 65536),
  370       (guint) ((msg->correction_field & 0xffff) * 1000) / 65536);
  371   GST_TRACE ("\tsource_port_identity: 0x%016" G_GINT64_MODIFIER "x %u",
  372       msg->source_port_identity.clock_identity,
  373       msg->source_port_identity.port_number);
  374   GST_TRACE ("\tsequence_id: %u", msg->sequence_id);
  375   GST_TRACE ("\tcontrol_field: 0x%02x", msg->control_field);
  376   GST_TRACE ("\tmessage_interval: %" GST_TIME_FORMAT,
  377       GST_TIME_ARGS (log2_to_clock_time (msg->log_message_interval)));
  378 
  379   switch (msg->message_type) {
  380     case PTP_MESSAGE_TYPE_ANNOUNCE:
  381       GST_TRACE ("\tANNOUNCE:");
  382       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
  383           msg->message_specific.announce.origin_timestamp.seconds_field,
  384           msg->message_specific.announce.origin_timestamp.nanoseconds_field);
  385       GST_TRACE ("\t\tcurrent_utc_offset: %d",
  386           msg->message_specific.announce.current_utc_offset);
  387       GST_TRACE ("\t\tgrandmaster_priority_1: %u",
  388           msg->message_specific.announce.grandmaster_priority_1);
  389       GST_TRACE ("\t\tgrandmaster_clock_quality: 0x%02x 0x%02x %u",
  390           msg->message_specific.announce.grandmaster_clock_quality.clock_class,
  391           msg->message_specific.announce.
  392           grandmaster_clock_quality.clock_accuracy,
  393           msg->message_specific.announce.
  394           grandmaster_clock_quality.offset_scaled_log_variance);
  395       GST_TRACE ("\t\tgrandmaster_priority_2: %u",
  396           msg->message_specific.announce.grandmaster_priority_2);
  397       GST_TRACE ("\t\tgrandmaster_identity: 0x%016" G_GINT64_MODIFIER "x",
  398           msg->message_specific.announce.grandmaster_identity);
  399       GST_TRACE ("\t\tsteps_removed: %u",
  400           msg->message_specific.announce.steps_removed);
  401       GST_TRACE ("\t\ttime_source: 0x%02x",
  402           msg->message_specific.announce.time_source);
  403       break;
  404     case PTP_MESSAGE_TYPE_SYNC:
  405       GST_TRACE ("\tSYNC:");
  406       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
  407           msg->message_specific.sync.origin_timestamp.seconds_field,
  408           msg->message_specific.sync.origin_timestamp.nanoseconds_field);
  409       break;
  410     case PTP_MESSAGE_TYPE_FOLLOW_UP:
  411       GST_TRACE ("\tFOLLOW_UP:");
  412       GST_TRACE ("\t\tprecise_origin_timestamp: %" G_GUINT64_FORMAT ".%09u",
  413           msg->message_specific.follow_up.
  414           precise_origin_timestamp.seconds_field,
  415           msg->message_specific.follow_up.
  416           precise_origin_timestamp.nanoseconds_field);
  417       break;
  418     case PTP_MESSAGE_TYPE_DELAY_REQ:
  419       GST_TRACE ("\tDELAY_REQ:");
  420       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
  421           msg->message_specific.delay_req.origin_timestamp.seconds_field,
  422           msg->message_specific.delay_req.origin_timestamp.nanoseconds_field);
  423       break;
  424     case PTP_MESSAGE_TYPE_DELAY_RESP:
  425       GST_TRACE ("\tDELAY_RESP:");
  426       GST_TRACE ("\t\treceive_timestamp: %" G_GUINT64_FORMAT ".%09u",
  427           msg->message_specific.delay_resp.receive_timestamp.seconds_field,
  428           msg->message_specific.delay_resp.receive_timestamp.nanoseconds_field);
  429       GST_TRACE ("\t\trequesting_port_identity: 0x%016" G_GINT64_MODIFIER
  430           "x %u",
  431           msg->message_specific.delay_resp.
  432           requesting_port_identity.clock_identity,
  433           msg->message_specific.delay_resp.
  434           requesting_port_identity.port_number);
  435       break;
  436     default:
  437       break;
  438   }
  439   GST_TRACE (" ");
  440 }
  441 
  442 /* IEEE 1588-2008 5.3.3 */
  443 static gboolean
  444 parse_ptp_timestamp (PtpTimestamp * timestamp, GstByteReader * reader)
  445 {
  446   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 10, FALSE);
  447 
  448   timestamp->seconds_field =
  449       (((guint64) gst_byte_reader_get_uint32_be_unchecked (reader)) << 16) |
  450       gst_byte_reader_get_uint16_be_unchecked (reader);
  451   timestamp->nanoseconds_field =
  452       gst_byte_reader_get_uint32_be_unchecked (reader);
  453 
  454   if (timestamp->nanoseconds_field >= 1000000000)
  455     return FALSE;
  456 
  457   return TRUE;
  458 }
  459 
  460 /* IEEE 1588-2008 13.3 */
  461 static gboolean
  462 parse_ptp_message_header (PtpMessage * msg, GstByteReader * reader)
  463 {
  464   guint8 b;
  465 
  466   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 34, FALSE);
  467 
  468   b = gst_byte_reader_get_uint8_unchecked (reader);
  469   msg->transport_specific = b >> 4;
  470   msg->message_type = b & 0x0f;
  471 
  472   b = gst_byte_reader_get_uint8_unchecked (reader);
  473   msg->version_ptp = b & 0x0f;
  474   if (msg->version_ptp != 2) {
  475     GST_WARNING ("Unsupported PTP message version (%u != 2)", msg->version_ptp);
  476     return FALSE;
  477   }
  478 
  479   msg->message_length = gst_byte_reader_get_uint16_be_unchecked (reader);
  480   if (gst_byte_reader_get_remaining (reader) + 4 < msg->message_length) {
  481     GST_WARNING ("Not enough data (%u < %u)",
  482         gst_byte_reader_get_remaining (reader) + 4, msg->message_length);
  483     return FALSE;
  484   }
  485 
  486   msg->domain_number = gst_byte_reader_get_uint8_unchecked (reader);
  487   gst_byte_reader_skip_unchecked (reader, 1);
  488 
  489   msg->flag_field = gst_byte_reader_get_uint16_be_unchecked (reader);
  490   msg->correction_field = gst_byte_reader_get_uint64_be_unchecked (reader);
  491   gst_byte_reader_skip_unchecked (reader, 4);
  492 
  493   msg->source_port_identity.clock_identity =
  494       gst_byte_reader_get_uint64_be_unchecked (reader);
  495   msg->source_port_identity.port_number =
  496       gst_byte_reader_get_uint16_be_unchecked (reader);
  497 
  498   msg->sequence_id = gst_byte_reader_get_uint16_be_unchecked (reader);
  499   msg->control_field = gst_byte_reader_get_uint8_unchecked (reader);
  500   msg->log_message_interval = gst_byte_reader_get_uint8_unchecked (reader);
  501 
  502   return TRUE;
  503 }
  504 
  505 /* IEEE 1588-2008 13.5 */
  506 static gboolean
  507 parse_ptp_message_announce (PtpMessage * msg, GstByteReader * reader)
  508 {
  509   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_ANNOUNCE, FALSE);
  510 
  511   if (gst_byte_reader_get_remaining (reader) < 20)
  512     return FALSE;
  513 
  514   if (!parse_ptp_timestamp (&msg->message_specific.announce.origin_timestamp,
  515           reader))
  516     return FALSE;
  517 
  518   msg->message_specific.announce.current_utc_offset =
  519       gst_byte_reader_get_uint16_be_unchecked (reader);
  520   gst_byte_reader_skip_unchecked (reader, 1);
  521 
  522   msg->message_specific.announce.grandmaster_priority_1 =
  523       gst_byte_reader_get_uint8_unchecked (reader);
  524   msg->message_specific.announce.grandmaster_clock_quality.clock_class =
  525       gst_byte_reader_get_uint8_unchecked (reader);
  526   msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy =
  527       gst_byte_reader_get_uint8_unchecked (reader);
  528   msg->message_specific.announce.
  529       grandmaster_clock_quality.offset_scaled_log_variance =
  530       gst_byte_reader_get_uint16_be_unchecked (reader);
  531   msg->message_specific.announce.grandmaster_priority_2 =
  532       gst_byte_reader_get_uint8_unchecked (reader);
  533   msg->message_specific.announce.grandmaster_identity =
  534       gst_byte_reader_get_uint64_be_unchecked (reader);
  535   msg->message_specific.announce.steps_removed =
  536       gst_byte_reader_get_uint16_be_unchecked (reader);
  537   msg->message_specific.announce.time_source =
  538       gst_byte_reader_get_uint8_unchecked (reader);
  539 
  540   return TRUE;
  541 }
  542 
  543 /* IEEE 1588-2008 13.6 */
  544 static gboolean
  545 parse_ptp_message_sync (PtpMessage * msg, GstByteReader * reader)
  546 {
  547   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_SYNC, FALSE);
  548 
  549   if (gst_byte_reader_get_remaining (reader) < 10)
  550     return FALSE;
  551 
  552   if (!parse_ptp_timestamp (&msg->message_specific.sync.origin_timestamp,
  553           reader))
  554     return FALSE;
  555 
  556   return TRUE;
  557 }
  558 
  559 /* IEEE 1588-2008 13.6 */
  560 static gboolean
  561 parse_ptp_message_delay_req (PtpMessage * msg, GstByteReader * reader)
  562 {
  563   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_REQ, FALSE);
  564 
  565   if (gst_byte_reader_get_remaining (reader) < 10)
  566     return FALSE;
  567 
  568   if (!parse_ptp_timestamp (&msg->message_specific.delay_req.origin_timestamp,
  569           reader))
  570     return FALSE;
  571 
  572   return TRUE;
  573 }
  574 
  575 /* IEEE 1588-2008 13.7 */
  576 static gboolean
  577 parse_ptp_message_follow_up (PtpMessage * msg, GstByteReader * reader)
  578 {
  579   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_FOLLOW_UP, FALSE);
  580 
  581   if (gst_byte_reader_get_remaining (reader) < 10)
  582     return FALSE;
  583 
  584   if (!parse_ptp_timestamp (&msg->message_specific.
  585           follow_up.precise_origin_timestamp, reader))
  586     return FALSE;
  587 
  588   return TRUE;
  589 }
  590 
  591 /* IEEE 1588-2008 13.8 */
  592 static gboolean
  593 parse_ptp_message_delay_resp (PtpMessage * msg, GstByteReader * reader)
  594 {
  595   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_RESP,
  596       FALSE);
  597 
  598   if (gst_byte_reader_get_remaining (reader) < 20)
  599     return FALSE;
  600 
  601   if (!parse_ptp_timestamp (&msg->message_specific.delay_resp.receive_timestamp,
  602           reader))
  603     return FALSE;
  604 
  605   msg->message_specific.delay_resp.requesting_port_identity.clock_identity =
  606       gst_byte_reader_get_uint64_be_unchecked (reader);
  607   msg->message_specific.delay_resp.requesting_port_identity.port_number =
  608       gst_byte_reader_get_uint16_be_unchecked (reader);
  609 
  610   return TRUE;
  611 }
  612 
  613 static gboolean
  614 parse_ptp_message (PtpMessage * msg, const guint8 * data, gsize size)
  615 {
  616   GstByteReader reader;
  617   gboolean ret = FALSE;
  618 
  619   gst_byte_reader_init (&reader, data, size);
  620 
  621   if (!parse_ptp_message_header (msg, &reader)) {
  622     GST_WARNING ("Failed to parse PTP message header");
  623     return FALSE;
  624   }
  625 
  626   switch (msg->message_type) {
  627     case PTP_MESSAGE_TYPE_SYNC:
  628       ret = parse_ptp_message_sync (msg, &reader);
  629       break;
  630     case PTP_MESSAGE_TYPE_FOLLOW_UP:
  631       ret = parse_ptp_message_follow_up (msg, &reader);
  632       break;
  633     case PTP_MESSAGE_TYPE_DELAY_REQ:
  634       ret = parse_ptp_message_delay_req (msg, &reader);
  635       break;
  636     case PTP_MESSAGE_TYPE_DELAY_RESP:
  637       ret = parse_ptp_message_delay_resp (msg, &reader);
  638       break;
  639     case PTP_MESSAGE_TYPE_ANNOUNCE:
  640       ret = parse_ptp_message_announce (msg, &reader);
  641       break;
  642     default:
  643       /* ignore for now */
  644       break;
  645   }
  646 
  647   return ret;
  648 }
  649 
  650 static gint
  651 compare_announce_message (const PtpAnnounceMessage * a,
  652     const PtpAnnounceMessage * b)
  653 {
  654   /* IEEE 1588 Figure 27 */
  655   if (a->grandmaster_identity == b->grandmaster_identity) {
  656     if (a->steps_removed + 1 < b->steps_removed)
  657       return -1;
  658     else if (a->steps_removed > b->steps_removed + 1)
  659       return 1;
  660 
  661     /* Error cases are filtered out earlier */
  662     if (a->steps_removed < b->steps_removed)
  663       return -1;
  664     else if (a->steps_removed > b->steps_removed)
  665       return 1;
  666 
  667     /* Error cases are filtered out earlier */
  668     if (a->master_clock_identity.clock_identity <
  669         b->master_clock_identity.clock_identity)
  670       return -1;
  671     else if (a->master_clock_identity.clock_identity >
  672         b->master_clock_identity.clock_identity)
  673       return 1;
  674 
  675     /* Error cases are filtered out earlier */
  676     if (a->master_clock_identity.port_number <
  677         b->master_clock_identity.port_number)
  678       return -1;
  679     else if (a->master_clock_identity.port_number >
  680         b->master_clock_identity.port_number)
  681       return 1;
  682     else
  683       g_assert_not_reached ();
  684 
  685     return 0;
  686   }
  687 
  688   if (a->grandmaster_priority_1 < b->grandmaster_priority_1)
  689     return -1;
  690   else if (a->grandmaster_priority_1 > b->grandmaster_priority_1)
  691     return 1;
  692 
  693   if (a->grandmaster_clock_quality.clock_class <
  694       b->grandmaster_clock_quality.clock_class)
  695     return -1;
  696   else if (a->grandmaster_clock_quality.clock_class >
  697       b->grandmaster_clock_quality.clock_class)
  698     return 1;
  699 
  700   if (a->grandmaster_clock_quality.clock_accuracy <
  701       b->grandmaster_clock_quality.clock_accuracy)
  702     return -1;
  703   else if (a->grandmaster_clock_quality.clock_accuracy >
  704       b->grandmaster_clock_quality.clock_accuracy)
  705     return 1;
  706 
  707   if (a->grandmaster_clock_quality.offset_scaled_log_variance <
  708       b->grandmaster_clock_quality.offset_scaled_log_variance)
  709     return -1;
  710   else if (a->grandmaster_clock_quality.offset_scaled_log_variance >
  711       b->grandmaster_clock_quality.offset_scaled_log_variance)
  712     return 1;
  713 
  714   if (a->grandmaster_priority_2 < b->grandmaster_priority_2)
  715     return -1;
  716   else if (a->grandmaster_priority_2 > b->grandmaster_priority_2)
  717     return 1;
  718 
  719   if (a->grandmaster_identity < b->grandmaster_identity)
  720     return -1;
  721   else if (a->grandmaster_identity > b->grandmaster_identity)
  722     return 1;
  723   else
  724     g_assert_not_reached ();
  725 
  726   return 0;
  727 }
  728 
  729 static void
  730 select_best_master_clock (PtpDomainData * domain, GstClockTime now)
  731 {
  732   GList *qualified_messages = NULL;
  733   GList *l, *m;
  734   PtpAnnounceMessage *best = NULL;
  735 
  736   /* IEEE 1588 9.3.2.5 */
  737   for (l = domain->announce_senders; l; l = l->next) {
  738     PtpAnnounceSender *sender = l->data;
  739     GstClockTime window = 4 * sender->announce_interval;
  740     gint count = 0;
  741 
  742     for (m = sender->announce_messages.head; m; m = m->next) {
  743       PtpAnnounceMessage *msg = m->data;
  744 
  745       if (now - msg->receive_time <= window)
  746         count++;
  747     }
  748 
  749     /* Only include the newest message of announce senders that had at least 2
  750      * announce messages in the last 4 announce intervals. Which also means
  751      * that we wait at least 4 announce intervals before we select a master
  752      * clock. Until then we just report based on the newest SYNC we received
  753      */
  754     if (count >= 2) {
  755       qualified_messages =
  756           g_list_prepend (qualified_messages,
  757           g_queue_peek_tail (&sender->announce_messages));
  758     }
  759   }
  760 
  761   if (!qualified_messages) {
  762     GST_DEBUG
  763         ("No qualified announce messages for domain %u, can't select a master clock",
  764         domain->domain);
  765     domain->have_master_clock = FALSE;
  766     return;
  767   }
  768 
  769   for (l = qualified_messages; l; l = l->next) {
  770     PtpAnnounceMessage *msg = l->data;
  771 
  772     if (!best || compare_announce_message (msg, best) < 0)
  773       best = msg;
  774   }
  775 
  776   if (domain->have_master_clock
  777       && compare_clock_identity (&domain->master_clock_identity,
  778           &best->master_clock_identity) == 0) {
  779     GST_DEBUG ("Master clock in domain %u did not change", domain->domain);
  780   } else {
  781     GST_DEBUG ("Selected master clock for domain %u: 0x%016" G_GINT64_MODIFIER
  782         "x %u with grandmaster clock 0x%016" G_GINT64_MODIFIER "x",
  783         domain->domain, best->master_clock_identity.clock_identity,
  784         best->master_clock_identity.port_number, best->grandmaster_identity);
  785 
  786     domain->have_master_clock = TRUE;
  787     domain->grandmaster_identity = best->grandmaster_identity;
  788 
  789     /* Opportunistic master clock selection likely gave us the same master
  790      * clock before, no need to reset all statistics */
  791     if (compare_clock_identity (&domain->master_clock_identity,
  792             &best->master_clock_identity) != 0) {
  793       memcpy (&domain->master_clock_identity, &best->master_clock_identity,
  794           sizeof (PtpClockIdentity));
  795       domain->mean_path_delay = 0;
  796       domain->last_delay_req = 0;
  797       domain->last_path_delays_missing = 9;
  798       domain->min_delay_req_interval = 0;
  799       domain->sync_interval = 0;
  800       domain->last_ptp_sync_time = 0;
  801       domain->skipped_updates = 0;
  802       g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
  803           NULL);
  804       g_queue_clear (&domain->pending_syncs);
  805     }
  806 
  807     if (g_atomic_int_get (&domain_stats_n_hooks)) {
  808       GstStructure *stats =
  809           gst_structure_new (GST_PTP_STATISTICS_BEST_MASTER_CLOCK_SELECTED,
  810           "domain", G_TYPE_UINT, domain->domain,
  811           "master-clock-id", G_TYPE_UINT64,
  812           domain->master_clock_identity.clock_identity,
  813           "master-clock-port", G_TYPE_UINT,
  814           domain->master_clock_identity.port_number,
  815           "grandmaster-clock-id", G_TYPE_UINT64, domain->grandmaster_identity,
  816           NULL);
  817       emit_ptp_statistics (domain->domain, stats);
  818       gst_structure_free (stats);
  819     }
  820   }
  821 }
  822 
  823 static void
  824 handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
  825 {
  826   GList *l;
  827   PtpDomainData *domain = NULL;
  828   PtpAnnounceSender *sender = NULL;
  829   PtpAnnounceMessage *announce;
  830 
  831   /* IEEE1588 9.3.2.2 e)
  832    * Don't consider messages with the alternate master flag set
  833    */
  834   if ((msg->flag_field & 0x0100))
  835     return;
  836 
  837   /* IEEE 1588 9.3.2.5 d)
  838    * Don't consider announce messages with steps_removed>=255
  839    */
  840   if (msg->message_specific.announce.steps_removed >= 255)
  841     return;
  842 
  843   for (l = domain_data; l; l = l->next) {
  844     PtpDomainData *tmp = l->data;
  845 
  846     if (tmp->domain == msg->domain_number) {
  847       domain = tmp;
  848       break;
  849     }
  850   }
  851 
  852   if (!domain) {
  853     gchar *clock_name;
  854 
  855     domain = g_new0 (PtpDomainData, 1);
  856     domain->domain = msg->domain_number;
  857     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
  858     domain->domain_clock =
  859         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
  860     gst_object_ref_sink (domain->domain_clock);
  861     g_free (clock_name);
  862     g_queue_init (&domain->pending_syncs);
  863     domain->last_path_delays_missing = 9;
  864     domain_data = g_list_prepend (domain_data, domain);
  865 
  866     g_mutex_lock (&domain_clocks_lock);
  867     domain_clocks = g_list_prepend (domain_clocks, domain);
  868     g_mutex_unlock (&domain_clocks_lock);
  869 
  870     if (g_atomic_int_get (&domain_stats_n_hooks)) {
  871       GstStructure *stats =
  872           gst_structure_new (GST_PTP_STATISTICS_NEW_DOMAIN_FOUND, "domain",
  873           G_TYPE_UINT, domain->domain, "clock", GST_TYPE_CLOCK,
  874           domain->domain_clock, NULL);
  875       emit_ptp_statistics (domain->domain, stats);
  876       gst_structure_free (stats);
  877     }
  878   }
  879 
  880   for (l = domain->announce_senders; l; l = l->next) {
  881     PtpAnnounceSender *tmp = l->data;
  882 
  883     if (compare_clock_identity (&tmp->master_clock_identity,
  884             &msg->source_port_identity) == 0) {
  885       sender = tmp;
  886       break;
  887     }
  888   }
  889 
  890   if (!sender) {
  891     sender = g_new0 (PtpAnnounceSender, 1);
  892 
  893     memcpy (&sender->master_clock_identity, &msg->source_port_identity,
  894         sizeof (PtpClockIdentity));
  895     g_queue_init (&sender->announce_messages);
  896     domain->announce_senders =
  897         g_list_prepend (domain->announce_senders, sender);
  898   }
  899 
  900   for (l = sender->announce_messages.head; l; l = l->next) {
  901     PtpAnnounceMessage *tmp = l->data;
  902 
  903     /* IEEE 1588 9.3.2.5 c)
  904      * Don't consider identical messages, i.e. duplicates
  905      */
  906     if (tmp->sequence_id == msg->sequence_id)
  907       return;
  908   }
  909 
  910   sender->announce_interval = log2_to_clock_time (msg->log_message_interval);
  911 
  912   announce = g_new0 (PtpAnnounceMessage, 1);
  913   announce->receive_time = receive_time;
  914   announce->sequence_id = msg->sequence_id;
  915   memcpy (&announce->master_clock_identity, &msg->source_port_identity,
  916       sizeof (PtpClockIdentity));
  917   announce->grandmaster_identity =
  918       msg->message_specific.announce.grandmaster_identity;
  919   announce->grandmaster_priority_1 =
  920       msg->message_specific.announce.grandmaster_priority_1;
  921   announce->grandmaster_clock_quality.clock_class =
  922       msg->message_specific.announce.grandmaster_clock_quality.clock_class;
  923   announce->grandmaster_clock_quality.clock_accuracy =
  924       msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy;
  925   announce->grandmaster_clock_quality.offset_scaled_log_variance =
  926       msg->message_specific.announce.
  927       grandmaster_clock_quality.offset_scaled_log_variance;
  928   announce->grandmaster_priority_2 =
  929       msg->message_specific.announce.grandmaster_priority_2;
  930   announce->steps_removed = msg->message_specific.announce.steps_removed;
  931   announce->time_source = msg->message_specific.announce.time_source;
  932   g_queue_push_tail (&sender->announce_messages, announce);
  933 
  934   select_best_master_clock (domain, receive_time);
  935 }
  936 
  937 static gboolean
  938 send_delay_req_timeout (PtpPendingSync * sync)
  939 {
  940   StdIOHeader header = { 0, };
  941   guint8 delay_req[44];
  942   GstByteWriter writer;
  943   GIOStatus status;
  944   gsize written;
  945   GError *err = NULL;
  946 
  947   header.type = TYPE_EVENT;
  948   header.size = 44;
  949 
  950   GST_TRACE ("Sending delay_req to domain %u", sync->domain);
  951 
  952   gst_byte_writer_init_with_data (&writer, delay_req, 44, FALSE);
  953   gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ);
  954   gst_byte_writer_put_uint8_unchecked (&writer, 2);
  955   gst_byte_writer_put_uint16_be_unchecked (&writer, 44);
  956   gst_byte_writer_put_uint8_unchecked (&writer, sync->domain);
  957   gst_byte_writer_put_uint8_unchecked (&writer, 0);
  958   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
  959   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
  960   gst_byte_writer_put_uint32_be_unchecked (&writer, 0);
  961   gst_byte_writer_put_uint64_be_unchecked (&writer,
  962       ptp_clock_id.clock_identity);
  963   gst_byte_writer_put_uint16_be_unchecked (&writer, ptp_clock_id.port_number);
  964   gst_byte_writer_put_uint16_be_unchecked (&writer, sync->delay_req_seqnum);
  965   gst_byte_writer_put_uint8_unchecked (&writer, 0x01);
  966   gst_byte_writer_put_uint8_unchecked (&writer, 0x7f);
  967   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
  968   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
  969 
  970   status =
  971       g_io_channel_write_chars (stdout_channel, (gchar *) & header,
  972       sizeof (header), &written, &err);
  973   if (status == G_IO_STATUS_ERROR) {
  974     g_warning ("Failed to write to stdout: %s", err->message);
  975     g_clear_error (&err);
  976     return G_SOURCE_REMOVE;
  977   } else if (status == G_IO_STATUS_EOF) {
  978     g_message ("EOF on stdout");
  979     g_main_loop_quit (main_loop);
  980     return G_SOURCE_REMOVE;
  981   } else if (status != G_IO_STATUS_NORMAL) {
  982     g_warning ("Unexpected stdout write status: %d", status);
  983     g_main_loop_quit (main_loop);
  984     return G_SOURCE_REMOVE;
  985   } else if (written != sizeof (header)) {
  986     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
  987     g_main_loop_quit (main_loop);
  988     return G_SOURCE_REMOVE;
  989   }
  990 
  991   sync->delay_req_send_time_local =
  992       gst_clock_get_time (observation_system_clock);
  993 
  994   status =
  995       g_io_channel_write_chars (stdout_channel,
  996       (const gchar *) delay_req, 44, &written, &err);
  997   if (status == G_IO_STATUS_ERROR) {
  998     g_warning ("Failed to write to stdout: %s", err->message);
  999     g_clear_error (&err);
 1000     g_main_loop_quit (main_loop);
 1001     return G_SOURCE_REMOVE;
 1002   } else if (status == G_IO_STATUS_EOF) {
 1003     g_message ("EOF on stdout");
 1004     g_main_loop_quit (main_loop);
 1005     return G_SOURCE_REMOVE;
 1006   } else if (status != G_IO_STATUS_NORMAL) {
 1007     g_warning ("Unexpected stdout write status: %d", status);
 1008     g_main_loop_quit (main_loop);
 1009     return G_SOURCE_REMOVE;
 1010   } else if (written != 44) {
 1011     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
 1012     g_main_loop_quit (main_loop);
 1013     return G_SOURCE_REMOVE;
 1014   }
 1015 
 1016   return G_SOURCE_REMOVE;
 1017 }
 1018 
 1019 static gboolean
 1020 send_delay_req (PtpDomainData * domain, PtpPendingSync * sync)
 1021 {
 1022   GstClockTime now = gst_clock_get_time (observation_system_clock);
 1023   guint timeout;
 1024   GSource *timeout_source;
 1025 
 1026   if (domain->last_delay_req != 0
 1027       && domain->last_delay_req + domain->min_delay_req_interval > now) {
 1028     GST_TRACE ("Too soon to send new DELAY_REQ");
 1029     return FALSE;
 1030   }
 1031 
 1032   domain->last_delay_req = now;
 1033   sync->delay_req_seqnum = domain->last_delay_req_seqnum++;
 1034 
 1035   /* IEEE 1588 9.5.11.2 */
 1036   if (domain->last_delay_req == 0 || domain->min_delay_req_interval == 0)
 1037     timeout = 0;
 1038   else
 1039     timeout =
 1040         g_rand_int_range (delay_req_rand, 0,
 1041         (domain->min_delay_req_interval * 2) / GST_MSECOND);
 1042 
 1043   sync->timeout_source = timeout_source = g_timeout_source_new (timeout);
 1044   g_source_set_priority (timeout_source, G_PRIORITY_DEFAULT);
 1045   g_source_set_callback (timeout_source, (GSourceFunc) send_delay_req_timeout,
 1046       sync, NULL);
 1047   g_source_attach (timeout_source, main_context);
 1048 
 1049   return TRUE;
 1050 }
 1051 
 1052 /* Filtering of outliers for RTT and time calculations inspired
 1053  * by the code from gstnetclientclock.c
 1054  */
 1055 static void
 1056 update_ptp_time (PtpDomainData * domain, PtpPendingSync * sync)
 1057 {
 1058   GstClockTime internal_time, external_time, rate_num, rate_den;
 1059   GstClockTime corrected_ptp_time, corrected_local_time;
 1060   gdouble r_squared = 0.0;
 1061   gboolean synced;
 1062   GstClockTimeDiff discont = 0;
 1063   GstClockTime estimated_ptp_time = GST_CLOCK_TIME_NONE;
 1064 #ifdef USE_MEASUREMENT_FILTERING
 1065   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
 1066       orig_rate_den;
 1067   GstClockTime new_estimated_ptp_time;
 1068   GstClockTime max_discont, estimated_ptp_time_min, estimated_ptp_time_max;
 1069   gboolean now_synced;
 1070 #endif
 1071 #ifdef USE_ONLY_SYNC_WITH_DELAY
 1072   GstClockTime mean_path_delay;
 1073 #endif
 1074 
 1075   GST_TRACE ("Updating PTP time");
 1076 
 1077 #ifdef USE_ONLY_SYNC_WITH_DELAY
 1078   if (sync->delay_req_send_time_local == GST_CLOCK_TIME_NONE) {
 1079     GST_TRACE ("Not updating - no delay_req sent");
 1080     return;
 1081   }
 1082 
 1083   /* IEEE 1588 11.3 */
 1084   mean_path_delay =
 1085       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
 1086       sync->sync_recv_time_local - sync->delay_req_send_time_local -
 1087       (sync->correction_field_sync + sync->correction_field_delay +
 1088           32768) / 65536) / 2;
 1089 #endif
 1090 
 1091   /* IEEE 1588 11.2 */
 1092   corrected_ptp_time =
 1093       sync->sync_send_time_remote +
 1094       (sync->correction_field_sync + 32768) / 65536;
 1095 
 1096 #ifdef USE_ONLY_SYNC_WITH_DELAY
 1097   corrected_local_time = sync->sync_recv_time_local - mean_path_delay;
 1098 #else
 1099   corrected_local_time = sync->sync_recv_time_local - domain->mean_path_delay;
 1100 #endif
 1101 
 1102 #ifdef USE_MEASUREMENT_FILTERING
 1103   /* We check this here and when updating the mean path delay, because
 1104    * we can get here without a delay response too. The tolerance on
 1105    * accepting follow-up after a sync is high, because a PTP server
 1106    * doesn't have to prioritise sending FOLLOW_UP - its purpose is
 1107    * just to give us the accurate timestamp of the preceding SYNC */
 1108   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE
 1109       && sync->follow_up_recv_time_local >
 1110       sync->sync_recv_time_local + 20 * domain->mean_path_delay) {
 1111     GstClockTimeDiff delay =
 1112         sync->follow_up_recv_time_local - sync->sync_recv_time_local;
 1113     GST_WARNING ("Sync-follow-up delay for domain %u too big: %"
 1114         GST_STIME_FORMAT " > 20 * %" GST_TIME_FORMAT, domain->domain,
 1115         GST_STIME_ARGS (delay), GST_TIME_ARGS (domain->mean_path_delay));
 1116     synced = FALSE;
 1117     gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
 1118         &internal_time, &external_time, &rate_num, &rate_den);
 1119     goto out;
 1120   }
 1121 #endif
 1122 
 1123   /* Set an initial local-remote relation */
 1124   if (domain->last_ptp_time == 0)
 1125     gst_clock_set_calibration (domain->domain_clock, corrected_local_time,
 1126         corrected_ptp_time, 1, 1);
 1127 
 1128 #ifdef USE_MEASUREMENT_FILTERING
 1129   /* Check if the corrected PTP time is +/- 3/4 RTT around what we would
 1130    * estimate with our present knowledge about the clock
 1131    */
 1132   /* Store what the clock produced as 'now' before this update */
 1133   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
 1134       &orig_internal_time, &orig_external_time, &orig_rate_num, &orig_rate_den);
 1135   internal_time = orig_internal_time;
 1136   external_time = orig_external_time;
 1137   rate_num = orig_rate_num;
 1138   rate_den = orig_rate_den;
 1139 
 1140   /* 3/4 RTT window around the estimation */
 1141   max_discont = domain->mean_path_delay * 3 / 2;
 1142 
 1143   /* Check if the estimated sync time is inside our window */
 1144   estimated_ptp_time_min = corrected_local_time - max_discont;
 1145   estimated_ptp_time_min =
 1146       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
 1147       estimated_ptp_time_min, internal_time, external_time, rate_num, rate_den);
 1148   estimated_ptp_time_max = corrected_local_time + max_discont;
 1149   estimated_ptp_time_max =
 1150       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
 1151       estimated_ptp_time_max, internal_time, external_time, rate_num, rate_den);
 1152 
 1153   synced = (estimated_ptp_time_min < corrected_ptp_time
 1154       && corrected_ptp_time < estimated_ptp_time_max);
 1155 
 1156   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
 1157       GST_TIME_FORMAT, domain->domain,
 1158       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
 1159 
 1160   GST_DEBUG ("Synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
 1161       GST_TIME_FORMAT, synced, GST_TIME_ARGS (estimated_ptp_time_min),
 1162       GST_TIME_ARGS (corrected_ptp_time),
 1163       GST_TIME_ARGS (estimated_ptp_time_max));
 1164 
 1165   if (gst_clock_add_observation_unapplied (domain->domain_clock,
 1166           corrected_local_time, corrected_ptp_time, &r_squared,
 1167           &internal_time, &external_time, &rate_num, &rate_den)) {
 1168     GST_DEBUG ("Regression gave r_squared: %f", r_squared);
 1169 
 1170     /* Old estimated PTP time based on receive time and path delay */
 1171     estimated_ptp_time = corrected_local_time;
 1172     estimated_ptp_time =
 1173         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
 1174         (domain->domain_clock), estimated_ptp_time, orig_internal_time,
 1175         orig_external_time, orig_rate_num, orig_rate_den);
 1176 
 1177     /* New estimated PTP time based on receive time and path delay */
 1178     new_estimated_ptp_time = corrected_local_time;
 1179     new_estimated_ptp_time =
 1180         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
 1181         (domain->domain_clock), new_estimated_ptp_time, internal_time,
 1182         external_time, rate_num, rate_den);
 1183 
 1184     discont = GST_CLOCK_DIFF (estimated_ptp_time, new_estimated_ptp_time);
 1185     if (synced && ABS (discont) > max_discont) {
 1186       GstClockTimeDiff offset;
 1187       GST_DEBUG ("Too large a discont %s%" GST_TIME_FORMAT
 1188           ", clamping to 1/4 average RTT = %" GST_TIME_FORMAT,
 1189           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
 1190           GST_TIME_ARGS (max_discont));
 1191       if (discont > 0) {        /* Too large a forward step - add a -ve offset */
 1192         offset = max_discont - discont;
 1193         if (-offset > external_time)
 1194           external_time = 0;
 1195         else
 1196           external_time += offset;
 1197       } else {                  /* Too large a backward step - add a +ve offset */
 1198         offset = -(max_discont + discont);
 1199         external_time += offset;
 1200       }
 1201 
 1202       discont += offset;
 1203     } else {
 1204       GST_DEBUG ("Discont %s%" GST_TIME_FORMAT " (max: %" GST_TIME_FORMAT ")",
 1205           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
 1206           GST_TIME_ARGS (max_discont));
 1207     }
 1208 
 1209     /* Check if the estimated sync time is now (still) inside our window */
 1210     estimated_ptp_time_min = corrected_local_time - max_discont;
 1211     estimated_ptp_time_min =
 1212         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
 1213         (domain->domain_clock), estimated_ptp_time_min, internal_time,
 1214         external_time, rate_num, rate_den);
 1215     estimated_ptp_time_max = corrected_local_time + max_discont;
 1216     estimated_ptp_time_max =
 1217         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
 1218         (domain->domain_clock), estimated_ptp_time_max, internal_time,
 1219         external_time, rate_num, rate_den);
 1220 
 1221     now_synced = (estimated_ptp_time_min < corrected_ptp_time
 1222         && corrected_ptp_time < estimated_ptp_time_max);
 1223 
 1224     GST_DEBUG ("Now synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
 1225         GST_TIME_FORMAT, now_synced, GST_TIME_ARGS (estimated_ptp_time_min),
 1226         GST_TIME_ARGS (corrected_ptp_time),
 1227         GST_TIME_ARGS (estimated_ptp_time_max));
 1228 
 1229     if (synced || now_synced || domain->skipped_updates > MAX_SKIPPED_UPDATES) {
 1230       gst_clock_set_calibration (GST_CLOCK_CAST (domain->domain_clock),
 1231           internal_time, external_time, rate_num, rate_den);
 1232       domain->skipped_updates = 0;
 1233 
 1234       domain->last_ptp_time = corrected_ptp_time;
 1235       domain->last_local_time = corrected_local_time;
 1236     } else {
 1237       domain->skipped_updates++;
 1238     }
 1239   } else {
 1240     domain->last_ptp_time = corrected_ptp_time;
 1241     domain->last_local_time = corrected_local_time;
 1242   }
 1243 
 1244 #else
 1245   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
 1246       GST_TIME_FORMAT, domain->domain,
 1247       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
 1248 
 1249   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
 1250       &internal_time, &external_time, &rate_num, &rate_den);
 1251 
 1252   estimated_ptp_time = corrected_local_time;
 1253   estimated_ptp_time =
 1254       gst_clock_adjust_with_calibration (GST_CLOCK_CAST
 1255       (domain->domain_clock), estimated_ptp_time, internal_time,
 1256       external_time, rate_num, rate_den);
 1257 
 1258   gst_clock_add_observation (domain->domain_clock,
 1259       corrected_local_time, corrected_ptp_time, &r_squared);
 1260 
 1261   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
 1262       &internal_time, &external_time, &rate_num, &rate_den);
 1263 
 1264   synced = TRUE;
 1265   domain->last_ptp_time = corrected_ptp_time;
 1266   domain->last_local_time = corrected_local_time;
 1267 #endif
 1268 
 1269 #ifdef USE_MEASUREMENT_FILTERING
 1270 out:
 1271 #endif
 1272   if (g_atomic_int_get (&domain_stats_n_hooks)) {
 1273     GstStructure *stats = gst_structure_new (GST_PTP_STATISTICS_TIME_UPDATED,
 1274         "domain", G_TYPE_UINT, domain->domain,
 1275         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
 1276         "local-time", GST_TYPE_CLOCK_TIME, corrected_local_time,
 1277         "ptp-time", GST_TYPE_CLOCK_TIME, corrected_ptp_time,
 1278         "estimated-ptp-time", GST_TYPE_CLOCK_TIME, estimated_ptp_time,
 1279         "discontinuity", G_TYPE_INT64, discont,
 1280         "synced", G_TYPE_BOOLEAN, synced,
 1281         "r-squared", G_TYPE_DOUBLE, r_squared,
 1282         "internal-time", GST_TYPE_CLOCK_TIME, internal_time,
 1283         "external-time", GST_TYPE_CLOCK_TIME, external_time,
 1284         "rate-num", G_TYPE_UINT64, rate_num,
 1285         "rate-den", G_TYPE_UINT64, rate_den,
 1286         "rate", G_TYPE_DOUBLE, (gdouble) (rate_num) / rate_den,
 1287         NULL);
 1288     emit_ptp_statistics (domain->domain, stats);
 1289     gst_structure_free (stats);
 1290   }
 1291 
 1292 }
 1293 
 1294 #ifdef USE_MEDIAN_PRE_FILTERING
 1295 static gint
 1296 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
 1297 {
 1298   if (*a < *b)
 1299     return -1;
 1300   else if (*a > *b)
 1301     return 1;
 1302   return 0;
 1303 }
 1304 #endif
 1305 
 1306 static gboolean
 1307 update_mean_path_delay (PtpDomainData * domain, PtpPendingSync * sync)
 1308 {
 1309 #ifdef USE_MEDIAN_PRE_FILTERING
 1310   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
 1311   GstClockTime median;
 1312   gint i;
 1313 #endif
 1314 
 1315   GstClockTime mean_path_delay, delay_req_delay = 0;
 1316   gboolean ret;
 1317 
 1318   /* IEEE 1588 11.3 */
 1319   mean_path_delay =
 1320       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
 1321       sync->sync_recv_time_local - sync->delay_req_send_time_local -
 1322       (sync->correction_field_sync + sync->correction_field_delay +
 1323           32768) / 65536) / 2;
 1324 
 1325 #ifdef USE_MEDIAN_PRE_FILTERING
 1326   for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++)
 1327     domain->last_path_delays[i - 1] = domain->last_path_delays[i];
 1328   domain->last_path_delays[i - 1] = mean_path_delay;
 1329 
 1330   if (domain->last_path_delays_missing) {
 1331     domain->last_path_delays_missing--;
 1332   } else {
 1333     memcpy (&last_path_delays, &domain->last_path_delays,
 1334         sizeof (last_path_delays));
 1335     g_qsort_with_data (&last_path_delays,
 1336         MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime),
 1337         (GCompareDataFunc) compare_clock_time, NULL);
 1338 
 1339     median = last_path_delays[MEDIAN_PRE_FILTERING_WINDOW / 2];
 1340 
 1341     /* FIXME: We might want to use something else here, like only allowing
 1342      * things in the interquartile range, or also filtering away delays that
 1343      * are too small compared to the median. This here worked well enough
 1344      * in tests so far.
 1345      */
 1346     if (mean_path_delay > 2 * median) {
 1347       GST_WARNING ("Path delay for domain %u too big compared to median: %"
 1348           GST_TIME_FORMAT " > 2 * %" GST_TIME_FORMAT, domain->domain,
 1349           GST_TIME_ARGS (mean_path_delay), GST_TIME_ARGS (median));
 1350       ret = FALSE;
 1351       goto out;
 1352     }
 1353   }
 1354 #endif
 1355 
 1356 #ifdef USE_RUNNING_AVERAGE_DELAY
 1357   /* Track an average round trip time, for a bit of smoothing */
 1358   /* Always update before discarding a sample, so genuine changes in
 1359    * the network get picked up, eventually */
 1360   if (domain->mean_path_delay == 0)
 1361     domain->mean_path_delay = mean_path_delay;
 1362   else if (mean_path_delay < domain->mean_path_delay)   /* Shorter RTTs carry more weight than longer */
 1363     domain->mean_path_delay =
 1364         (3 * domain->mean_path_delay + mean_path_delay) / 4;
 1365   else
 1366     domain->mean_path_delay =
 1367         (15 * domain->mean_path_delay + mean_path_delay) / 16;
 1368 #else
 1369   domain->mean_path_delay = mean_path_delay;
 1370 #endif
 1371 
 1372 #ifdef USE_MEASUREMENT_FILTERING
 1373   /* The tolerance on accepting follow-up after a sync is high, because
 1374    * a PTP server doesn't have to prioritise sending FOLLOW_UP - its purpose is
 1375    * just to give us the accurate timestamp of the preceding SYNC */
 1376   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE &&
 1377       domain->mean_path_delay != 0
 1378       && sync->follow_up_recv_time_local >
 1379       sync->sync_recv_time_local + 20 * domain->mean_path_delay) {
 1380     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
 1381         " > 20 * %" GST_TIME_FORMAT, domain->domain,
 1382         GST_TIME_ARGS (sync->follow_up_recv_time_local -
 1383             sync->sync_recv_time_local),
 1384         GST_TIME_ARGS (domain->mean_path_delay));
 1385     ret = FALSE;
 1386     goto out;
 1387   }
 1388 
 1389   if (mean_path_delay > 2 * domain->mean_path_delay) {
 1390     GST_WARNING ("Mean path delay for domain %u too big: %" GST_TIME_FORMAT
 1391         " > 2 * %" GST_TIME_FORMAT, domain->domain,
 1392         GST_TIME_ARGS (mean_path_delay),
 1393         GST_TIME_ARGS (domain->mean_path_delay));
 1394     ret = FALSE;
 1395     goto out;
 1396   }
 1397 #endif
 1398 
 1399   delay_req_delay =
 1400       sync->delay_resp_recv_time_local - sync->delay_req_send_time_local;
 1401 
 1402 #ifdef USE_MEASUREMENT_FILTERING
 1403   /* delay_req_delay is a RTT, so 2 times the path delay is what we'd
 1404    * hope for, but some PTP systems don't prioritise sending DELAY_RESP,
 1405    * but they must still have placed an accurate reception timestamp.
 1406    * That means we should be quite tolerant about late DELAY_RESP, and
 1407    * mostly rely on filtering out jumps in the mean-path-delay elsewhere  */
 1408   if (delay_req_delay > 20 * domain->mean_path_delay) {
 1409     GST_WARNING ("Delay-request-response delay for domain %u too big: %"
 1410         GST_TIME_FORMAT " > 20 * %" GST_TIME_FORMAT, domain->domain,
 1411         GST_TIME_ARGS (delay_req_delay),
 1412         GST_TIME_ARGS (domain->mean_path_delay));
 1413     ret = FALSE;
 1414     goto out;
 1415   }
 1416 #endif
 1417 
 1418   ret = TRUE;
 1419 
 1420   GST_DEBUG ("Got mean path delay for domain %u: %" GST_TIME_FORMAT " (new: %"
 1421       GST_TIME_FORMAT ")", domain->domain,
 1422       GST_TIME_ARGS (domain->mean_path_delay), GST_TIME_ARGS (mean_path_delay));
 1423   GST_DEBUG ("Delay request delay for domain %u: %" GST_TIME_FORMAT,
 1424       domain->domain, GST_TIME_ARGS (delay_req_delay));
 1425 
 1426 #if defined(USE_MEASUREMENT_FILTERING) || defined(USE_MEDIAN_PRE_FILTERING)
 1427 out:
 1428 #endif
 1429   if (g_atomic_int_get (&domain_stats_n_hooks)) {
 1430     GstStructure *stats =
 1431         gst_structure_new (GST_PTP_STATISTICS_PATH_DELAY_MEASURED,
 1432         "domain", G_TYPE_UINT, domain->domain,
 1433         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
 1434         "mean-path-delay", GST_TYPE_CLOCK_TIME, mean_path_delay,
 1435         "delay-request-delay", GST_TYPE_CLOCK_TIME, delay_req_delay, NULL);
 1436     emit_ptp_statistics (domain->domain, stats);
 1437     gst_structure_free (stats);
 1438   }
 1439 
 1440   return ret;
 1441 }
 1442 
 1443 static void
 1444 handle_sync_message (PtpMessage * msg, GstClockTime receive_time)
 1445 {
 1446   GList *l;
 1447   PtpDomainData *domain = NULL;
 1448   PtpPendingSync *sync = NULL;
 1449 
 1450   /* Don't consider messages with the alternate master flag set */
 1451   if ((msg->flag_field & 0x0100)) {
 1452     GST_TRACE ("Ignoring sync message with alternate-master flag");
 1453     return;
 1454   }
 1455 
 1456   for (l = domain_data; l; l = l->next) {
 1457     PtpDomainData *tmp = l->data;
 1458 
 1459     if (msg->domain_number == tmp->domain) {
 1460       domain = tmp;
 1461       break;
 1462     }
 1463   }
 1464 
 1465   if (!domain) {
 1466     gchar *clock_name;
 1467 
 1468     domain = g_new0 (PtpDomainData, 1);
 1469     domain->domain = msg->domain_number;
 1470     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
 1471     domain->domain_clock =
 1472         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
 1473     gst_object_ref_sink (domain->domain_clock);
 1474     g_free (clock_name);
 1475     g_queue_init (&domain->pending_syncs);
 1476     domain->last_path_delays_missing = 9;
 1477     domain_data = g_list_prepend (domain_data, domain);
 1478 
 1479     g_mutex_lock (&domain_clocks_lock);
 1480     domain_clocks = g_list_prepend (domain_clocks, domain);
 1481     g_mutex_unlock (&domain_clocks_lock);
 1482   }
 1483 
 1484   /* If we have a master clock, ignore this message if it's not coming from there */
 1485   if (domain->have_master_clock
 1486       && compare_clock_identity (&domain->master_clock_identity,
 1487           &msg->source_port_identity) != 0)
 1488     return;
 1489 
 1490 #ifdef USE_OPPORTUNISTIC_CLOCK_SELECTION
 1491   /* Opportunistic selection of master clock */
 1492   if (!domain->have_master_clock)
 1493     memcpy (&domain->master_clock_identity, &msg->source_port_identity,
 1494         sizeof (PtpClockIdentity));
 1495 #else
 1496   if (!domain->have_master_clock)
 1497     return;
 1498 #endif
 1499 
 1500   domain->sync_interval = log2_to_clock_time (msg->log_message_interval);
 1501 
 1502   /* Check if duplicated */
 1503   for (l = domain->pending_syncs.head; l; l = l->next) {
 1504     PtpPendingSync *tmp = l->data;
 1505 
 1506     if (tmp->sync_seqnum == msg->sequence_id)
 1507       return;
 1508   }
 1509 
 1510   if (msg->message_specific.sync.origin_timestamp.seconds_field >
 1511       GST_CLOCK_TIME_NONE / GST_SECOND) {
 1512     GST_FIXME ("Unsupported sync message seconds field value: %"
 1513         G_GUINT64_FORMAT " > %" G_GUINT64_FORMAT,
 1514         msg->message_specific.sync.origin_timestamp.seconds_field,
 1515         GST_CLOCK_TIME_NONE / GST_SECOND);
 1516     return;
 1517   }
 1518 
 1519   sync = g_new0 (PtpPendingSync, 1);
 1520   sync->domain = domain->domain;
 1521   sync->sync_seqnum = msg->sequence_id;
 1522   sync->sync_recv_time_local = receive_time;
 1523   sync->sync_send_time_remote = GST_CLOCK_TIME_NONE;
 1524   sync->follow_up_recv_time_local = GST_CLOCK_TIME_NONE;
 1525   sync->delay_req_send_time_local = GST_CLOCK_TIME_NONE;
 1526   sync->delay_req_recv_time_remote = GST_CLOCK_TIME_NONE;
 1527   sync->delay_resp_recv_time_local = GST_CLOCK_TIME_NONE;
 1528 
 1529   /* 0.5 correction factor for division later */
 1530   sync->correction_field_sync = msg->correction_field;
 1531 
 1532   if ((msg->flag_field & 0x0200)) {
 1533     /* Wait for FOLLOW_UP */
 1534     GST_TRACE ("Waiting for FOLLOW_UP msg");
 1535   } else {
 1536     sync->sync_send_time_remote =
 1537         PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
 1538         sync.origin_timestamp);
 1539 
 1540     if (domain->last_ptp_sync_time != 0
 1541         && domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
 1542       GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
 1543           GST_TIME_FORMAT, domain->domain,
 1544           GST_TIME_ARGS (domain->last_ptp_sync_time),
 1545           GST_TIME_ARGS (sync->sync_send_time_remote));
 1546       ptp_pending_sync_free (sync);
 1547       sync = NULL;
 1548       return;
 1549     }
 1550     domain->last_ptp_sync_time = sync->sync_send_time_remote;
 1551 
 1552     if (send_delay_req (domain, sync)) {
 1553       /* Sent delay request */
 1554     } else {
 1555       update_ptp_time (domain, sync);
 1556       ptp_pending_sync_free (sync);
 1557       sync = NULL;
 1558     }
 1559   }
 1560 
 1561   if (sync)
 1562     g_queue_push_tail (&domain->pending_syncs, sync);
 1563 }
 1564 
 1565 static void
 1566 handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time)
 1567 {
 1568   GList *l;
 1569   PtpDomainData *domain = NULL;
 1570   PtpPendingSync *sync = NULL;
 1571 
 1572   GST_TRACE ("Processing FOLLOW_UP message");
 1573 
 1574   /* Don't consider messages with the alternate master flag set */
 1575   if ((msg->flag_field & 0x0100)) {
 1576     GST_TRACE ("Ignoring FOLLOW_UP with alternate-master flag");
 1577     return;
 1578   }
 1579 
 1580   for (l = domain_data; l; l = l->next) {
 1581     PtpDomainData *tmp = l->data;
 1582 
 1583     if (msg->domain_number == tmp->domain) {
 1584       domain = tmp;
 1585       break;
 1586     }
 1587   }
 1588 
 1589   if (!domain) {
 1590     GST_TRACE ("No domain match for FOLLOW_UP msg");
 1591     return;
 1592   }
 1593 
 1594   /* If we have a master clock, ignore this message if it's not coming from there */
 1595   if (domain->have_master_clock
 1596       && compare_clock_identity (&domain->master_clock_identity,
 1597           &msg->source_port_identity) != 0) {
 1598     GST_TRACE ("FOLLOW_UP msg not from current clock master. Ignoring");
 1599     return;
 1600   }
 1601 
 1602   /* Check if we know about this one */
 1603   for (l = domain->pending_syncs.head; l; l = l->next) {
 1604     PtpPendingSync *tmp = l->data;
 1605 
 1606     if (tmp->sync_seqnum == msg->sequence_id) {
 1607       sync = tmp;
 1608       break;
 1609     }
 1610   }
 1611 
 1612   if (!sync) {
 1613     GST_TRACE ("Ignoring FOLLOW_UP with no pending SYNC");
 1614     return;
 1615   }
 1616 
 1617   /* Got a FOLLOW_UP for this already */
 1618   if (sync->sync_send_time_remote != GST_CLOCK_TIME_NONE) {
 1619     GST_TRACE ("Got repeat FOLLOW_UP. Ignoring");
 1620     return;
 1621   }
 1622 
 1623   if (sync->sync_recv_time_local >= receive_time) {
 1624     GST_ERROR ("Got bogus follow up in domain %u: %" GST_TIME_FORMAT " > %"
 1625         GST_TIME_FORMAT, domain->domain,
 1626         GST_TIME_ARGS (sync->sync_recv_time_local),
 1627         GST_TIME_ARGS (receive_time));
 1628     g_queue_remove (&domain->pending_syncs, sync);
 1629     ptp_pending_sync_free (sync);
 1630     return;
 1631   }
 1632 
 1633   sync->correction_field_sync += msg->correction_field;
 1634   sync->sync_send_time_remote =
 1635       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
 1636       follow_up.precise_origin_timestamp);
 1637   sync->follow_up_recv_time_local = receive_time;
 1638 
 1639   if (domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
 1640     GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
 1641         GST_TIME_FORMAT, domain->domain,
 1642         GST_TIME_ARGS (domain->last_ptp_sync_time),
 1643         GST_TIME_ARGS (sync->sync_send_time_remote));
 1644     g_queue_remove (&domain->pending_syncs, sync);
 1645     ptp_pending_sync_free (sync);
 1646     sync = NULL;
 1647     return;
 1648   }
 1649   domain->last_ptp_sync_time = sync->sync_send_time_remote;
 1650 
 1651   if (send_delay_req (domain, sync)) {
 1652     /* Sent delay request */
 1653   } else {
 1654     update_ptp_time (domain, sync);
 1655     g_queue_remove (&domain->pending_syncs, sync);
 1656     ptp_pending_sync_free (sync);
 1657     sync = NULL;
 1658   }
 1659 }
 1660 
 1661 static void
 1662 handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time)
 1663 {
 1664   GList *l;
 1665   PtpDomainData *domain = NULL;
 1666   PtpPendingSync *sync = NULL;
 1667 
 1668   /* Don't consider messages with the alternate master flag set */
 1669   if ((msg->flag_field & 0x0100))
 1670     return;
 1671 
 1672   for (l = domain_data; l; l = l->next) {
 1673     PtpDomainData *tmp = l->data;
 1674 
 1675     if (msg->domain_number == tmp->domain) {
 1676       domain = tmp;
 1677       break;
 1678     }
 1679   }
 1680 
 1681   if (!domain)
 1682     return;
 1683 
 1684   /* If we have a master clock, ignore this message if it's not coming from there */
 1685   if (domain->have_master_clock
 1686       && compare_clock_identity (&domain->master_clock_identity,
 1687           &msg->source_port_identity) != 0)
 1688     return;
 1689 
 1690   /* Not for us */
 1691   if (msg->message_specific.delay_resp.
 1692       requesting_port_identity.clock_identity != ptp_clock_id.clock_identity
 1693       || msg->message_specific.delay_resp.
 1694       requesting_port_identity.port_number != ptp_clock_id.port_number)
 1695     return;
 1696 
 1697   domain->min_delay_req_interval =
 1698       log2_to_clock_time (msg->log_message_interval);
 1699 
 1700   /* Check if we know about this one */
 1701   for (l = domain->pending_syncs.head; l; l = l->next) {
 1702     PtpPendingSync *tmp = l->data;
 1703 
 1704     if (tmp->delay_req_seqnum == msg->sequence_id) {
 1705       sync = tmp;
 1706       break;
 1707     }
 1708   }
 1709 
 1710   if (!sync)
 1711     return;
 1712 
 1713   /* Got a DELAY_RESP for this already */
 1714   if (sync->delay_req_recv_time_remote != GST_CLOCK_TIME_NONE)
 1715     return;
 1716 
 1717   if (sync->delay_req_send_time_local > receive_time) {
 1718     GST_ERROR ("Got bogus delay response in domain %u: %" GST_TIME_FORMAT " > %"
 1719         GST_TIME_FORMAT, domain->domain,
 1720         GST_TIME_ARGS (sync->delay_req_send_time_local),
 1721         GST_TIME_ARGS (receive_time));
 1722     g_queue_remove (&domain->pending_syncs, sync);
 1723     ptp_pending_sync_free (sync);
 1724     return;
 1725   }
 1726 
 1727   sync->correction_field_delay = msg->correction_field;
 1728 
 1729   sync->delay_req_recv_time_remote =
 1730       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
 1731       delay_resp.receive_timestamp);
 1732   sync->delay_resp_recv_time_local = receive_time;
 1733 
 1734   if (domain->mean_path_delay != 0
 1735       && sync->sync_send_time_remote > sync->delay_req_recv_time_remote) {
 1736     GST_WARNING ("Sync send time after delay req receive time for domain %u: %"
 1737         GST_TIME_FORMAT " > %" GST_TIME_FORMAT, domain->domain,
 1738         GST_TIME_ARGS (sync->sync_send_time_remote),
 1739         GST_TIME_ARGS (sync->delay_req_recv_time_remote));
 1740     g_queue_remove (&domain->pending_syncs, sync);
 1741     ptp_pending_sync_free (sync);
 1742     return;
 1743   }
 1744 
 1745   if (update_mean_path_delay (domain, sync))
 1746     update_ptp_time (domain, sync);
 1747   g_queue_remove (&domain->pending_syncs, sync);
 1748   ptp_pending_sync_free (sync);
 1749 }
 1750 
 1751 static void
 1752 handle_ptp_message (PtpMessage * msg, GstClockTime receive_time)
 1753 {
 1754   /* Ignore our own messages */
 1755   if (msg->source_port_identity.clock_identity == ptp_clock_id.clock_identity &&
 1756       msg->source_port_identity.port_number == ptp_clock_id.port_number) {
 1757     GST_TRACE ("Ignoring our own message");
 1758     return;
 1759   }
 1760 
 1761   GST_TRACE ("Message type %d receive_time %" GST_TIME_FORMAT,
 1762       msg->message_type, GST_TIME_ARGS (receive_time));
 1763   switch (msg->message_type) {
 1764     case PTP_MESSAGE_TYPE_ANNOUNCE:
 1765       handle_announce_message (msg, receive_time);
 1766       break;
 1767     case PTP_MESSAGE_TYPE_SYNC:
 1768       handle_sync_message (msg, receive_time);
 1769       break;
 1770     case PTP_MESSAGE_TYPE_FOLLOW_UP:
 1771       handle_follow_up_message (msg, receive_time);
 1772       break;
 1773     case PTP_MESSAGE_TYPE_DELAY_RESP:
 1774       handle_delay_resp_message (msg, receive_time);
 1775       break;
 1776     default:
 1777       break;
 1778   }
 1779 }
 1780 
 1781 static gboolean
 1782 have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
 1783     gpointer user_data)
 1784 {
 1785   GIOStatus status;
 1786   StdIOHeader header;
 1787   gchar buffer[8192];
 1788   GError *err = NULL;
 1789   gsize read;
 1790 
 1791   if ((condition & G_IO_STATUS_EOF)) {
 1792     GST_ERROR ("Got EOF on stdin");
 1793     g_main_loop_quit (main_loop);
 1794     return G_SOURCE_REMOVE;
 1795   }
 1796 
 1797   status =
 1798       g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header),
 1799       &read, &err);
 1800   if (status == G_IO_STATUS_ERROR) {
 1801     GST_ERROR ("Failed to read from stdin: %s", err->message);
 1802     g_clear_error (&err);
 1803     g_main_loop_quit (main_loop);
 1804     return G_SOURCE_REMOVE;
 1805   } else if (status == G_IO_STATUS_EOF) {
 1806     GST_ERROR ("Got EOF on stdin");
 1807     g_main_loop_quit (main_loop);
 1808     return G_SOURCE_REMOVE;
 1809   } else if (status != G_IO_STATUS_NORMAL) {
 1810     GST_ERROR ("Unexpected stdin read status: %d", status);
 1811     g_main_loop_quit (main_loop);
 1812     return G_SOURCE_REMOVE;
 1813   } else if (read != sizeof (header)) {
 1814     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
 1815     g_main_loop_quit (main_loop);
 1816     return G_SOURCE_REMOVE;
 1817   } else if (header.size > 8192) {
 1818     GST_ERROR ("Unexpected size: %u", header.size);
 1819     g_main_loop_quit (main_loop);
 1820     return G_SOURCE_REMOVE;
 1821   }
 1822 
 1823   status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err);
 1824   if (status == G_IO_STATUS_ERROR) {
 1825     GST_ERROR ("Failed to read from stdin: %s", err->message);
 1826     g_clear_error (&err);
 1827     g_main_loop_quit (main_loop);
 1828     return G_SOURCE_REMOVE;
 1829   } else if (status == G_IO_STATUS_EOF) {
 1830     GST_ERROR ("EOF on stdin");
 1831     g_main_loop_quit (main_loop);
 1832     return G_SOURCE_REMOVE;
 1833   } else if (status != G_IO_STATUS_NORMAL) {
 1834     GST_ERROR ("Unexpected stdin read status: %d", status);
 1835     g_main_loop_quit (main_loop);
 1836     return G_SOURCE_REMOVE;
 1837   } else if (read != header.size) {
 1838     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
 1839     g_main_loop_quit (main_loop);
 1840     return G_SOURCE_REMOVE;
 1841   }
 1842 
 1843   switch (header.type) {
 1844     case TYPE_EVENT:
 1845     case TYPE_GENERAL:{
 1846       GstClockTime receive_time = gst_clock_get_time (observation_system_clock);
 1847       PtpMessage msg;
 1848 
 1849       if (parse_ptp_message (&msg, (const guint8 *) buffer, header.size)) {
 1850         dump_ptp_message (&msg);
 1851         handle_ptp_message (&msg, receive_time);
 1852       }
 1853       break;
 1854     }
 1855     default:
 1856     case TYPE_CLOCK_ID:{
 1857       if (header.size != 8) {
 1858         GST_ERROR ("Unexpected clock id size (%u != 8)", header.size);
 1859         g_main_loop_quit (main_loop);
 1860         return G_SOURCE_REMOVE;
 1861       }
 1862       g_mutex_lock (&ptp_lock);
 1863       ptp_clock_id.clock_identity = GST_READ_UINT64_BE (buffer);
 1864       ptp_clock_id.port_number = getpid ();
 1865       GST_DEBUG ("Got clock id 0x%016" G_GINT64_MODIFIER "x %u",
 1866           ptp_clock_id.clock_identity, ptp_clock_id.port_number);
 1867       g_cond_signal (&ptp_cond);
 1868       g_mutex_unlock (&ptp_lock);
 1869       break;
 1870     }
 1871   }
 1872 
 1873   return G_SOURCE_CONTINUE;
 1874 }
 1875 
 1876 /* Cleanup all announce messages and announce message senders
 1877  * that are timed out by now, and clean up all pending syncs
 1878  * that are missing their FOLLOW_UP or DELAY_RESP */
 1879 static gboolean
 1880 cleanup_cb (gpointer data)
 1881 {
 1882   GstClockTime now = gst_clock_get_time (observation_system_clock);
 1883   GList *l, *m, *n;
 1884 
 1885   for (l = domain_data; l; l = l->next) {
 1886     PtpDomainData *domain = l->data;
 1887 
 1888     for (n = domain->announce_senders; n;) {
 1889       PtpAnnounceSender *sender = n->data;
 1890       gboolean timed_out = TRUE;
 1891 
 1892       /* Keep only 5 messages per sender around */
 1893       while (g_queue_get_length (&sender->announce_messages) > 5) {
 1894         PtpAnnounceMessage *msg = g_queue_pop_head (&sender->announce_messages);
 1895         g_free (msg);
 1896       }
 1897 
 1898       for (m = sender->announce_messages.head; m; m = m->next) {
 1899         PtpAnnounceMessage *msg = m->data;
 1900 
 1901         if (msg->receive_time +
 1902             sender->announce_interval * PTP_ANNOUNCE_RECEIPT_TIMEOUT > now) {
 1903           timed_out = FALSE;
 1904           break;
 1905         }
 1906       }
 1907 
 1908       if (timed_out) {
 1909         GST_DEBUG ("Announce sender 0x%016" G_GINT64_MODIFIER "x %u timed out",
 1910             sender->master_clock_identity.clock_identity,
 1911             sender->master_clock_identity.port_number);
 1912         g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
 1913         g_queue_clear (&sender->announce_messages);
 1914       }
 1915 
 1916       if (g_queue_get_length (&sender->announce_messages) == 0) {
 1917         GList *tmp = n->next;
 1918 
 1919         if (compare_clock_identity (&sender->master_clock_identity,
 1920                 &domain->master_clock_identity) == 0)
 1921           GST_WARNING ("currently selected master clock timed out");
 1922         g_free (sender);
 1923         domain->announce_senders =
 1924             g_list_delete_link (domain->announce_senders, n);
 1925         n = tmp;
 1926       } else {
 1927         n = n->next;
 1928       }
 1929     }
 1930     select_best_master_clock (domain, now);
 1931 
 1932     /* Clean up any pending syncs */
 1933     for (n = domain->pending_syncs.head; n;) {
 1934       PtpPendingSync *sync = n->data;
 1935       gboolean timed_out = FALSE;
 1936 
 1937       /* Time out pending syncs after 4 sync intervals or 10 seconds,
 1938        * and pending delay reqs after 4 delay req intervals or 10 seconds
 1939        */
 1940       if (sync->delay_req_send_time_local != GST_CLOCK_TIME_NONE &&
 1941           ((domain->min_delay_req_interval != 0
 1942                   && sync->delay_req_send_time_local +
 1943                   4 * domain->min_delay_req_interval < now)
 1944               || (sync->delay_req_send_time_local + 10 * GST_SECOND < now))) {
 1945         timed_out = TRUE;
 1946       } else if ((domain->sync_interval != 0
 1947               && sync->sync_recv_time_local + 4 * domain->sync_interval < now)
 1948           || (sync->sync_recv_time_local + 10 * GST_SECOND < now)) {
 1949         timed_out = TRUE;
 1950       }
 1951 
 1952       if (timed_out) {
 1953         GList *tmp = n->next;
 1954         ptp_pending_sync_free (sync);
 1955         g_queue_delete_link (&domain->pending_syncs, n);
 1956         n = tmp;
 1957       } else {
 1958         n = n->next;
 1959       }
 1960     }
 1961   }
 1962 
 1963   return G_SOURCE_CONTINUE;
 1964 }
 1965 
 1966 static gpointer
 1967 ptp_helper_main (gpointer data)
 1968 {
 1969   GSource *cleanup_source;
 1970 
 1971   GST_DEBUG ("Starting PTP helper loop");
 1972 
 1973   /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */
 1974   cleanup_source = g_timeout_source_new_seconds (5);
 1975   g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT);
 1976   g_source_set_callback (cleanup_source, (GSourceFunc) cleanup_cb, NULL, NULL);
 1977   g_source_attach (cleanup_source, main_context);
 1978   g_source_unref (cleanup_source);
 1979 
 1980   g_main_loop_run (main_loop);
 1981   GST_DEBUG ("Stopped PTP helper loop");
 1982 
 1983   g_mutex_lock (&ptp_lock);
 1984   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
 1985   ptp_clock_id.port_number = 0;
 1986   initted = FALSE;
 1987   g_cond_signal (&ptp_cond);
 1988   g_mutex_unlock (&ptp_lock);
 1989 
 1990   return NULL;
 1991 }
 1992 
/**
 * gst_ptp_is_supported:
 *
 * Check if PTP clocks are generally supported on this system, and if previous
 * initializations did not fail.
 *
 * The flag is cleared by gst_ptp_init() when the PTP helper process can't be
 * started or initialization fails otherwise.
 *
 * Returns: %TRUE if PTP clocks are generally supported on this system, and
 * previous initializations did not fail.
 *
 * Since: 1.6
 */
gboolean
gst_ptp_is_supported (void)
{
  /* Plain read of the flag, the PTP lock is not taken here */
  return supported;
}
 2009 
/**
 * gst_ptp_is_initialized:
 *
 * Check if the GStreamer PTP clock subsystem is initialized.
 *
 * The flag is set by gst_ptp_init() and cleared again by gst_ptp_deinit()
 * or when the PTP helper thread stops.
 *
 * Returns: %TRUE if the GStreamer PTP clock subsystem is initialized.
 *
 * Since: 1.6
 */
gboolean
gst_ptp_is_initialized (void)
{
  /* Plain read of the flag, the PTP lock is not taken here */
  return initted;
}
 2024 
 2025 /**
 2026  * gst_ptp_init:
 2027  * @clock_id: PTP clock id of this process' clock or %GST_PTP_CLOCK_ID_NONE
 2028  * @interfaces: (transfer none) (array zero-terminated=1) (allow-none): network interfaces to run the clock on
 2029  *
 2030  * Initialize the GStreamer PTP subsystem and create a PTP ordinary clock in
 2031  * slave-only mode for all domains on the given @interfaces with the
 2032  * given @clock_id.
 2033  *
 2034  * If @clock_id is %GST_PTP_CLOCK_ID_NONE, a clock id is automatically
 2035  * generated from the MAC address of the first network interface.
 2036  *
 2037  * This function is automatically called by gst_ptp_clock_new() with default
 2038  * parameters if it wasn't called before.
 2039  *
 2040  * Returns: %TRUE if the GStreamer PTP clock subsystem could be initialized.
 2041  *
 2042  * Since: 1.6
 2043  */
 2044 gboolean
 2045 gst_ptp_init (guint64 clock_id, gchar ** interfaces)
 2046 {
 2047   gboolean ret;
 2048   const gchar *env;
 2049   gchar **argv = NULL;
 2050   gint argc, argc_c;
 2051   gint fd_r, fd_w;
 2052   GError *err = NULL;
 2053   GSource *stdin_source;
 2054 
 2055   GST_DEBUG_CATEGORY_INIT (ptp_debug, "ptp", 0, "PTP clock");
 2056 
 2057   g_mutex_lock (&ptp_lock);
 2058   if (!supported) {
 2059     GST_ERROR ("PTP not supported");
 2060     ret = FALSE;
 2061     goto done;
 2062   }
 2063 
 2064   if (initted) {
 2065     GST_DEBUG ("PTP already initialized");
 2066     ret = TRUE;
 2067     goto done;
 2068   }
 2069 
 2070   if (ptp_helper_pid) {
 2071     GST_DEBUG ("PTP currently initializing");
 2072     goto wait;
 2073   }
 2074 
 2075   if (!domain_stats_hooks_initted) {
 2076     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
 2077     domain_stats_hooks_initted = TRUE;
 2078   }
 2079 
 2080   argc = 1;
 2081   if (clock_id != GST_PTP_CLOCK_ID_NONE)
 2082     argc += 2;
 2083   if (interfaces != NULL)
 2084     argc += 2 * g_strv_length (interfaces);
 2085 
 2086   argv = g_new0 (gchar *, argc + 2);
 2087   argc_c = 0;
 2088 
 2089   env = g_getenv ("GST_PTP_HELPER_1_0");
 2090   if (env == NULL)
 2091     env = g_getenv ("GST_PTP_HELPER");
 2092   if (env != NULL && *env != '\0') {
 2093     GST_LOG ("Trying GST_PTP_HELPER env var: %s", env);
 2094     argv[argc_c++] = g_strdup (env);
 2095   } else {
 2096     argv[argc_c++] = g_strdup (GST_PTP_HELPER_INSTALLED);
 2097   }
 2098 
 2099   if (clock_id != GST_PTP_CLOCK_ID_NONE) {
 2100     argv[argc_c++] = g_strdup ("-c");
 2101     argv[argc_c++] = g_strdup_printf ("0x%016" G_GINT64_MODIFIER "x", clock_id);
 2102   }
 2103 
 2104   if (interfaces != NULL) {
 2105     gchar **ptr = interfaces;
 2106 
 2107     while (*ptr) {
 2108       argv[argc_c++] = g_strdup ("-i");
 2109       argv[argc_c++] = g_strdup (*ptr);
 2110       ptr++;
 2111     }
 2112   }
 2113 
 2114   main_context = g_main_context_new ();
 2115   main_loop = g_main_loop_new (main_context, FALSE);
 2116 
 2117   ptp_helper_thread =
 2118       g_thread_try_new ("ptp-helper-thread", ptp_helper_main, NULL, &err);
 2119   if (!ptp_helper_thread) {
 2120     GST_ERROR ("Failed to start PTP helper thread: %s", err->message);
 2121     g_clear_error (&err);
 2122     ret = FALSE;
 2123     goto done;
 2124   }
 2125 
 2126   if (!g_spawn_async_with_pipes (NULL, argv, NULL, 0, NULL, NULL,
 2127           &ptp_helper_pid, &fd_w, &fd_r, NULL, &err)) {
 2128     GST_ERROR ("Failed to start ptp helper process: %s", err->message);
 2129     g_clear_error (&err);
 2130     ret = FALSE;
 2131     supported = FALSE;
 2132     goto done;
 2133   }
 2134 
 2135   stdin_channel = g_io_channel_unix_new (fd_r);
 2136   g_io_channel_set_encoding (stdin_channel, NULL, NULL);
 2137   g_io_channel_set_buffered (stdin_channel, FALSE);
 2138   g_io_channel_set_close_on_unref (stdin_channel, TRUE);
 2139   stdin_source =
 2140       g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP);
 2141   g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT);
 2142   g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL,
 2143       NULL);
 2144   g_source_attach (stdin_source, main_context);
 2145   g_source_unref (stdin_source);
 2146 
 2147   /* Create stdout channel */
 2148   stdout_channel = g_io_channel_unix_new (fd_w);
 2149   g_io_channel_set_encoding (stdout_channel, NULL, NULL);
 2150   g_io_channel_set_close_on_unref (stdout_channel, TRUE);
 2151   g_io_channel_set_buffered (stdout_channel, FALSE);
 2152 
 2153   delay_req_rand = g_rand_new ();
 2154   observation_system_clock =
 2155       g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", "ptp-observation-clock",
 2156       NULL);
 2157   gst_object_ref_sink (observation_system_clock);
 2158 
 2159   initted = TRUE;
 2160 
 2161 wait:
 2162   GST_DEBUG ("Waiting for PTP to be initialized");
 2163 
 2164   while (ptp_clock_id.clock_identity == GST_PTP_CLOCK_ID_NONE && initted)
 2165     g_cond_wait (&ptp_cond, &ptp_lock);
 2166 
 2167   ret = initted;
 2168   if (ret) {
 2169     GST_DEBUG ("Initialized and got clock id 0x%016" G_GINT64_MODIFIER "x %u",
 2170         ptp_clock_id.clock_identity, ptp_clock_id.port_number);
 2171   } else {
 2172     GST_ERROR ("Failed to initialize");
 2173     supported = FALSE;
 2174   }
 2175 
 2176 done:
 2177   g_strfreev (argv);
 2178 
 2179   if (!ret) {
 2180     if (ptp_helper_pid) {
 2181 #ifndef G_OS_WIN32
 2182       kill (ptp_helper_pid, SIGKILL);
 2183       waitpid (ptp_helper_pid, NULL, 0);
 2184 #else
 2185       TerminateProcess (ptp_helper_pid, 1);
 2186       WaitForSingleObject (ptp_helper_pid, INFINITE);
 2187 #endif
 2188       g_spawn_close_pid (ptp_helper_pid);
 2189     }
 2190     ptp_helper_pid = 0;
 2191 
 2192     if (stdin_channel)
 2193       g_io_channel_unref (stdin_channel);
 2194     stdin_channel = NULL;
 2195     if (stdout_channel)
 2196       g_io_channel_unref (stdout_channel);
 2197     stdout_channel = NULL;
 2198 
 2199     if (main_loop && ptp_helper_thread) {
 2200       g_main_loop_quit (main_loop);
 2201       g_thread_join (ptp_helper_thread);
 2202     }
 2203     ptp_helper_thread = NULL;
 2204     if (main_loop)
 2205       g_main_loop_unref (main_loop);
 2206     main_loop = NULL;
 2207     if (main_context)
 2208       g_main_context_unref (main_context);
 2209     main_context = NULL;
 2210 
 2211     if (delay_req_rand)
 2212       g_rand_free (delay_req_rand);
 2213     delay_req_rand = NULL;
 2214 
 2215     if (observation_system_clock)
 2216       gst_object_unref (observation_system_clock);
 2217     observation_system_clock = NULL;
 2218   }
 2219 
 2220   g_mutex_unlock (&ptp_lock);
 2221 
 2222   return ret;
 2223 }
 2224 
 2225 /**
 2226  * gst_ptp_deinit:
 2227  *
 2228  * Deinitialize the GStreamer PTP subsystem and stop the PTP clock. If there
 2229  * are any remaining GstPtpClock instances, they won't be further synchronized
 2230  * to the PTP network clock.
 2231  *
 2232  * Since: 1.6
 2233  */
 2234 void
 2235 gst_ptp_deinit (void)
 2236 {
 2237   GList *l, *m;
 2238 
 2239   g_mutex_lock (&ptp_lock);
 2240 
 2241   if (ptp_helper_pid) {
 2242 #ifndef G_OS_WIN32
 2243     kill (ptp_helper_pid, SIGKILL);
 2244     waitpid (ptp_helper_pid, NULL, 0);
 2245 #else
 2246     TerminateProcess (ptp_helper_pid, 1);
 2247     WaitForSingleObject (ptp_helper_pid, INFINITE);
 2248 #endif
 2249     g_spawn_close_pid (ptp_helper_pid);
 2250   }
 2251   ptp_helper_pid = 0;
 2252 
 2253   if (stdin_channel)
 2254     g_io_channel_unref (stdin_channel);
 2255   stdin_channel = NULL;
 2256   if (stdout_channel)
 2257     g_io_channel_unref (stdout_channel);
 2258   stdout_channel = NULL;
 2259 
 2260   if (main_loop && ptp_helper_thread) {
 2261     GThread *tmp = ptp_helper_thread;
 2262     ptp_helper_thread = NULL;
 2263     g_mutex_unlock (&ptp_lock);
 2264     g_main_loop_quit (main_loop);
 2265     g_thread_join (tmp);
 2266     g_mutex_lock (&ptp_lock);
 2267   }
 2268   if (main_loop)
 2269     g_main_loop_unref (main_loop);
 2270   main_loop = NULL;
 2271   if (main_context)
 2272     g_main_context_unref (main_context);
 2273   main_context = NULL;
 2274 
 2275   if (delay_req_rand)
 2276     g_rand_free (delay_req_rand);
 2277   delay_req_rand = NULL;
 2278   if (observation_system_clock)
 2279     gst_object_unref (observation_system_clock);
 2280   observation_system_clock = NULL;
 2281 
 2282   for (l = domain_data; l; l = l->next) {
 2283     PtpDomainData *domain = l->data;
 2284 
 2285     for (m = domain->announce_senders; m; m = m->next) {
 2286       PtpAnnounceSender *sender = m->data;
 2287 
 2288       g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
 2289       g_queue_clear (&sender->announce_messages);
 2290       g_free (sender);
 2291     }
 2292     g_list_free (domain->announce_senders);
 2293 
 2294     g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
 2295         NULL);
 2296     g_queue_clear (&domain->pending_syncs);
 2297     gst_object_unref (domain->domain_clock);
 2298     g_free (domain);
 2299   }
 2300   g_list_free (domain_data);
 2301   domain_data = NULL;
 2302   g_list_foreach (domain_clocks, (GFunc) g_free, NULL);
 2303   g_list_free (domain_clocks);
 2304   domain_clocks = NULL;
 2305 
 2306   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
 2307   ptp_clock_id.port_number = 0;
 2308 
 2309   initted = FALSE;
 2310 
 2311   g_mutex_unlock (&ptp_lock);
 2312 }
 2313 
/* Default value of the "domain" property */
#define DEFAULT_DOMAIN 0

/* Property ids of GstPtpClock */
enum
{
  PROP_0,
  PROP_DOMAIN,
  PROP_INTERNAL_CLOCK,
  PROP_MASTER_CLOCK_ID,
  PROP_GRANDMASTER_CLOCK_ID
};

/* Private instance data of GstPtpClock */
struct _GstPtpClockPrivate
{
  /* Value of the "domain" property */
  guint domain;
  /* Clock reported as "internal-clock"; NULL until
   * gst_ptp_clock_ensure_domain_clock() found a clock for the domain */
  GstClock *domain_clock;
  /* Id of the registered statistics callback, 0 if none is registered */
  gulong domain_stats_id;
};

#define gst_ptp_clock_parent_class parent_class
G_DEFINE_TYPE_WITH_PRIVATE (GstPtpClock, gst_ptp_clock, GST_TYPE_SYSTEM_CLOCK);

/* GObject vfuncs */
static void gst_ptp_clock_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_ptp_clock_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
static void gst_ptp_clock_finalize (GObject * object);

/* GstClock vfuncs */
static GstClockTime gst_ptp_clock_get_internal_time (GstClock * clock);
 2342 
 2343 static void
 2344 gst_ptp_clock_class_init (GstPtpClockClass * klass)
 2345 {
 2346   GObjectClass *gobject_class;
 2347   GstClockClass *clock_class;
 2348 
 2349   gobject_class = G_OBJECT_CLASS (klass);
 2350   clock_class = GST_CLOCK_CLASS (klass);
 2351 
 2352   gobject_class->finalize = gst_ptp_clock_finalize;
 2353   gobject_class->get_property = gst_ptp_clock_get_property;
 2354   gobject_class->set_property = gst_ptp_clock_set_property;
 2355 
 2356   g_object_class_install_property (gobject_class, PROP_DOMAIN,
 2357       g_param_spec_uint ("domain", "Domain",
 2358           "The PTP domain", 0, G_MAXUINT8,
 2359           DEFAULT_DOMAIN,
 2360           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
 2361 
 2362   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
 2363       g_param_spec_object ("internal-clock", "Internal Clock",
 2364           "Internal clock", GST_TYPE_CLOCK,
 2365           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 2366 
 2367   g_object_class_install_property (gobject_class, PROP_MASTER_CLOCK_ID,
 2368       g_param_spec_uint64 ("master-clock-id", "Master Clock ID",
 2369           "Master Clock ID", 0, G_MAXUINT64, 0,
 2370           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 2371 
 2372   g_object_class_install_property (gobject_class, PROP_GRANDMASTER_CLOCK_ID,
 2373       g_param_spec_uint64 ("grandmaster-clock-id", "Grand Master Clock ID",
 2374           "Grand Master Clock ID", 0, G_MAXUINT64, 0,
 2375           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 2376 
 2377   clock_class->get_internal_time = gst_ptp_clock_get_internal_time;
 2378 }
 2379 
 2380 static void
 2381 gst_ptp_clock_init (GstPtpClock * self)
 2382 {
 2383   GstPtpClockPrivate *priv;
 2384 
 2385   self->priv = priv = gst_ptp_clock_get_instance_private (self);
 2386 
 2387   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
 2388   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
 2389 
 2390   priv->domain = DEFAULT_DOMAIN;
 2391 }
 2392 
 2393 static gboolean
 2394 gst_ptp_clock_ensure_domain_clock (GstPtpClock * self)
 2395 {
 2396   gboolean got_clock = TRUE;
 2397 
 2398   if (G_UNLIKELY (!self->priv->domain_clock)) {
 2399     g_mutex_lock (&domain_clocks_lock);
 2400     if (!self->priv->domain_clock) {
 2401       GList *l;
 2402 
 2403       got_clock = FALSE;
 2404 
 2405       for (l = domain_clocks; l; l = l->next) {
 2406         PtpDomainData *clock_data = l->data;
 2407 
 2408         if (clock_data->domain == self->priv->domain &&
 2409             clock_data->have_master_clock && clock_data->last_ptp_time != 0) {
 2410           GST_DEBUG ("Switching domain clock on domain %d", clock_data->domain);
 2411           self->priv->domain_clock = clock_data->domain_clock;
 2412           got_clock = TRUE;
 2413           break;
 2414         }
 2415       }
 2416     }
 2417     g_mutex_unlock (&domain_clocks_lock);
 2418     if (got_clock) {
 2419       g_object_notify (G_OBJECT (self), "internal-clock");
 2420       gst_clock_set_synced (GST_CLOCK (self), TRUE);
 2421     }
 2422   }
 2423 
 2424   return got_clock;
 2425 }
 2426 
 2427 static gboolean
 2428 gst_ptp_clock_stats_callback (guint8 domain, const GstStructure * stats,
 2429     gpointer user_data)
 2430 {
 2431   GstPtpClock *self = user_data;
 2432 
 2433   if (domain != self->priv->domain
 2434       || !gst_structure_has_name (stats, GST_PTP_STATISTICS_TIME_UPDATED))
 2435     return TRUE;
 2436 
 2437   /* Let's set our internal clock */
 2438   if (!gst_ptp_clock_ensure_domain_clock (self))
 2439     return TRUE;
 2440 
 2441   self->priv->domain_stats_id = 0;
 2442 
 2443   return FALSE;
 2444 }
 2445 
 2446 static void
 2447 gst_ptp_clock_set_property (GObject * object, guint prop_id,
 2448     const GValue * value, GParamSpec * pspec)
 2449 {
 2450   GstPtpClock *self = GST_PTP_CLOCK (object);
 2451 
 2452   switch (prop_id) {
 2453     case PROP_DOMAIN:
 2454       self->priv->domain = g_value_get_uint (value);
 2455       gst_ptp_clock_ensure_domain_clock (self);
 2456       if (!self->priv->domain_clock)
 2457         self->priv->domain_stats_id =
 2458             gst_ptp_statistics_callback_add (gst_ptp_clock_stats_callback, self,
 2459             NULL);
 2460       break;
 2461     default:
 2462       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
 2463       break;
 2464   }
 2465 }
 2466 
 2467 static void
 2468 gst_ptp_clock_get_property (GObject * object, guint prop_id,
 2469     GValue * value, GParamSpec * pspec)
 2470 {
 2471   GstPtpClock *self = GST_PTP_CLOCK (object);
 2472 
 2473   switch (prop_id) {
 2474     case PROP_DOMAIN:
 2475       g_value_set_uint (value, self->priv->domain);
 2476       break;
 2477     case PROP_INTERNAL_CLOCK:
 2478       gst_ptp_clock_ensure_domain_clock (self);
 2479       g_value_set_object (value, self->priv->domain_clock);
 2480       break;
 2481     case PROP_MASTER_CLOCK_ID:
 2482     case PROP_GRANDMASTER_CLOCK_ID:{
 2483       GList *l;
 2484 
 2485       g_mutex_lock (&domain_clocks_lock);
 2486       g_value_set_uint64 (value, 0);
 2487 
 2488       for (l = domain_clocks; l; l = l->next) {
 2489         PtpDomainData *clock_data = l->data;
 2490 
 2491         if (clock_data->domain == self->priv->domain) {
 2492           if (prop_id == PROP_MASTER_CLOCK_ID)
 2493             g_value_set_uint64 (value,
 2494                 clock_data->master_clock_identity.clock_identity);
 2495           else
 2496             g_value_set_uint64 (value, clock_data->grandmaster_identity);
 2497           break;
 2498         }
 2499       }
 2500       g_mutex_unlock (&domain_clocks_lock);
 2501       break;
 2502     }
 2503     default:
 2504       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
 2505       break;
 2506   }
 2507 }
 2508 
 2509 static void
 2510 gst_ptp_clock_finalize (GObject * object)
 2511 {
 2512   GstPtpClock *self = GST_PTP_CLOCK (object);
 2513 
 2514   if (self->priv->domain_stats_id)
 2515     gst_ptp_statistics_callback_remove (self->priv->domain_stats_id);
 2516 
 2517   G_OBJECT_CLASS (gst_ptp_clock_parent_class)->finalize (object);
 2518 }
 2519 
 2520 static GstClockTime
 2521 gst_ptp_clock_get_internal_time (GstClock * clock)
 2522 {
 2523   GstPtpClock *self = GST_PTP_CLOCK (clock);
 2524 
 2525   gst_ptp_clock_ensure_domain_clock (self);
 2526 
 2527   if (!self->priv->domain_clock) {
 2528     GST_ERROR_OBJECT (self, "Domain %u has no clock yet and is not synced",
 2529         self->priv->domain);
 2530     return GST_CLOCK_TIME_NONE;
 2531   }
 2532 
 2533   return gst_clock_get_time (self->priv->domain_clock);
 2534 }
 2535 
 2536 /**
 2537  * gst_ptp_clock_new:
 2538  * @name: Name of the clock
 2539  * @domain: PTP domain
 2540  *
 2541  * Creates a new PTP clock instance that exports the PTP time of the master
 2542  * clock in @domain. This clock can be slaved to other clocks as needed.
 2543  *
 2544  * If gst_ptp_init() was not called before, this will call gst_ptp_init() with
 2545  * default parameters.
 2546  *
 2547  * This clock only returns valid timestamps after it received the first
 2548  * times from the PTP master clock on the network. Once this happens the
 2549  * GstPtpClock::internal-clock property will become non-NULL. You can
 2550  * check this with gst_clock_wait_for_sync(), the GstClock::synced signal and
 2551  * gst_clock_is_synced().
 2552  *
 2553  * Returns: (transfer full): A new #GstClock
 2554  *
 2555  * Since: 1.6
 2556  */
 2557 GstClock *
 2558 gst_ptp_clock_new (const gchar * name, guint domain)
 2559 {
 2560   GstClock *clock;
 2561 
 2562   g_return_val_if_fail (domain <= G_MAXUINT8, NULL);
 2563 
 2564   if (!initted && !gst_ptp_init (GST_PTP_CLOCK_ID_NONE, NULL)) {
 2565     GST_ERROR ("Failed to initialize PTP");
 2566     return NULL;
 2567   }
 2568 
 2569   clock = g_object_new (GST_TYPE_PTP_CLOCK, "name", name, "domain", domain,
 2570       NULL);
 2571 
 2572   /* Clear floating flag */
 2573   gst_object_ref_sink (clock);
 2574 
 2575   return clock;
 2576 }
 2577 
/* Argument bundle passed through g_hook_list_marshal() to
 * domain_stats_marshaller(). Field order matters: emit_ptp_statistics()
 * fills it with a positional initializer. */
typedef struct
{
  guint8 domain;                /* domain passed to each callback */
  const GstStructure *stats;    /* statistics passed to each callback */
} DomainStatsMarshalData;
 2583 
 2584 static void
 2585 domain_stats_marshaller (GHook * hook, DomainStatsMarshalData * data)
 2586 {
 2587   GstPtpStatisticsCallback callback = (GstPtpStatisticsCallback) hook->func;
 2588 
 2589   if (!callback (data->domain, data->stats, hook->data))
 2590     g_hook_destroy (&domain_stats_hooks, hook->hook_id);
 2591 }
 2592 
 2593 static void
 2594 emit_ptp_statistics (guint8 domain, const GstStructure * stats)
 2595 {
 2596   DomainStatsMarshalData data = { domain, stats };
 2597 
 2598   g_mutex_lock (&ptp_lock);
 2599   g_hook_list_marshal (&domain_stats_hooks, TRUE,
 2600       (GHookMarshaller) domain_stats_marshaller, &data);
 2601   g_mutex_unlock (&ptp_lock);
 2602 }
 2603 
 2604 /**
 2605  * gst_ptp_statistics_callback_add:
 2606  * @callback: GstPtpStatisticsCallback to call
 2607  * @user_data: Data to pass to the callback
 2608  * @destroy_data: GDestroyNotify to destroy the data
 2609  *
 2610  * Installs a new statistics callback for gathering PTP statistics. See
 2611  * GstPtpStatisticsCallback for a list of statistics that are provided.
 2612  *
 2613  * Returns: Id for the callback that can be passed to
 2614  * gst_ptp_statistics_callback_remove()
 2615  *
 2616  * Since: 1.6
 2617  */
 2618 gulong
 2619 gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
 2620     gpointer user_data, GDestroyNotify destroy_data)
 2621 {
 2622   GHook *hook;
 2623 
 2624   g_mutex_lock (&ptp_lock);
 2625 
 2626   if (!domain_stats_hooks_initted) {
 2627     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
 2628     domain_stats_hooks_initted = TRUE;
 2629   }
 2630 
 2631   hook = g_hook_alloc (&domain_stats_hooks);
 2632   hook->func = callback;
 2633   hook->data = user_data;
 2634   hook->destroy = destroy_data;
 2635   g_hook_prepend (&domain_stats_hooks, hook);
 2636   g_atomic_int_add (&domain_stats_n_hooks, 1);
 2637 
 2638   g_mutex_unlock (&ptp_lock);
 2639 
 2640   return hook->hook_id;
 2641 }
 2642 
 2643 /**
 2644  * gst_ptp_statistics_callback_remove:
 2645  * @id: Callback id to remove
 2646  *
 2647  * Removes a PTP statistics callback that was previously added with
 2648  * gst_ptp_statistics_callback_add().
 2649  *
 2650  * Since: 1.6
 2651  */
 2652 void
 2653 gst_ptp_statistics_callback_remove (gulong id)
 2654 {
 2655   g_mutex_lock (&ptp_lock);
 2656   if (g_hook_destroy (&domain_stats_hooks, id))
 2657     g_atomic_int_add (&domain_stats_n_hooks, -1);
 2658   g_mutex_unlock (&ptp_lock);
 2659 }