"Fossies" - the Fresh Open Source Software Archive

Member "ponyc-0.33.0/examples/fan-in/main.pony" (1 Nov 2019, 8332 Bytes) of package /linux/misc/ponyc-0.33.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Pony source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here.

    1 """
    2 A microbenchmark for testing thundering herd/fan-in type workloads and how
    3 backpressure impacts them in the Pony runtime. Based on `message-ubench` and
    4 the description in issue #2980, it reproduces the thundering herd/fan-in
    5 behavior reported in that issue.
    6 
    7 The topology of this microbenchmark is the following:
    8 
    9   N `Sender` actors => M `Analyzer` actors => 1 `Receiver` actor
   10 
   11 The logic is as follows:
   12 
   13 * The `Sender` actors send messages as fast as they can to the `Analyzer`
   14   actors. The number of `Sender` actors is controlled by the `--senders` cli
   15   argument.
   16 * The `Analyzer` actors receive messages from `Sender` actors and increment a
   17   count. They only send messages to the `Receiver` actor when a tick fires. The
   18   number of `Analyzer` actors is controlled by the `--analyzers` cli argument.
   19 * The `Receiver` actor receives messages from the `Analyzer` actors and does
   20   some "work" (simulated by `usleep`). The amount of "work" is controlled by the
   21   `--receiver-workload` cli argument.
   22 * The `Coordinator` actor manages when ticks get fired using a timer and when a
   23   tick is fired it asks all `Analyzer` actors for a status. If an `Analyzer`
   24   actor is muted due to sending to the `Receiver` actor, it will not respond
   25   promptly and the reports printed by the `Coordinator` actor will go up and
   26   down as backpressure kicks in and out when the `Receiver` actor falls behind
   27   and catches up.
   28 """
   29 
   30 use "assert"
   31 use "cli"
   32 use "collections"
   33 use "random"
   34 use "time"
   35 
   36 actor Main
   37   new create(env: Env) =>
   38     """
   39     Parse the command line arguments, then create a Coordinator actor
   40     and an interval timer that will coordinate all further computation.
      
          If the command spec cannot be built, help is requested, or the
          arguments fail to parse, the exit code is set to 1 (help and syntax
          errors are printed to `env.out` first).
   41     """
   42     try
   43       let cs =
   44         CommandSpec.leaf("do",
   45           "A message-passing micro-benchmark for the Pony runtime",
   46           [
   47             OptionSpec.i64("senders",
   48               "Number of sender actors"
   49               where default' = 100)
   50             OptionSpec.i64("analyzers",
   51               "Number of analyzer actors"
   52               where default' = 1000)
   53             OptionSpec.i64("analyzer-interval",
   54               "How often analyzers send messages to receiver in deciseconds (10 deciseconds = 1 second)"
   55               where default' = 100)
   56             OptionSpec.i64("analyzer-report-count",
   57               "Number of times analyzers send messages to receiver before shutting down, 0 is infinite"
   58               where default' = 10)
   59             OptionSpec.i64("receiver-workload",
   60               "Number of microseconds the receiver takes to process each message it receives"
   61               where default' = 10000)
   62           ],
   63           [
   64             ArgSpec.string_seq("", "")
   65           ])?.>add_help()?
   66       let cmd =
   67       match CommandParser(cs).parse(env.args, env.vars)
   68       | let c: Command => c
   69       | let ch: CommandHelp =>
   70         ch.print_help(env.out)
   71         error
   72       | let se: SyntaxError =>
   73         env.out.print(se.string())
   74         error
   75       end
   76 
            // NOTE(review): the options are signed and are not range-checked
            // before the unsigned / narrowing conversions below.
   77       let num_senders = cmd.option("senders").i64()
   78       let num_analyzers = cmd.option("analyzers").i64()
   79       let analyzer_interval = cmd.option("analyzer-interval").i64()
   80       let analyzer_report_count = cmd.option("analyzer-report-count").i64().u64()
   81       let receiver_workload = cmd.option("receiver-workload").i64().u64()
   82 
            // Echo the effective configuration as a '#'-prefixed line, followed
            // by the CSV header for the per-tick reports the Coordinator prints.
   83       env.out.print("# " +
   84         "senders " + num_senders.string() + ", " +
   85         "analyzers " + num_analyzers.string() + ", " +
   86         "analyzer-interval " + analyzer_interval.string() + ", " +
   87         "analyzer-report-count " + analyzer_report_count.string() + ", " +
   88         "receiver-workload " + receiver_workload.string())
   89       env.out.print("time,run-ns,rate")
   90 
   91       let coordinator = Coordinator(env,
   92         num_senders.i32(), num_analyzers.i32(), analyzer_report_count, receiver_workload)
   93 
            // "analyzer-interval" counts tenths of a second: scale to
            // nanoseconds and divide by 10. The same value is used for the
            // timer's first expiration and for its repeat interval.
   94       let interval: U64 = (analyzer_interval.u64() * 1_000_000_000) / 10
   95       let timers = Timers
   96       let timer = Timer(Tick(env, coordinator, analyzer_report_count), interval, interval)
   97       timers(consume timer)
   98     else
   99       env.exitcode(1)
  100     end
  101 
  102 
  103 actor Coordinator
        // Builds the Receiver, Analyzer and Sender actors, forwards each timer
        // tick to every Analyzer, and prints one "ts,run_ns,rate" line per tick
        // once all analyzers have reported for that tick.
  104   let _receiver: Receiver
  105   let _analyzers: Array[Analyzer] val
  106   let _senders: Array[Sender] val
        // Nanosecond timestamps of the most recent tick and the one before it.
  107   var _current_t: I64 = 0
  108   var _last_t: I64 = 0
        // Per-tick tallies keyed by tick timestamp:
        // (number of analyzers that have reported, total messages they counted).
  109   let _set_analyzers: Map[I64, (U64, U64)]
  110   let _num_analyzers: U64
  111   let _env: Env
        // Set once a tick arrives with `done == true`.
  112   var _done: Bool = false
  113 
  114 
        // Creates one Receiver, then `num_analyzers` Analyzers sharing that
        // Receiver, then `num_senders` Senders sharing the analyzer array, and
        // finally records the current time as the initial tick timestamp.
        // `analyzer_report_count` is accepted but not used in this constructor.
  115   new create(env: Env, num_senders: I32, num_analyzers: I32, analyzer_report_count: U64, receiver_workload: U64) =>
  116     _receiver = Receiver(receiver_workload)
  117     _set_analyzers = Map[I64, (U64, U64)].create()
  118     _num_analyzers = num_analyzers.u64()
  119     _env = env
  120 
  121     var i: I32 = 0
          // Built as `iso` so it can be consumed into the immutable `val` field.
  122     let analyzers: Array[Analyzer] iso = recover Array[Analyzer](num_analyzers.usize()) end
  123 
  124     while (i < num_analyzers) do
  125       analyzers.push(Analyzer(_receiver))
  126       i = i + 1
  127     end
  128 
  129     _analyzers = consume analyzers
  130 
  131 
  132     i = 0
  133     let senders: Array[Sender] iso = recover Array[Sender](num_senders.usize()) end
  134 
          // Senders are created after `_analyzers` is complete because each one
          // receives the finished analyzer array.
  135     while (i < num_senders) do
  136       senders.push(Sender(_analyzers))
  137       i = i + 1
  138     end
  139 
  140     _senders = consume senders
  141 
  142     (let t_s: I64, let t_ns: I64) = Time.now()
  143     _last_t = to_ns(t_s, t_ns)
  144     _current_t = _last_t
  145 
        // Handles one timer tick: shifts the previous tick timestamp into
        // `_last_t`, takes a fresh timestamp, and asks every analyzer to report
        // for that (current, previous) pair. When `done` is true it also tells
        // every sender to stop. `tick_count` is accepted but not used here.
  146   be tick_fired(done: Bool, tick_count: U64) =>
  147     _last_t = _current_t
  148 
  149     (let t_s: I64, let t_ns: I64) = Time.now()
  150     _current_t = to_ns(t_s, t_ns)
  151 
  152     for analyzer in _analyzers.values() do
  153       analyzer.tick_fired(this, _current_t, _last_t)
  154     end
  155 
  156     if done then
  157       for sender in _senders.values() do
  158         sender.done()
  159       end
  160       _done = done
  161     end
  162 
        // Combines a (seconds, nanoseconds) pair into a single nanosecond count.
  163   fun to_ns(t_s: I64, t_ns: I64): I64 =>
  164     (t_s * 1_000_000_000) + t_ns
  165 
  166 
        // Records one analyzer's report for the tick stamped `ts`. Once
        // `_num_analyzers` reports have arrived for `ts`, prints
        // "ts,run_ns,rate" and removes the tally. `a` is not used.
  167   be msg_from_analyzer(a: Analyzer, num_msgs: U64, ts: I64, old_ts: I64) =>
          // Start from (0, 0) when this is the first report for `ts`.
  168     (var num_received, var total_msgs) = 
  169       try
  170         _set_analyzers(ts)?
  171       else
  172         (0, 0)
  173       end
  174 
  175     num_received = num_received + 1
  176     total_msgs = total_msgs + num_msgs
  177 
  178     _set_analyzers(ts) = (num_received, total_msgs)
  179 
  180     if num_received == _num_analyzers then
  181       let run_ns: I64 = ts - old_ts
            // Messages per second: total messages scaled by 1e9 over elapsed ns.
  182       let rate: I64 = (total_msgs.i64() * 1_000_000_000) / run_ns
  183       _env.out.print(ts.string() + "," + run_ns.string() + "," + rate.string())
  184 
            // Only announce completion for the report belonging to the latest tick.
  185       if _done and (ts == _current_t) then
  186         _env.out.print("Done with message sending... Waiting for Receiver to work through its backlog...")
  187       end
  188 
  189       try
  190         _set_analyzers.remove(ts)?
  191       end
  192     end
  193 
  194 
  195 actor Receiver
        // Simulates a slow consumer: each message from an analyzer makes this
        // actor wait for `_workload` time units before it returns.
  196   let _workload: U32
  197 
        // Stores the workload, narrowing it from U64 to U32.
  198   new create(workload: U64) =>
  199     _workload = workload.u32()
  200 
        // Waits for the configured workload: via a waitable timer on Windows,
        // via usleep() elsewhere.
  201   be msg_from_analyzer() =>
  202     ifdef windows then
  203       // There is no usleep() on Windows
            // NOTE(review): the negative value scaled by 10 is presumably a
            // relative due time in 100 ns units (i.e. `_workload` microseconds)
            // - confirm against the SetWaitableTimer documentation.
  204       var countdown: I64 = -10 * _workload.i64()
  205       let timer: USize = @CreateWaitableTimerW[USize](USize(0), USize(1), USize(0))
  206       @SetWaitableTimer[USize](timer, addressof countdown, I32(0), USize(0), USize(0), USize(0))
            // NOTE(review): 0xFFFFFFFF is presumably the "wait forever" timeout.
  207       @WaitForSingleObject[USize](timer, U32(0xFFFFFFFF))
  208       @CloseHandle[USize](timer)
  209     else
  210       @usleep[I32](_workload)
  211     end
  212 
  213 
  214 
  215 actor Analyzer
        // Counts messages received from senders and, on each tick, reports the
        // count to the coordinator and sends one message to the receiver.
  216   var _msgs_received: U64 = 0
  217   let _receiver: Receiver
  218 
  219   new create(receiver: Receiver) =>
  220     _receiver = receiver
  221 
        // One message from a sender: bump the counter for the current window.
  222   be msg_from_sender() =>
  223     _msgs_received = _msgs_received + 1
  224 
        // On a tick: report the count gathered since the previous tick (passing
        // `ts` and `old_ts` through unchanged), send one message to the
        // receiver, then reset the counter for the next window.
  225   be tick_fired(coordinator: Coordinator, ts: I64, old_ts: I64) =>
  226     coordinator.msg_from_analyzer(this, _msgs_received, ts, old_ts)
  227     _receiver.msg_from_analyzer()
  228     _msgs_received = 0
  229 
  230 
  231 
  232 actor Sender
        // Repeatedly sends a message to a randomly chosen analyzer until told
        // to stop via `done()`.
  233   let _analyzers: Array[Analyzer] val
  234   var _done: Bool = false
  235   let _rand: Rand = Rand()
  236 
        // Stores the analyzer array and queues the first `send_msgs` to itself.
  237   new create(analyzers: Array[Analyzer] val) =>
  238     _analyzers = analyzers
  239     send_msgs()
  240 
        // Sends one message to an analyzer picked at a random index below
        // `_analyzers.size()`, then re-queues itself unless `_done` is set.
  241   be send_msgs() =>
  242     try
  243       _analyzers(_rand.int_unbiased[USize](_analyzers.size()))?.msg_from_sender()
  244     else
            // Reached only if the array lookup above raises an error.
  245       @printf[I32]("BBBBAAADDDD\n".cstring())
  246     end
  247 
          // Re-sending to itself (rather than looping) lets a queued `done`
          // message be processed between iterations.
  248     if not _done then
  249       send_msgs()
  250     end
  251 
        // Stops the send loop: the next `send_msgs` will not re-queue itself.
  252   be done() =>
  253     _done = true
  254 
  255 
  256 
  257 class Tick is TimerNotify
        """
        Timer notifier that forwards every timer firing to the Coordinator and
        cancels the timer once the configured number of reports is reached.
        """
        // NOTE(review): `_env` is stored but never read in this class.
  258   let _env: Env
  259   let _coordinator: Coordinator
        // Number of ticks after which the timer stops; 0 means never stop.
  260   let _report_count: U64
  261   var _tick_count: U64 = 0
  262 
  263   new iso create(env: Env, coordinator: Coordinator, report_count: U64) =>
          """
          Remember the coordinator to notify and the tick limit.
          """
  264     _env = env
  265     _coordinator = coordinator
  266     _report_count = report_count
  267 
  268   fun ref apply(timer: Timer, count: U64): Bool =>
          """
          Add `count` to the running tick total, tell the coordinator a tick
          fired (flagging whether the limit has been reached), and return
          `false` once the limit is reached so the timer is not rescheduled.
          """
  269     _tick_count = _tick_count + count
          // A report count of 0 disables the limit entirely.
  270     let done = (_report_count > 0) and (_tick_count >= _report_count)
  271     _coordinator.tick_fired(done, _tick_count)
  272     not (done)