"Fossies" - the Fresh Open Source Software Archive

Member "ponyc-0.33.0/src/libponyrt/actor/actor.c" (1 Nov 2019, 30043 Bytes) of package /linux/misc/ponyc-0.33.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "actor.c" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 0.31.0_vs_0.32.0.

    1 #define PONY_WANT_ATOMIC_DEFS
    2 
    3 #include "actor.h"
    4 #include "../sched/scheduler.h"
    5 #include "../sched/cpu.h"
    6 #include "../mem/pool.h"
    7 #include "../gc/cycle.h"
    8 #include "../gc/trace.h"
    9 #include "ponyassert.h"
   10 #include <assert.h>
   11 #include <string.h>
   12 #include <dtrace.h>
   13 
   14 #ifdef USE_VALGRIND
   15 #include <valgrind/helgrind.h>
   16 #endif
   17 
   18 // default actor batch size
   19 #define PONY_SCHED_BATCH 100
   20 
   21 // Ignore padding at the end of the type.
   22 pony_static_assert((offsetof(pony_actor_t, gc) + sizeof(gc_t)) ==
   23    sizeof(pony_actor_pad_t), "Wrong actor pad size!");
   24 
   25 static bool actor_noblock = false;
   26 
   27 // The flags of a given actor cannot be mutated from more than one actor at
   28 // once, so these operations need not be atomic RMW.
   29 
   30 bool has_flag(pony_actor_t* actor, uint8_t flag)
   31 {
   32   uint8_t flags = atomic_load_explicit(&actor->flags, memory_order_relaxed);
   33   return (flags & flag) != 0;
   34 }
   35 
   36 static void set_flag(pony_actor_t* actor, uint8_t flag)
   37 {
   38   uint8_t flags = atomic_load_explicit(&actor->flags, memory_order_relaxed);
   39   atomic_store_explicit(&actor->flags, flags | flag, memory_order_relaxed);
   40 }
   41 
   42 static void unset_flag(pony_actor_t* actor, uint8_t flag)
   43 {
   44   uint8_t flags = atomic_load_explicit(&actor->flags, memory_order_relaxed);
   45   atomic_store_explicit(&actor->flags, flags & (uint8_t)~flag,
   46     memory_order_relaxed);
   47 }
   48 
   49 #ifndef PONY_NDEBUG
   50 static bool well_formed_msg_chain(pony_msg_t* first, pony_msg_t* last)
   51 {
   52   // A message chain is well formed if last is reachable from first and is the
   53   // end of the chain. first should also be the start of the chain but we can't
   54   // verify that.
   55 
   56   if((first == NULL) || (last == NULL) ||
   57     (atomic_load_explicit(&last->next, memory_order_relaxed) != NULL))
   58     return false;
   59 
   60   pony_msg_t* m1 = first;
   61   pony_msg_t* m2 = first;
   62 
   63   while((m1 != NULL) && (m2 != NULL))
   64   {
   65     if(m2 == last)
   66       return true;
   67 
   68     m2 = atomic_load_explicit(&m2->next, memory_order_relaxed);
   69 
   70     if(m2 == last)
   71       return true;
   72     if(m2 == NULL)
   73       return false;
   74 
   75     m1 = atomic_load_explicit(&m1->next, memory_order_relaxed);
   76     m2 = atomic_load_explicit(&m2->next, memory_order_relaxed);
   77 
   78     if(m1 == m2)
   79       return false;
   80   }
   81 
   82   return false;
   83 }
   84 #endif
   85 
   86 static void send_unblock(pony_ctx_t* ctx, pony_actor_t* actor)
   87 {
   88   // Send unblock before continuing.
   89   unset_flag(actor, FLAG_BLOCKED | FLAG_BLOCKED_SENT);
   90   ponyint_cycle_unblock(ctx, actor);
   91 }
   92 
   93 static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
   94   pony_msg_t* msg)
   95 {
   96 #ifdef USE_MEMTRACK_MESSAGES
   97   ctx->num_messages--;
   98 #endif
   99 
  100   switch(msg->id)
  101   {
  102     case ACTORMSG_ACQUIRE:
  103     {
  104 #ifdef USE_MEMTRACK_MESSAGES
  105       ctx->mem_used_messages -= sizeof(pony_msgp_t);
  106       ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
  107 #endif
  108 
  109       pony_assert(!ponyint_is_cycle(actor));
  110       pony_msgp_t* m = (pony_msgp_t*)msg;
  111 
  112 #ifdef USE_MEMTRACK
  113       ctx->mem_used_actors -= (sizeof(actorref_t)
  114         + ponyint_objectmap_total_mem_size(&((actorref_t*)m->p)->map));
  115       ctx->mem_allocated_actors -= (POOL_ALLOC_SIZE(actorref_t)
  116         + ponyint_objectmap_total_alloc_size(&((actorref_t*)m->p)->map));
  117 #endif
  118 
  119       if(ponyint_gc_acquire(&actor->gc, (actorref_t*)m->p) &&
  120         has_flag(actor, FLAG_BLOCKED_SENT))
  121       {
  122         // send unblock if we've sent a block
  123         send_unblock(ctx, actor);
  124       }
  125 
  126       return false;
  127     }
  128 
  129     case ACTORMSG_RELEASE:
  130     {
  131 #ifdef USE_MEMTRACK_MESSAGES
  132       ctx->mem_used_messages -= sizeof(pony_msgp_t);
  133       ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
  134 #endif
  135 
  136       pony_assert(!ponyint_is_cycle(actor));
  137       pony_msgp_t* m = (pony_msgp_t*)msg;
  138 
  139 #ifdef USE_MEMTRACK
  140       ctx->mem_used_actors -= (sizeof(actorref_t)
  141         + ponyint_objectmap_total_mem_size(&((actorref_t*)m->p)->map));
  142       ctx->mem_allocated_actors -= (POOL_ALLOC_SIZE(actorref_t)
  143         + ponyint_objectmap_total_alloc_size(&((actorref_t*)m->p)->map));
  144 #endif
  145 
  146       if(ponyint_gc_release(&actor->gc, (actorref_t*)m->p) &&
  147         has_flag(actor, FLAG_BLOCKED_SENT))
  148       {
  149         // send unblock if we've sent a block
  150         send_unblock(ctx, actor);
  151       }
  152 
  153       return false;
  154     }
  155 
  156     case ACTORMSG_ACK:
  157     {
  158 #ifdef USE_MEMTRACK_MESSAGES
  159       ctx->mem_used_messages -= sizeof(pony_msgi_t);
  160       ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msgi_t);
  161 #endif
  162 
  163       pony_assert(ponyint_is_cycle(actor));
  164       DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
  165       actor->type->dispatch(ctx, actor, msg);
  166       return false;
  167     }
  168 
  169     case ACTORMSG_CONF:
  170     {
  171 #ifdef USE_MEMTRACK_MESSAGES
  172       ctx->mem_used_messages -= sizeof(pony_msgi_t);
  173       ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msgi_t);
  174 #endif
  175 
  176       pony_assert(!ponyint_is_cycle(actor));
  177       if(has_flag(actor, FLAG_BLOCKED_SENT))
  178       {
  179         // We've sent a block message, send confirm.
  180         pony_msgi_t* m = (pony_msgi_t*)msg;
  181         ponyint_cycle_ack(ctx, m->i);
  182       }
  183 
  184       return false;
  185     }
  186 
  187     case ACTORMSG_ISBLOCKED:
  188     {
  189 #ifdef USE_MEMTRACK_MESSAGES
  190       ctx->mem_used_messages -= sizeof(pony_msg_t);
  191       ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msg_t);
  192 #endif
  193 
  194       pony_assert(!ponyint_is_cycle(actor));
  195       if(has_flag(actor, FLAG_BLOCKED) && !has_flag(actor, FLAG_BLOCKED_SENT))
  196       {
  197         // We're blocked, send block message.
  198         set_flag(actor, FLAG_BLOCKED_SENT);
  199         ponyint_cycle_block(ctx, actor, &actor->gc);
  200       }
  201 
  202       return false;
  203     }
  204 
  205     case ACTORMSG_BLOCK:
  206     {
  207       // memtrack messages tracked in cycle detector
  208 
  209       pony_assert(ponyint_is_cycle(actor));
  210       DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
  211       actor->type->dispatch(ctx, actor, msg);
  212       return false;
  213     }
  214 
  215     case ACTORMSG_UNBLOCK:
  216     {
  217 #ifdef USE_MEMTRACK_MESSAGES
  218       ctx->mem_used_messages -= sizeof(pony_msgp_t);
  219       ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
  220 #endif
  221 
  222       pony_assert(ponyint_is_cycle(actor));
  223       DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
  224       actor->type->dispatch(ctx, actor, msg);
  225       return false;
  226     }
  227 
  228     case ACTORMSG_CREATED:
  229     {
  230 #ifdef USE_MEMTRACK_MESSAGES
  231       ctx->mem_used_messages -= sizeof(pony_msgp_t);
  232       ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
  233 #endif
  234 
  235       pony_assert(ponyint_is_cycle(actor));
  236       DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
  237       actor->type->dispatch(ctx, actor, msg);
  238       return false;
  239     }
  240 
  241     case ACTORMSG_DESTROYED:
  242     {
  243 #ifdef USE_MEMTRACK_MESSAGES
  244       ctx->mem_used_messages -= sizeof(pony_msgp_t);
  245       ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
  246 #endif
  247 
  248       pony_assert(ponyint_is_cycle(actor));
  249       DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
  250       actor->type->dispatch(ctx, actor, msg);
  251       return false;
  252     }
  253 
  254     case ACTORMSG_CHECKBLOCKED:
  255     {
  256 #ifdef USE_MEMTRACK_MESSAGES
  257       ctx->mem_used_messages -= sizeof(pony_msg_t);
  258       ctx->mem_allocated_messages -= POOL_ALLOC_SIZE(pony_msg_t);
  259 #endif
  260 
  261       pony_assert(ponyint_is_cycle(actor));
  262       DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
  263       actor->type->dispatch(ctx, actor, msg);
  264       return false;
  265     }
  266 
  267     default:
  268     {
  269 #ifdef USE_MEMTRACK_MESSAGES
  270       ctx->mem_used_messages -= POOL_SIZE(msg->index);
  271       ctx->mem_allocated_messages -= POOL_SIZE(msg->index);
  272 #endif
  273 
  274       pony_assert(!ponyint_is_cycle(actor));
  275       if(has_flag(actor, FLAG_BLOCKED_SENT))
  276       {
  277         // send unblock if we've sent a block
  278         send_unblock(ctx, actor);
  279       }
  280 
  281       DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
  282       actor->type->dispatch(ctx, actor, msg);
  283       return true;
  284     }
  285   }
  286 }
  287 
  288 static void try_gc(pony_ctx_t* ctx, pony_actor_t* actor)
  289 {
  290   if(!ponyint_heap_startgc(&actor->heap))
  291     return;
  292 
  293   DTRACE1(GC_START, (uintptr_t)ctx->scheduler);
  294 
  295   ponyint_gc_mark(ctx);
  296 
  297   if(actor->type->trace != NULL)
  298     actor->type->trace(ctx, actor);
  299 
  300   ponyint_mark_done(ctx);
  301   ponyint_heap_endgc(&actor->heap);
  302 
  303   DTRACE1(GC_END, (uintptr_t)ctx->scheduler);
  304 }
  305 
  306 // return true if mute occurs
  307 static bool maybe_mute(pony_actor_t* actor)
  308 {
  309   // if we become muted as a result of handling a message, bail out now.
  310   // we aren't set to "muted" at this point. setting to muted during a
  311   // a behavior can lead to race conditions that might result in a
  312   // deadlock.
  313   // Given that actor's are not run when they are muted, then when we
  314   // started out batch, actor->muted would have been 0. If any of our
  315   // message sends would result in the actor being muted, that value will
  316   // have changed to greater than 0.
  317   //
  318   // We will then set the actor to "muted". Once set, any actor sending
  319   // a message to it will be also be muted unless said sender is marked
  320   // as overloaded.
  321   //
  322   // The key points here is that:
  323   //   1. We can't set the actor to "muted" until after its finished running
  324   //   a behavior.
  325   //   2. We should bail out from running the actor and return false so that
  326   //   it won't be rescheduled.
  327   if(atomic_load_explicit(&actor->muted, memory_order_relaxed) > 0)
  328   {
  329     ponyint_mute_actor(actor);
  330     return true;
  331   }
  332 
  333   return false;
  334 }
  335 
  336 static bool batch_limit_reached(pony_actor_t* actor, bool polling)
  337 {
  338   if(!has_flag(actor, FLAG_OVERLOADED) && !polling)
  339   {
  340     // If we hit our batch size, consider this actor to be overloaded
  341     // only if we're not polling from C code.
  342     // Overloaded actors are allowed to send to other overloaded actors
  343     // and to muted actors without being muted themselves.
  344     ponyint_actor_setoverloaded(actor);
  345   }
  346 
  347   return !has_flag(actor, FLAG_UNSCHEDULED);
  348 }
  349 
  350 bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
  351 {
  352   pony_assert(!ponyint_is_muted(actor));
  353   ctx->current = actor;
  354   size_t batch = PONY_SCHED_BATCH;
  355 
  356   pony_msg_t* msg;
  357   size_t app = 0;
  358 
  359 #ifdef USE_ACTOR_CONTINUATIONS
  360   while(actor->continuation != NULL)
  361   {
  362     msg = actor->continuation;
  363     actor->continuation = atomic_load_explicit(&msg->next,
  364       memory_order_relaxed);
  365     bool ret = handle_message(ctx, actor, msg);
  366     ponyint_pool_free(msg->index, msg);
  367 
  368     if(ret)
  369     {
  370       // If we handle an application message, try to gc.
  371       app++;
  372       try_gc(ctx, actor);
  373 
  374       // maybe mute actor
  375       if(maybe_mute(actor))
  376         return false;
  377 
  378       // if we've reached our batch limit
  379       // or if we're polling where we want to stop after one app message
  380       if(app == batch || polling)
  381         return batch_limit_reached(actor, polling);
  382     }
  383   }
  384 #endif
  385 
  386   // If we have been scheduled, the head will not be marked as empty.
  387   pony_msg_t* head = atomic_load_explicit(&actor->q.head, memory_order_relaxed);
  388 
  389   while((msg = ponyint_actor_messageq_pop(&actor->q
  390 #ifdef USE_DYNAMIC_TRACE
  391     , ctx->scheduler, ctx->current
  392 #endif
  393     )) != NULL)
  394   {
  395     if(handle_message(ctx, actor, msg))
  396     {
  397       // If we handle an application message, try to gc.
  398       app++;
  399       try_gc(ctx, actor);
  400 
  401       // maybe mute actor; returns true if mute occurs
  402       if(maybe_mute(actor))
  403         return false;
  404 
  405       // if we've reached our batch limit
  406       // or if we're polling where we want to stop after one app message
  407       if(app == batch || polling)
  408         return batch_limit_reached(actor, polling);
  409     }
  410 
  411     // Stop handling a batch if we reach the head we found when we were
  412     // scheduled.
  413     if(msg == head)
  414       break;
  415   }
  416 
  417   // We didn't hit our app message batch limit. We now believe our queue to be
  418   // empty, but we may have received further messages.
  419   pony_assert(app < batch);
  420   pony_assert(!ponyint_is_muted(actor));
  421 
  422   if(has_flag(actor, FLAG_OVERLOADED))
  423   {
  424     // if we were overloaded and didn't process a full batch, set ourselves as
  425     // no longer overloaded. Once this is done:
  426     // 1- sending to this actor is no longer grounds for an actor being muted
  427     // 2- this actor can no longer send to other actors free from muting should
  428     //    the receiver be overloaded or muted
  429     ponyint_actor_unsetoverloaded(actor);
  430   }
  431 
  432   try_gc(ctx, actor);
  433 
  434   if(has_flag(actor, FLAG_UNSCHEDULED))
  435   {
  436     // When unscheduling, don't mark the queue as empty, since we don't want
  437     // to get rescheduled if we receive a message.
  438     return false;
  439   }
  440 
  441   // If we have processed any application level messages, defer blocking.
  442   if(app > 0)
  443     return true;
  444 
  445   // note that we're logically blocked
  446   if(!has_flag(actor, FLAG_BLOCKED | FLAG_SYSTEM | FLAG_BLOCKED_SENT))
  447   {
  448     set_flag(actor, FLAG_BLOCKED);
  449   }
  450 
  451   bool empty = ponyint_messageq_markempty(&actor->q);
  452   if (empty && actor_noblock && (actor->gc.rc == 0))
  453   {
  454     // when 'actor_noblock` is true, the cycle detector isn't running.
  455     // this means actors won't be garbage collected unless we take special
  456     // action. Here, we know that:
  457     // - the actor has no messages in its queue
  458     // - there's no references to this actor
  459     // therefore if `noblock` is on, we should garbage collect the actor.
  460     ponyint_actor_setpendingdestroy(actor);
  461     ponyint_actor_final(ctx, actor);
  462     ponyint_actor_destroy(actor);
  463   }
  464 
  465   // Return true (i.e. reschedule immediately) if our queue isn't empty.
  466   return !empty;
  467 }
  468 
  469 void ponyint_actor_destroy(pony_actor_t* actor)
  470 {
  471   pony_assert(has_flag(actor, FLAG_PENDINGDESTROY));
  472 
  473   // Make sure the actor being destroyed has finished marking its queue
  474   // as empty. Otherwise, it may spuriously see that tail and head are not
  475   // the same and fail to mark the queue as empty, resulting in it getting
  476   // rescheduled.
  477   pony_msg_t* head = NULL;
  478   do
  479   {
  480     head = atomic_load_explicit(&actor->q.head, memory_order_relaxed);
  481   } while(((uintptr_t)head & (uintptr_t)1) != (uintptr_t)1);
  482 
  483   atomic_thread_fence(memory_order_acquire);
  484 #ifdef USE_VALGRIND
  485   ANNOTATE_HAPPENS_AFTER(&actor->q.head);
  486 #endif
  487 
  488   ponyint_messageq_destroy(&actor->q);
  489   ponyint_gc_destroy(&actor->gc);
  490   ponyint_heap_destroy(&actor->heap);
  491 
  492 #ifdef USE_MEMTRACK
  493   pony_ctx_t* ctx = pony_ctx();
  494   ctx->mem_used_actors -= actor->type->size;
  495   ctx->mem_allocated_actors -= ponyint_pool_used_size(actor->type->size);
  496 #endif
  497 
  498   // Free variable sized actors correctly.
  499   ponyint_pool_free_size(actor->type->size, actor);
  500 }
  501 
  502 gc_t* ponyint_actor_gc(pony_actor_t* actor)
  503 {
  504   return &actor->gc;
  505 }
  506 
  507 heap_t* ponyint_actor_heap(pony_actor_t* actor)
  508 {
  509   return &actor->heap;
  510 }
  511 
  512 bool ponyint_actor_pendingdestroy(pony_actor_t* actor)
  513 {
  514   return has_flag(actor, FLAG_PENDINGDESTROY);
  515 }
  516 
  517 void ponyint_actor_setpendingdestroy(pony_actor_t* actor)
  518 {
  519   // This is thread-safe, even though the flag is set from the cycle detector.
  520   // The function is only called after the cycle detector has detected a true
  521   // cycle and an actor won't change its flags if it is part of a true cycle.
  522   // The synchronisation is done through the ACK message sent by the actor to
  523   // the cycle detector.
  524   set_flag(actor, FLAG_PENDINGDESTROY);
  525 }
  526 
  527 void ponyint_actor_final(pony_ctx_t* ctx, pony_actor_t* actor)
  528 {
  529   // This gets run while the cycle detector is handling a message. Set the
  530   // current actor before running anything.
  531   pony_actor_t* prev = ctx->current;
  532   ctx->current = actor;
  533 
  534   // Run the actor finaliser if it has one.
  535   if(actor->type->final != NULL)
  536     actor->type->final(actor);
  537 
  538   // Run all outstanding object finalisers.
  539   ponyint_heap_final(&actor->heap);
  540 
  541   // Restore the current actor.
  542   ctx->current = prev;
  543 }
  544 
  545 void ponyint_actor_sendrelease(pony_ctx_t* ctx, pony_actor_t* actor)
  546 {
  547   ponyint_gc_sendrelease(ctx, &actor->gc);
  548 }
  549 
  550 void ponyint_actor_setsystem(pony_actor_t* actor)
  551 {
  552   set_flag(actor, FLAG_SYSTEM);
  553 }
  554 
  555 void ponyint_actor_setnoblock(bool state)
  556 {
  557   actor_noblock = state;
  558 }
  559 
  560 bool ponyint_actor_getnoblock()
  561 {
  562   return actor_noblock;
  563 }
  564 
  565 PONY_API pony_actor_t* pony_create(pony_ctx_t* ctx, pony_type_t* type)
  566 {
  567   pony_assert(type != NULL);
  568 
  569   // allocate variable sized actors correctly
  570   pony_actor_t* actor = (pony_actor_t*)ponyint_pool_alloc_size(type->size);
  571   memset(actor, 0, type->size);
  572   actor->type = type;
  573 
  574 #ifdef USE_MEMTRACK
  575   ctx->mem_used_actors += type->size;
  576   ctx->mem_allocated_actors += ponyint_pool_used_size(type->size);
  577 #endif
  578 
  579   ponyint_messageq_init(&actor->q);
  580   ponyint_heap_init(&actor->heap);
  581   ponyint_gc_done(&actor->gc);
  582 
  583   if(actor_noblock)
  584     ponyint_actor_setsystem(actor);
  585 
  586   if(ctx->current != NULL)
  587   {
  588     // actors begin unblocked and referenced by the creating actor
  589     actor->gc.rc = GC_INC_MORE;
  590     ponyint_gc_createactor(ctx->current, actor);
  591   } else {
  592     // no creator, so the actor isn't referenced by anything
  593     actor->gc.rc = 0;
  594   }
  595 
  596   // tell the cycle detector we exist if block messages are enabled
  597   if(!actor_noblock)
  598     ponyint_cycle_actor_created(ctx, actor);
  599 
  600   DTRACE2(ACTOR_ALLOC, (uintptr_t)ctx->scheduler, (uintptr_t)actor);
  601   return actor;
  602 }
  603 
  604 PONY_API void ponyint_destroy(pony_ctx_t* ctx, pony_actor_t* actor)
  605 {
  606   // This destroys an actor immediately.
  607   // The finaliser is not called.
  608 
  609   // Notify cycle detector of actor being destroyed
  610   ponyint_cycle_actor_destroyed(ctx, actor);
  611 
  612   ponyint_actor_setpendingdestroy(actor);
  613   ponyint_actor_destroy(actor);
  614 }
  615 
  616 PONY_API pony_msg_t* pony_alloc_msg(uint32_t index, uint32_t id)
  617 {
  618 #ifdef USE_MEMTRACK_MESSAGES
  619   pony_ctx_t* ctx = pony_ctx();
  620   ctx->mem_used_messages += POOL_SIZE(index);
  621   ctx->mem_allocated_messages += POOL_SIZE(index);
  622   ctx->num_messages++;
  623 #endif
  624 
  625   pony_msg_t* msg = (pony_msg_t*)ponyint_pool_alloc(index);
  626   msg->index = index;
  627   msg->id = id;
  628 #ifndef PONY_NDEBUG
  629   atomic_store_explicit(&msg->next, NULL, memory_order_relaxed);
  630 #endif
  631 
  632   return msg;
  633 }
  634 
  635 PONY_API pony_msg_t* pony_alloc_msg_size(size_t size, uint32_t id)
  636 {
  637   return pony_alloc_msg((uint32_t)ponyint_pool_index(size), id);
  638 }
  639 
  640 PONY_API void pony_sendv(pony_ctx_t* ctx, pony_actor_t* to, pony_msg_t* first,
  641   pony_msg_t* last, bool has_app_msg)
  642 {
  643   // The function takes a prebuilt chain instead of varargs because the latter
  644   // is expensive and very hard to optimise.
  645 
  646   pony_assert(well_formed_msg_chain(first, last));
  647 
  648   if(DTRACE_ENABLED(ACTOR_MSG_SEND))
  649   {
  650     pony_msg_t* m = first;
  651 
  652     while(m != last)
  653     {
  654       DTRACE4(ACTOR_MSG_SEND, (uintptr_t)ctx->scheduler, m->id,
  655         (uintptr_t)ctx->current, (uintptr_t)to);
  656       m = atomic_load_explicit(&m->next, memory_order_relaxed);
  657     }
  658 
  659     DTRACE4(ACTOR_MSG_SEND, (uintptr_t)ctx->scheduler, last->id,
  660         (uintptr_t)ctx->current, (uintptr_t)to);
  661   }
  662 
  663   if(has_app_msg)
  664     ponyint_maybe_mute(ctx, to);
  665 
  666   if(ponyint_actor_messageq_push(&to->q, first, last
  667 #ifdef USE_DYNAMIC_TRACE
  668     , ctx->scheduler, ctx->current, to
  669 #endif
  670     ))
  671   {
  672     if(!has_flag(to, FLAG_UNSCHEDULED) && !ponyint_is_muted(to))
  673     {
  674       ponyint_sched_add(ctx, to);
  675     }
  676   }
  677 }
  678 
  679 PONY_API void pony_sendv_single(pony_ctx_t* ctx, pony_actor_t* to,
  680   pony_msg_t* first, pony_msg_t* last, bool has_app_msg)
  681 {
  682   // The function takes a prebuilt chain instead of varargs because the latter
  683   // is expensive and very hard to optimise.
  684 
  685   pony_assert(well_formed_msg_chain(first, last));
  686 
  687   if(DTRACE_ENABLED(ACTOR_MSG_SEND))
  688   {
  689     pony_msg_t* m = first;
  690 
  691     while(m != last)
  692     {
  693       DTRACE4(ACTOR_MSG_SEND, (uintptr_t)ctx->scheduler, m->id,
  694         (uintptr_t)ctx->current, (uintptr_t)to);
  695       m = atomic_load_explicit(&m->next, memory_order_relaxed);
  696     }
  697 
  698     DTRACE4(ACTOR_MSG_SEND, (uintptr_t)ctx->scheduler, last->id,
  699         (uintptr_t)ctx->current, (uintptr_t)to);
  700   }
  701 
  702   if(has_app_msg)
  703     ponyint_maybe_mute(ctx, to);
  704 
  705   if(ponyint_actor_messageq_push_single(&to->q, first, last
  706 #ifdef USE_DYNAMIC_TRACE
  707     , ctx->scheduler, ctx->current, to
  708 #endif
  709     ))
  710   {
  711     if(!has_flag(to, FLAG_UNSCHEDULED) && !ponyint_is_muted(to))
  712     {
  713       // if the receiving actor is currently not unscheduled AND it's not
  714       // muted, schedule it.
  715       ponyint_sched_add(ctx, to);
  716     }
  717   }
  718 }
  719 
  720 void ponyint_maybe_mute(pony_ctx_t* ctx, pony_actor_t* to)
  721 {
  722   if(ctx->current != NULL)
  723   {
  724     // only mute a sender IF:
  725     // 1. the receiver is overloaded/under pressure/muted
  726     // AND
  727     // 2. the sender isn't overloaded or under pressure
  728     // AND
  729     // 3. we are sending to another actor (as compared to sending to self)
  730     if(ponyint_triggers_muting(to) &&
  731        !has_flag(ctx->current, FLAG_OVERLOADED) &&
  732        !has_flag(ctx->current, FLAG_UNDER_PRESSURE) &&
  733        ctx->current != to)
  734     {
  735       ponyint_sched_mute(ctx, ctx->current, to);
  736     }
  737   }
  738 }
  739 
  740 PONY_API void pony_chain(pony_msg_t* prev, pony_msg_t* next)
  741 {
  742   pony_assert(atomic_load_explicit(&prev->next, memory_order_relaxed) == NULL);
  743   atomic_store_explicit(&prev->next, next, memory_order_relaxed);
  744 }
  745 
  746 PONY_API void pony_send(pony_ctx_t* ctx, pony_actor_t* to, uint32_t id)
  747 {
  748 #ifdef USE_MEMTRACK_MESSAGES
  749   ctx->mem_used_messages += sizeof(pony_msg_t);
  750   ctx->mem_used_messages -= POOL_ALLOC_SIZE(pony_msg_t);
  751 #endif
  752 
  753   pony_msg_t* m = pony_alloc_msg(POOL_INDEX(sizeof(pony_msg_t)), id);
  754   pony_sendv(ctx, to, m, m, id <= ACTORMSG_APPLICATION_START);
  755 }
  756 
  757 PONY_API void pony_sendp(pony_ctx_t* ctx, pony_actor_t* to, uint32_t id,
  758   void* p)
  759 {
  760 #ifdef USE_MEMTRACK_MESSAGES
  761   ctx->mem_used_messages += sizeof(pony_msgp_t);
  762   ctx->mem_used_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
  763 #endif
  764 
  765   pony_msgp_t* m = (pony_msgp_t*)pony_alloc_msg(
  766     POOL_INDEX(sizeof(pony_msgp_t)), id);
  767   m->p = p;
  768 
  769   pony_sendv(ctx, to, &m->msg, &m->msg, id <= ACTORMSG_APPLICATION_START);
  770 }
  771 
  772 PONY_API void pony_sendi(pony_ctx_t* ctx, pony_actor_t* to, uint32_t id,
  773   intptr_t i)
  774 {
  775 #ifdef USE_MEMTRACK_MESSAGES
  776   ctx->mem_used_messages += sizeof(pony_msgi_t);
  777   ctx->mem_used_messages -= POOL_ALLOC_SIZE(pony_msgi_t);
  778 #endif
  779 
  780   pony_msgi_t* m = (pony_msgi_t*)pony_alloc_msg(
  781     POOL_INDEX(sizeof(pony_msgi_t)), id);
  782   m->i = i;
  783 
  784   pony_sendv(ctx, to, &m->msg, &m->msg, id <= ACTORMSG_APPLICATION_START);
  785 }
  786 
  787 #ifdef USE_ACTOR_CONTINUATIONS
  788 PONY_API void pony_continuation(pony_ctx_t* ctx, pony_msg_t* m)
  789 {
  790   pony_assert(ctx->current != NULL);
  791   pony_actor_t* self = ctx->current;
  792   atomic_store_explicit(&m->next, self->continuation, memory_order_relaxed);
  793   self->continuation = m;
  794 }
  795 #endif
  796 
  797 PONY_API void* pony_alloc(pony_ctx_t* ctx, size_t size)
  798 {
  799   pony_assert(ctx->current != NULL);
  800   DTRACE2(HEAP_ALLOC, (uintptr_t)ctx->scheduler, size);
  801 
  802   return ponyint_heap_alloc(ctx->current, &ctx->current->heap, size);
  803 }
  804 
  805 PONY_API void* pony_alloc_small(pony_ctx_t* ctx, uint32_t sizeclass)
  806 {
  807   pony_assert(ctx->current != NULL);
  808   DTRACE2(HEAP_ALLOC, (uintptr_t)ctx->scheduler, HEAP_MIN << sizeclass);
  809 
  810   return ponyint_heap_alloc_small(ctx->current, &ctx->current->heap, sizeclass);
  811 }
  812 
  813 PONY_API void* pony_alloc_large(pony_ctx_t* ctx, size_t size)
  814 {
  815   pony_assert(ctx->current != NULL);
  816   DTRACE2(HEAP_ALLOC, (uintptr_t)ctx->scheduler, size);
  817 
  818   return ponyint_heap_alloc_large(ctx->current, &ctx->current->heap, size);
  819 }
  820 
  821 PONY_API void* pony_realloc(pony_ctx_t* ctx, void* p, size_t size)
  822 {
  823   pony_assert(ctx->current != NULL);
  824   DTRACE2(HEAP_ALLOC, (uintptr_t)ctx->scheduler, size);
  825 
  826   return ponyint_heap_realloc(ctx->current, &ctx->current->heap, p, size);
  827 }
  828 
  829 PONY_API void* pony_alloc_final(pony_ctx_t* ctx, size_t size)
  830 {
  831   pony_assert(ctx->current != NULL);
  832   DTRACE2(HEAP_ALLOC, (uintptr_t)ctx->scheduler, size);
  833 
  834   return ponyint_heap_alloc_final(ctx->current, &ctx->current->heap, size);
  835 }
  836 
  837 void* pony_alloc_small_final(pony_ctx_t* ctx, uint32_t sizeclass)
  838 {
  839   pony_assert(ctx->current != NULL);
  840   DTRACE2(HEAP_ALLOC, (uintptr_t)ctx->scheduler, HEAP_MIN << sizeclass);
  841 
  842   return ponyint_heap_alloc_small_final(ctx->current, &ctx->current->heap,
  843     sizeclass);
  844 }
  845 
  846 void* pony_alloc_large_final(pony_ctx_t* ctx, size_t size)
  847 {
  848   pony_assert(ctx->current != NULL);
  849   DTRACE2(HEAP_ALLOC, (uintptr_t)ctx->scheduler, size);
  850 
  851   return ponyint_heap_alloc_large_final(ctx->current, &ctx->current->heap,
  852     size);
  853 }
  854 
  855 PONY_API void pony_triggergc(pony_ctx_t* ctx)
  856 {
  857   pony_assert(ctx->current != NULL);
  858   ctx->current->heap.next_gc = 0;
  859 }
  860 
  861 PONY_API void pony_schedule(pony_ctx_t* ctx, pony_actor_t* actor)
  862 {
  863   if(!has_flag(actor, FLAG_UNSCHEDULED) || ponyint_is_muted(actor))
  864     return;
  865 
  866   unset_flag(actor, FLAG_UNSCHEDULED);
  867   ponyint_sched_add(ctx, actor);
  868 }
  869 
  870 PONY_API void pony_unschedule(pony_ctx_t* ctx, pony_actor_t* actor)
  871 {
  872   if(has_flag(actor, FLAG_BLOCKED_SENT))
  873   {
  874     // send unblock if we've sent a block
  875     if(!actor_noblock)
  876       send_unblock(ctx, actor);
  877   }
  878 
  879   set_flag(actor, FLAG_UNSCHEDULED);
  880 }
  881 
  882 PONY_API void pony_become(pony_ctx_t* ctx, pony_actor_t* actor)
  883 {
  884   ctx->current = actor;
  885 }
  886 
  887 PONY_API void pony_poll(pony_ctx_t* ctx)
  888 {
  889   // TODO: this seems like it could allow muted actors to get `ponyint_actor_run`
  890   // which shouldn't be allowed. Fixing might require API changes.
  891   pony_assert(ctx->current != NULL);
  892   ponyint_actor_run(ctx, ctx->current, true);
  893 }
  894 
  895 void ponyint_actor_setoverloaded(pony_actor_t* actor)
  896 {
  897   pony_assert(!ponyint_is_cycle(actor));
  898   set_flag(actor, FLAG_OVERLOADED);
  899   DTRACE1(ACTOR_OVERLOADED, (uintptr_t)actor);
  900 }
  901 
  902 bool ponyint_actor_overloaded(pony_actor_t* actor)
  903 {
  904   return has_flag(actor, FLAG_OVERLOADED);
  905 }
  906 
  907 void ponyint_actor_unsetoverloaded(pony_actor_t* actor)
  908 {
  909   pony_ctx_t* ctx = pony_ctx();
  910   unset_flag(actor, FLAG_OVERLOADED);
  911   DTRACE1(ACTOR_OVERLOADED_CLEARED, (uintptr_t)actor);
  912   if (!has_flag(actor, FLAG_UNDER_PRESSURE))
  913   {
  914     ponyint_sched_start_global_unmute(ctx->scheduler->index, actor);
  915   }
  916 }
  917 
  918 PONY_API void pony_apply_backpressure()
  919 {
  920   pony_ctx_t* ctx = pony_ctx();
  921   set_flag(ctx->current, FLAG_UNDER_PRESSURE);
  922   DTRACE1(ACTOR_UNDER_PRESSURE, (uintptr_t)ctx->current);
  923 }
  924 
  925 PONY_API void pony_release_backpressure()
  926 {
  927   pony_ctx_t* ctx = pony_ctx();
  928   unset_flag(ctx->current, FLAG_UNDER_PRESSURE);
  929   DTRACE1(ACTOR_PRESSURE_RELEASED, (uintptr_t)ctx->current);
  930   if (!has_flag(ctx->current, FLAG_OVERLOADED))
  931     ponyint_sched_start_global_unmute(ctx->scheduler->index, ctx->current);
  932 }
  933 
  934 bool ponyint_triggers_muting(pony_actor_t* actor)
  935 {
  936   return has_flag(actor, FLAG_OVERLOADED) ||
  937     has_flag(actor, FLAG_UNDER_PRESSURE) ||
  938     ponyint_is_muted(actor);
  939 }
  940 
  941 //
  942 // Mute/Unmute/Check mute status functions
  943 //
  944 // For backpressure related muting and unmuting to work correctly, the following
  945 // rules have to be maintained.
  946 //
  947 // 1. Across schedulers, an actor should never been seen as muted when it is not
  948 // in fact muted.
  949 // 2. It's ok for a muted actor to be seen as unmuted in a transient fashion
  950 // across actors
  951 //
  952 // If rule #1 is violated, we might end up deadlocking because an actor was
  953 // muted for sending to an actor that might never be unmuted (because it isn't
  954 // muted). The actor muted actor would continue to remain muted and the actor
  955 // incorrectly seen as muted became actually muted and then unmuted.
  956 //
  957 // If rule #2 is violated, then a muted actor will receive from 1 to a few
  958 // additional messages and the sender won't be muted. As this is a transient
  959 // situtation that should be shortly rectified, there's no harm done.
  960 //
  961 // Our handling of atomic operations in `ponyint_is_muted` and
  962 // `ponyint_unmute_actor` are to assure that rule #1 isn't violated.
  963 // We have far more relaxed usage of atomics in `ponyint_mute_actor` given the
  964 // far more relaxed rule #2.
  965 //
  966 // An actor's `is_muted` field is effectly a `bool` value. However, by using a
  967 // `uint8_t`, we use the same amount of space that we would for a boolean but
  968 // can use more efficient atomic operations. Given how often these methods are
  969 // called (at least once per message send), efficiency is of primary
  970 // importance.
  971 
  972 bool ponyint_is_muted(pony_actor_t* actor)
  973 {
  974   return (atomic_load_explicit(&actor->is_muted, memory_order_relaxed) > 0);
  975 }
  976 
  977 void ponyint_mute_actor(pony_actor_t* actor)
  978 {
  979    uint8_t is_muted = atomic_fetch_add_explicit(&actor->is_muted, 1, memory_order_relaxed);
  980    pony_assert(is_muted == 0);
  981    DTRACE1(ACTOR_MUTED, (uintptr_t)actor);
  982    (void)is_muted;
  983 }
  984 
  985 void ponyint_unmute_actor(pony_actor_t* actor)
  986 {
  987   uint8_t is_muted = atomic_fetch_sub_explicit(&actor->is_muted, 1, memory_order_relaxed);
  988   pony_assert(is_muted == 1);
  989   DTRACE1(ACTOR_UNMUTED, (uintptr_t)actor);
  990   (void)is_muted;
  991 }
  992 
  993 #ifdef USE_MEMTRACK
  994 size_t ponyint_actor_mem_size(pony_actor_t* actor)
  995 {
  996   return actor->type->size;
  997 }
  998 
  999 size_t ponyint_actor_alloc_size(pony_actor_t* actor)
 1000 {
 1001   return ponyint_pool_used_size(actor->type->size);
 1002 }
 1003 
 1004 size_t ponyint_actor_total_mem_size(pony_actor_t* actor)
 1005 {
 1006   // memeory categories:
 1007   //   used - memory allocated that is actively being used by the runtime
 1008   return
 1009       // actor struct size (maybe this shouldn't be counted to avoid double
 1010       // counting since it is counted as part of the scheduler thread mem used?)
 1011       actor->type->size
 1012       // cycle detector memory used (or 0 if not cycle detector)
 1013     + ( ponyint_is_cycle(actor) ? ponyint_cycle_mem_size(actor) : 0)
 1014       // actor heap memory used
 1015     + ponyint_heap_mem_size(&actor->heap)
 1016       // actor gc total memory used
 1017     + ponyint_gc_total_mem_size(&actor->gc)
 1018       // size of stub message when message_q is initialized
 1019     + sizeof(pony_msg_t);
 1020 }
 1021 
 1022 size_t ponyint_actor_total_alloc_size(pony_actor_t* actor)
 1023 {
 1024   // memeory categories:
 1025   //   alloc - memory allocated whether it is actively being used or not
 1026   return
 1027       // allocation for actor struct size (maybe this shouldn't be counted to
 1028       // avoid double counting since it is counted as part of the scheduler
 1029       // thread mem allocated?)
 1030       ponyint_pool_used_size(actor->type->size)
 1031       // cycle detector memory allocated (or 0 if not cycle detector)
 1032     + ( ponyint_is_cycle(actor) ? ponyint_cycle_alloc_size(actor) : 0)
 1033       // actor heap memory allocated
 1034     + ponyint_heap_alloc_size(&actor->heap)
 1035       // actor gc total memory allocated
 1036     + ponyint_gc_total_alloc_size(&actor->gc)
 1037       // allocation of stub message when message_q is initialized
 1038     + POOL_ALLOC_SIZE(pony_msg_t);
 1039 }
 1040 #endif