bench.py (nsq-1.2.0) | : | bench.py (nsq-1.2.1) | ||
---|---|---|---|---|
#!/usr/bin/env python | #!/usr/bin/env python3 | |||
# | ||||
# This script bootstraps an NSQ cluster in EC2 and runs benchmarks. | ||||
# | ||||
# Requires python3 and the following packages: | ||||
# - boto3 | ||||
# - paramiko | ||||
# - tornado | ||||
# | ||||
# AWS authentication is delegated entirely to the boto3 environment, see: | ||||
# | ||||
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.ht | ||||
ml | ||||
# | ||||
# EC2 instances are launched into EC2 Classic, expecting a 'default' security gr | ||||
oup | ||||
# that allows allows SSH (port 22) from 0.0.0.0/0 and an EC2 key pair created | ||||
# (named 'default', but configurable via --ssh-key-name). | ||||
# | ||||
import sys | import sys | |||
import logging | import logging | |||
import time | import time | |||
import datetime | import datetime | |||
import socket | import socket | |||
import warnings | import warnings | |||
import hashlib | import hashlib | |||
import boto.ec2 | import boto3 | |||
import paramiko.client | import paramiko.client | |||
import paramiko.ssh_exception | import paramiko.ssh_exception | |||
import tornado.options | import tornado.options | |||
def ssh_connect_with_retries(host, retries=3, timeout=30): | def ssh_connect_with_retries(host, retries=3, timeout=30): | |||
for i in range(retries): | for i in range(retries): | |||
try: | try: | |||
ssh_client = paramiko.client.SSHClient() | ssh_client = paramiko.client.SSHClient() | |||
ssh_client.set_missing_host_key_policy(paramiko.client.WarningPolicy ()) | ssh_client.set_missing_host_key_policy(paramiko.client.WarningPolicy ()) | |||
ssh_client.connect(host, username='ubuntu', timeout=timeout) | ssh_client.connect(host, username='ubuntu', timeout=timeout) | |||
skipping to change at line 40 | skipping to change at line 58 | |||
chan = transport.open_session() | chan = transport.open_session() | |||
chan.exec_command(cmd) | chan.exec_command(cmd) | |||
return chan | return chan | |||
def ssh_cmd(ssh_client, cmd, timeout=2): | def ssh_cmd(ssh_client, cmd, timeout=2): | |||
transport = ssh_client.get_transport() | transport = ssh_client.get_transport() | |||
chan = transport.open_session() | chan = transport.open_session() | |||
chan.settimeout(timeout) | chan.settimeout(timeout) | |||
chan.exec_command(cmd) | chan.exec_command(cmd) | |||
stdout = '' | stdout = b'' | |||
stderr = '' | stderr = b'' | |||
while True: | while True: | |||
if chan.recv_ready(): | if chan.recv_ready(): | |||
stdout += chan.recv(4096) | stdout += chan.recv(4096) | |||
continue | continue | |||
if chan.recv_stderr_ready(): | if chan.recv_stderr_ready(): | |||
stderr += chan.recv_stderr(4096) | stderr += chan.recv_stderr(4096) | |||
continue | continue | |||
if chan.exit_status_ready(): | if chan.exit_status_ready(): | |||
exit_status = chan.recv_exit_status() | exit_status = chan.recv_exit_status() | |||
break | break | |||
time.sleep(0.1) | time.sleep(0.1) | |||
if exit_status != 0: | if exit_status != 0: | |||
raise Exception('%r' % stderr) | raise Exception('%r' % stderr) | |||
return stdout, stderr | return stdout, stderr | |||
def connect_to_ec2(): | def get_session(): | |||
return boto.ec2.connect_to_region( | return boto3.session.Session(region_name=tornado.options.options.region) | |||
tornado.options.options.region, | ||||
aws_access_key_id=tornado.options.options.access_key, | ||||
aws_secret_access_key=tornado.options.options.secret_key) | ||||
def _bootstrap(addr): | def _bootstrap(addr): | |||
commit = tornado.options.options.commit | commit = tornado.options.options.commit | |||
golang_version = tornado.options.options.golang_version | golang_version = tornado.options.options.golang_version | |||
ssh_client = ssh_connect_with_retries(addr) | ssh_client = ssh_connect_with_retries(addr) | |||
for cmd in [ | for cmd in [ | |||
'wget https://storage.googleapis.com/golang/go%s.linux-amd64.tar.gz' % golang_version, | 'wget https://storage.googleapis.com/golang/go%s.linux-amd64.tar.gz' % golang_version, | |||
'sudo -S tar -C /usr/local -xzf go%s.linux-amd64.tar.gz' % golang_ve rsion, | 'sudo -S tar -C /usr/local -xzf go%s.linux-amd64.tar.gz' % golang_ve rsion, | |||
'sudo -S apt-get update', | 'sudo -S apt-get update', | |||
'sudo -S apt-get -y install git mercurial', | 'sudo -S apt-get -y install git mercurial', | |||
'mkdir -p go/src/github.com/nsqio', | 'mkdir -p go/src/github.com/nsqio', | |||
'cd go/src/github.com/nsqio && git clone https://github.com/nsqio/ns q', | 'cd go/src/github.com/nsqio && git clone https://github.com/nsqio/ns q', | |||
'cd go/src/github.com/nsqio/nsq && git checkout %s' % commit, | 'cd go/src/github.com/nsqio/nsq && git checkout %s' % commit, | |||
'sudo -S curl -s -o /usr/local/bin/gpm \ | 'cd go/src/github.com/nsqio/nsq/apps/nsqd && GO111MODULE=on /usr/loc | |||
https://raw.githubusercontent.com/pote/gpm/v1.2.3/bin/gpm', | al/go/bin/go build', | |||
'sudo -S chmod +x /usr/local/bin/gpm', | 'cd go/src/github.com/nsqio/nsq/bench/bench_writer && GO111MODULE=on | |||
'cd go/src/github.com/nsqio/nsq && \ | /usr/local/go/bin/go build', | |||
GOPATH=/home/ubuntu/go PATH=$PATH:/usr/local/go/bin gpm install' | 'cd go/src/github.com/nsqio/nsq/bench/bench_reader && GO111MODULE=on | |||
, | /usr/local/go/bin/go build', | |||
'cd go/src/github.com/nsqio/nsq/apps/nsqd && \ | ||||
GOPATH=/home/ubuntu/go /usr/local/go/bin/go build', | ||||
'cd go/src/github.com/nsqio/nsq/bench/bench_writer && \ | ||||
GOPATH=/home/ubuntu/go /usr/local/go/bin/go build', | ||||
'cd go/src/github.com/nsqio/nsq/bench/bench_reader && \ | ||||
GOPATH=/home/ubuntu/go /usr/local/go/bin/go build', | ||||
'sudo -S mkdir -p /mnt/nsq', | 'sudo -S mkdir -p /mnt/nsq', | |||
'sudo -S chmod 777 /mnt/nsq']: | 'sudo -S chmod 777 /mnt/nsq']: | |||
ssh_cmd(ssh_client, cmd, timeout=10) | ssh_cmd(ssh_client, cmd, timeout=10) | |||
def bootstrap(): | def bootstrap(): | |||
conn = connect_to_ec2() | session = get_session() | |||
ec2 = session.resource('ec2') | ||||
total_count = tornado.options.options.nsqd_count + tornado.options.options.w orker_count | total_count = tornado.options.options.nsqd_count + tornado.options.options.w orker_count | |||
logging.info('launching %d instances', total_count) | logging.info('launching %d instances', total_count) | |||
run = conn.run_instances( | instances = ec2.create_instances( | |||
tornado.options.options.ami, | ImageId=tornado.options.options.ami, | |||
min_count=total_count, | MinCount=total_count, | |||
max_count=total_count, | MaxCount=total_count, | |||
key_name=tornado.options.options.ssh_key_name, | KeyName=tornado.options.options.ssh_key_name, | |||
instance_type=tornado.options.options.instance_type, | InstanceType=tornado.options.options.instance_type, | |||
security_groups=['default']) | SecurityGroups=['default']) | |||
logging.info('waiting for instances to launch...') | logging.info('waiting for instances to launch...') | |||
while all(i.state != 'running' for i in run.instances): | while any(i.state['Name'] != 'running' for i in instances): | |||
waiting_for = [i.id for i in run.instances if i.state != 'running'] | waiting_for = [i.id for i in instances if i.state['Name'] != 'running'] | |||
logging.info('... sleeping for 5s (waiting for %s)', ', '.join(waiting_f or)) | logging.info('... sleeping for 5s (waiting for %s)', ', '.join(waiting_f or)) | |||
time.sleep(5) | time.sleep(5) | |||
for instance in run.instances: | for instance in instances: | |||
instance.update() | instance.load() | |||
for instance in run.instances: | for instance in instances: | |||
if not instance.tags: | if not instance.tags: | |||
conn.create_tags([instance.id], {'nsq_bench': '1'}) | instance.create_tags(Tags=[{'Key': 'nsq_bench', 'Value': '1'}]) | |||
hosts = [(i.id, i.public_dns_name) for i in run.instances] | ||||
try: | try: | |||
c = 0 | c = 0 | |||
for id, addr in hosts: | for i in instances: | |||
c += 1 | c += 1 | |||
logging.info('(%d) bootstrapping %s (%s)', c, addr, id) | logging.info('(%d) bootstrapping %s (%s)', c, i.public_dns_name, i.i | |||
_bootstrap(addr) | d) | |||
_bootstrap(i.public_dns_name) | ||||
except Exception: | except Exception: | |||
logging.exception('bootstrap failed') | logging.exception('bootstrap failed') | |||
decomm() | decomm() | |||
def run(): | def run(): | |||
hosts = _find_hosts() | instances = _find_instances() | |||
logging.info('launching nsqd on %d host(s)', tornado.options.options.nsqd_co unt) | logging.info('launching nsqd on %d host(s)', tornado.options.options.nsqd_co unt) | |||
nsqd_chans = [] | nsqd_chans = [] | |||
nsqd_hosts = hosts[:tornado.options.options.nsqd_count] | nsqd_hosts = instances[:tornado.options.options.nsqd_count] | |||
for id, addr in nsqd_hosts: | for instance in nsqd_hosts: | |||
try: | try: | |||
ssh_client = ssh_connect_with_retries(addr) | ssh_client = ssh_connect_with_retries(instance.public_dns_name) | |||
for cmd in [ | for cmd in [ | |||
'sudo -S pkill -f nsqd', | 'sudo -S pkill -f nsqd', | |||
'sudo -S rm -f /mnt/nsq/*.dat', | 'sudo -S rm -f /mnt/nsq/*.dat', | |||
'GOMAXPROCS=32 ./go/src/github.com/nsqio/nsq/apps/nsqd/nsqd \ | 'GOMAXPROCS=32 ./go/src/github.com/nsqio/nsq/apps/nsqd/nsqd \ | |||
--data-path=/mnt/nsq --mem-queue-size=10000000 --max-rdy | --data-path=/mnt/nsq \ | |||
-count=%s' % ( | --mem-queue-size=10000000 \ | |||
tornado.options.options.rdy | --max-rdy-count=%s' % (tornado.options.options.rdy)]: | |||
)]: | ||||
nsqd_chans.append((ssh_client, ssh_cmd_async(ssh_client, cmd))) | nsqd_chans.append((ssh_client, ssh_cmd_async(ssh_client, cmd))) | |||
except Exception: | except Exception: | |||
logging.exception('failed') | logging.exception('failed') | |||
nsqd_tcp_addrs = [h[1] for h in nsqd_hosts] | nsqd_tcp_addrs = [i.public_dns_name for i in nsqd_hosts] | |||
dt = datetime.datetime.utcnow() | dt = datetime.datetime.utcnow() | |||
deadline = dt + datetime.timedelta(seconds=30) | deadline = dt + datetime.timedelta(seconds=30) | |||
logging.info('launching %d producer(s) on %d host(s)', | logging.info('launching %d producer(s) on %d host(s)', | |||
tornado.options.options.nsqd_count * tornado.options.options.wo rker_count, | tornado.options.options.nsqd_count * tornado.options.options.wo rker_count, | |||
tornado.options.options.worker_count) | tornado.options.options.worker_count) | |||
worker_chans = [] | worker_chans = [] | |||
producer_hosts = hosts[tornado.options.options.nsqd_count:] | producer_hosts = instances[tornado.options.options.nsqd_count:] | |||
for id, addr in producer_hosts: | for instance in producer_hosts: | |||
for nsqd_tcp_addr in nsqd_tcp_addrs: | for nsqd_tcp_addr in nsqd_tcp_addrs: | |||
topic = hashlib.md5(addr).hexdigest() | topic = hashlib.md5(instance.public_dns_name.encode('utf-8')).hexdig est() | |||
try: | try: | |||
ssh_client = ssh_connect_with_retries(addr) | ssh_client = ssh_connect_with_retries(instance.public_dns_name) | |||
for cmd in [ | for cmd in [ | |||
'GOMAXPROCS=2 \ | 'GOMAXPROCS=2 \ | |||
./go/src/github.com/nsqio/nsq/bench/bench_writer/ben ch_writer \ | ./go/src/github.com/nsqio/nsq/bench/bench_writer/ben ch_writer \ | |||
--topic=%s --nsqd-tcp-address=%s:4150 --deadline=\'% s\' --size=%d' % ( | --topic=%s --nsqd-tcp-address=%s:4150 --deadline=\'% s\' --size=%d' % ( | |||
topic, nsqd_tcp_addr, deadline.strftime('%Y-%m-%d %H :%M:%S'), | topic, nsqd_tcp_addr, deadline.strftime('%Y-%m-%d %H :%M:%S'), | |||
tornado.options.options.msg_size)]: | tornado.options.options.msg_size)]: | |||
worker_chans.append((ssh_client, ssh_cmd_async(ssh_client, c md))) | worker_chans.append((ssh_client, ssh_cmd_async(ssh_client, c md))) | |||
except Exception: | except Exception: | |||
logging.exception('failed') | logging.exception('failed') | |||
if tornado.options.options.mode == 'pubsub': | if tornado.options.options.mode == 'pubsub': | |||
logging.info('launching %d consumer(s) on %d host(s)', | logging.info('launching %d consumer(s) on %d host(s)', | |||
tornado.options.options.nsqd_count * tornado.options.option s.worker_count, | tornado.options.options.nsqd_count * tornado.options.option s.worker_count, | |||
tornado.options.options.worker_count) | tornado.options.options.worker_count) | |||
consumer_hosts = hosts[tornado.options.options.nsqd_count:] | consumer_hosts = instances[tornado.options.options.nsqd_count:] | |||
for id, addr in consumer_hosts: | for instance in consumer_hosts: | |||
for nsqd_tcp_addr in nsqd_tcp_addrs: | for nsqd_tcp_addr in nsqd_tcp_addrs: | |||
topic = hashlib.md5(addr).hexdigest() | topic = hashlib.md5(instance.public_dns_name.encode('utf-8')).he xdigest() | |||
try: | try: | |||
ssh_client = ssh_connect_with_retries(addr) | ssh_client = ssh_connect_with_retries(instance.public_dns_na me) | |||
for cmd in [ | for cmd in [ | |||
'GOMAXPROCS=8 \ | 'GOMAXPROCS=8 \ | |||
./go/src/github.com/nsqio/nsq/bench/bench_reader /bench_reader \ | ./go/src/github.com/nsqio/nsq/bench/bench_reader /bench_reader \ | |||
--topic=%s --nsqd-tcp-address=%s:4150 --deadline =\'%s\' --size=%d \ | --topic=%s --nsqd-tcp-address=%s:4150 --deadline =\'%s\' --size=%d \ | |||
--rdy=%d' % ( | --rdy=%d' % ( | |||
topic, nsqd_tcp_addr, deadline.strftime('%Y-%m-% d %H:%M:%S'), | topic, nsqd_tcp_addr, deadline.strftime('%Y-%m-% d %H:%M:%S'), | |||
tornado.options.options.msg_size, tornado.option s.options.rdy)]: | tornado.options.options.msg_size, tornado.option s.options.rdy)]: | |||
worker_chans.append((ssh_client, ssh_cmd_async(ssh_clien t, cmd))) | worker_chans.append((ssh_client, ssh_cmd_async(ssh_clien t, cmd))) | |||
except Exception: | except Exception: | |||
logging.exception('failed') | logging.exception('failed') | |||
skipping to change at line 219 | skipping to change at line 226 | |||
'ops': [] | 'ops': [] | |||
} | } | |||
} | } | |||
while worker_chans: | while worker_chans: | |||
for ssh_client, chan in worker_chans[:]: | for ssh_client, chan in worker_chans[:]: | |||
if chan.recv_ready(): | if chan.recv_ready(): | |||
sys.stdout.write(chan.recv(4096)) | sys.stdout.write(chan.recv(4096)) | |||
sys.stdout.flush() | sys.stdout.flush() | |||
continue | continue | |||
if chan.recv_stderr_ready(): | if chan.recv_stderr_ready(): | |||
line = chan.recv_stderr(4096) | line = chan.recv_stderr(4096).decode('utf-8') | |||
if 'duration:' in line: | if 'duration:' in line: | |||
kind = line.split(' ')[0][1:-1] | kind = line.split(' ')[0][1:-1] | |||
parts = line.rsplit('duration:')[1].split('-') | parts = line.rsplit('duration:')[1].split('-') | |||
stats[kind]['durations'].append(float(parts[0].strip()[:-1]) ) | stats[kind]['durations'].append(float(parts[0].strip()[:-1]) ) | |||
stats[kind]['mbytes'].append(float(parts[1].strip()[:-4])) | stats[kind]['mbytes'].append(float(parts[1].strip()[:-4])) | |||
stats[kind]['ops'].append(float(parts[2].strip()[:-5])) | stats[kind]['ops'].append(float(parts[2].strip()[:-5])) | |||
sys.stdout.write(line) | sys.stdout.write(line) | |||
sys.stdout.flush() | sys.stdout.flush() | |||
continue | continue | |||
if chan.exit_status_ready(): | if chan.exit_status_ready(): | |||
skipping to change at line 248 | skipping to change at line 255 | |||
total_mb = sum(data['mbytes']) | total_mb = sum(data['mbytes']) | |||
total_ops = sum(data['ops']) | total_ops = sum(data['ops']) | |||
logging.info('[%s] %fs - %fmb/s - %fops/s - %fus/op', | logging.info('[%s] %fs - %fmb/s - %fops/s - %fus/op', | |||
kind, max_duration, total_mb, total_ops, | kind, max_duration, total_mb, total_ops, | |||
max_duration / total_ops * 1000 * 1000) | max_duration / total_ops * 1000 * 1000) | |||
for ssh_client, chan in nsqd_chans: | for ssh_client, chan in nsqd_chans: | |||
chan.close() | chan.close() | |||
def _find_hosts(): | def _find_instances(): | |||
conn = connect_to_ec2() | session = get_session() | |||
reservations = conn.get_all_instances() | ec2 = session.resource('ec2') | |||
instances = [inst for res in reservations for inst in res.instances] | return [i for i in ec2.instances.all() if | |||
i.state['Name'] == 'running' and any(t['Key'] == 'nsq_bench' for t i | ||||
hosts = [] | n i.tags)] | |||
for instance in instances: | ||||
if not instance.tags or instance.state != 'running': | ||||
continue | ||||
if 'nsq_bench' in instance.tags: | ||||
hosts.append((instance.id, instance.public_dns_name)) | ||||
return hosts | ||||
def decomm(): | def decomm(): | |||
conn = connect_to_ec2() | instances = _find_instances() | |||
hosts = _find_hosts() | logging.info('terminating instances %s' % ','.join(i.id for i in instances)) | |||
host_ids = [h[0] for h in hosts] | for instance in instances: | |||
logging.info('terminating instances %s' % ','.join(host_ids)) | instance.terminate() | |||
conn.terminate_instances(host_ids) | ||||
if __name__ == '__main__': | if __name__ == '__main__': | |||
tornado.options.define('region', type=str, default='us-east-1', | tornado.options.define('region', type=str, default='us-east-1', | |||
help='EC2 region to launch instances') | help='EC2 region to launch instances') | |||
tornado.options.define('nsqd_count', type=int, default=3, | tornado.options.define('nsqd_count', type=int, default=3, | |||
help='how many nsqd instances to launch') | help='how many nsqd instances to launch') | |||
tornado.options.define('worker_count', type=int, default=3, | tornado.options.define('worker_count', type=int, default=3, | |||
help='how many worker instances to launch') | help='how many worker instances to launch') | |||
tornado.options.define('access_key', type=str, default='', | # ubuntu 18.04 HVM instance store us-east-1 | |||
help='AWS account access key') | tornado.options.define('ami', type=str, default='ami-0938f2289b3fa3f5b', | |||
tornado.options.define('secret_key', type=str, default='', | ||||
help='AWS account secret key') | ||||
tornado.options.define('ami', type=str, default='ami-48fd2120', | ||||
help='AMI ID') | help='AMI ID') | |||
tornado.options.define('ssh_key_name', type=str, default='default', | tornado.options.define('ssh_key_name', type=str, default='default', | |||
help='SSH key name') | help='SSH key name') | |||
tornado.options.define('instance_type', type=str, default='c3.2xlarge', | tornado.options.define('instance_type', type=str, default='c3.2xlarge', | |||
help='EC2 instance type') | help='EC2 instance type') | |||
tornado.options.define('msg_size', type=int, default=200, | tornado.options.define('msg_size', type=int, default=200, | |||
help='size of message') | help='size of message') | |||
tornado.options.define('rdy', type=int, default=10000, | tornado.options.define('rdy', type=int, default=10000, | |||
help='RDY count to use for bench_reader') | help='RDY count to use for bench_reader') | |||
tornado.options.define('mode', type=str, default='pubsub', | tornado.options.define('mode', type=str, default='pubsub', | |||
help='the benchmark mode (pub, pubsub)') | help='the benchmark mode (pub, pubsub)') | |||
tornado.options.define('commit', type=str, default='master', | tornado.options.define('commit', type=str, default='master', | |||
help='the git commit') | help='the git commit') | |||
tornado.options.define('golang_version', type=str, default='1.5.1', | tornado.options.define('golang_version', type=str, default='1.14.3', | |||
help='the go version') | help='the go version') | |||
tornado.options.parse_command_line() | tornado.options.parse_command_line() | |||
logging.getLogger('paramiko').setLevel(logging.WARNING) | logging.getLogger('paramiko').setLevel(logging.WARNING) | |||
warnings.simplefilter('ignore') | warnings.simplefilter('ignore') | |||
cmd_name = sys.argv[-1] | cmd_name = sys.argv[-1] | |||
cmd_map = { | cmd_map = { | |||
'bootstrap': bootstrap, | 'bootstrap': bootstrap, | |||
'run': run, | 'run': run, | |||
End of changes. 29 change blocks. | ||||
82 lines changed or deleted | 82 lines changed or added |