"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "bench/bench.py" between
nsq-1.2.0.tar.gz and nsq-1.2.1.tar.gz

About: nsq is a realtime distributed and and decentralized messaging platform.

bench.py  (nsq-1.2.0):bench.py  (nsq-1.2.1)
#!/usr/bin/env python #!/usr/bin/env python3
#
# This script bootstraps an NSQ cluster in EC2 and runs benchmarks.
#
# Requires python3 and the following packages:
# - boto3
# - paramiko
# - tornado
#
# AWS authentication is delegated entirely to the boto3 environment, see:
#
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.ht
ml
#
# EC2 instances are launched into EC2 Classic, expecting a 'default' security gr
oup
# that allows allows SSH (port 22) from 0.0.0.0/0 and an EC2 key pair created
# (named 'default', but configurable via --ssh-key-name).
#
import sys import sys
import logging import logging
import time import time
import datetime import datetime
import socket import socket
import warnings import warnings
import hashlib import hashlib
import boto.ec2 import boto3
import paramiko.client import paramiko.client
import paramiko.ssh_exception import paramiko.ssh_exception
import tornado.options import tornado.options
def ssh_connect_with_retries(host, retries=3, timeout=30): def ssh_connect_with_retries(host, retries=3, timeout=30):
for i in range(retries): for i in range(retries):
try: try:
ssh_client = paramiko.client.SSHClient() ssh_client = paramiko.client.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.client.WarningPolicy ()) ssh_client.set_missing_host_key_policy(paramiko.client.WarningPolicy ())
ssh_client.connect(host, username='ubuntu', timeout=timeout) ssh_client.connect(host, username='ubuntu', timeout=timeout)
skipping to change at line 40 skipping to change at line 58
chan = transport.open_session() chan = transport.open_session()
chan.exec_command(cmd) chan.exec_command(cmd)
return chan return chan
def ssh_cmd(ssh_client, cmd, timeout=2): def ssh_cmd(ssh_client, cmd, timeout=2):
transport = ssh_client.get_transport() transport = ssh_client.get_transport()
chan = transport.open_session() chan = transport.open_session()
chan.settimeout(timeout) chan.settimeout(timeout)
chan.exec_command(cmd) chan.exec_command(cmd)
stdout = '' stdout = b''
stderr = '' stderr = b''
while True: while True:
if chan.recv_ready(): if chan.recv_ready():
stdout += chan.recv(4096) stdout += chan.recv(4096)
continue continue
if chan.recv_stderr_ready(): if chan.recv_stderr_ready():
stderr += chan.recv_stderr(4096) stderr += chan.recv_stderr(4096)
continue continue
if chan.exit_status_ready(): if chan.exit_status_ready():
exit_status = chan.recv_exit_status() exit_status = chan.recv_exit_status()
break break
time.sleep(0.1) time.sleep(0.1)
if exit_status != 0: if exit_status != 0:
raise Exception('%r' % stderr) raise Exception('%r' % stderr)
return stdout, stderr return stdout, stderr
def connect_to_ec2(): def get_session():
return boto.ec2.connect_to_region( return boto3.session.Session(region_name=tornado.options.options.region)
tornado.options.options.region,
aws_access_key_id=tornado.options.options.access_key,
aws_secret_access_key=tornado.options.options.secret_key)
def _bootstrap(addr): def _bootstrap(addr):
commit = tornado.options.options.commit commit = tornado.options.options.commit
golang_version = tornado.options.options.golang_version golang_version = tornado.options.options.golang_version
ssh_client = ssh_connect_with_retries(addr) ssh_client = ssh_connect_with_retries(addr)
for cmd in [ for cmd in [
'wget https://storage.googleapis.com/golang/go%s.linux-amd64.tar.gz' % golang_version, 'wget https://storage.googleapis.com/golang/go%s.linux-amd64.tar.gz' % golang_version,
'sudo -S tar -C /usr/local -xzf go%s.linux-amd64.tar.gz' % golang_ve rsion, 'sudo -S tar -C /usr/local -xzf go%s.linux-amd64.tar.gz' % golang_ve rsion,
'sudo -S apt-get update', 'sudo -S apt-get update',
'sudo -S apt-get -y install git mercurial', 'sudo -S apt-get -y install git mercurial',
'mkdir -p go/src/github.com/nsqio', 'mkdir -p go/src/github.com/nsqio',
'cd go/src/github.com/nsqio && git clone https://github.com/nsqio/ns q', 'cd go/src/github.com/nsqio && git clone https://github.com/nsqio/ns q',
'cd go/src/github.com/nsqio/nsq && git checkout %s' % commit, 'cd go/src/github.com/nsqio/nsq && git checkout %s' % commit,
'sudo -S curl -s -o /usr/local/bin/gpm \ 'cd go/src/github.com/nsqio/nsq/apps/nsqd && GO111MODULE=on /usr/loc
https://raw.githubusercontent.com/pote/gpm/v1.2.3/bin/gpm', al/go/bin/go build',
'sudo -S chmod +x /usr/local/bin/gpm', 'cd go/src/github.com/nsqio/nsq/bench/bench_writer && GO111MODULE=on
'cd go/src/github.com/nsqio/nsq && \ /usr/local/go/bin/go build',
GOPATH=/home/ubuntu/go PATH=$PATH:/usr/local/go/bin gpm install' 'cd go/src/github.com/nsqio/nsq/bench/bench_reader && GO111MODULE=on
, /usr/local/go/bin/go build',
'cd go/src/github.com/nsqio/nsq/apps/nsqd && \
GOPATH=/home/ubuntu/go /usr/local/go/bin/go build',
'cd go/src/github.com/nsqio/nsq/bench/bench_writer && \
GOPATH=/home/ubuntu/go /usr/local/go/bin/go build',
'cd go/src/github.com/nsqio/nsq/bench/bench_reader && \
GOPATH=/home/ubuntu/go /usr/local/go/bin/go build',
'sudo -S mkdir -p /mnt/nsq', 'sudo -S mkdir -p /mnt/nsq',
'sudo -S chmod 777 /mnt/nsq']: 'sudo -S chmod 777 /mnt/nsq']:
ssh_cmd(ssh_client, cmd, timeout=10) ssh_cmd(ssh_client, cmd, timeout=10)
def bootstrap(): def bootstrap():
conn = connect_to_ec2() session = get_session()
ec2 = session.resource('ec2')
total_count = tornado.options.options.nsqd_count + tornado.options.options.w orker_count total_count = tornado.options.options.nsqd_count + tornado.options.options.w orker_count
logging.info('launching %d instances', total_count) logging.info('launching %d instances', total_count)
run = conn.run_instances( instances = ec2.create_instances(
tornado.options.options.ami, ImageId=tornado.options.options.ami,
min_count=total_count, MinCount=total_count,
max_count=total_count, MaxCount=total_count,
key_name=tornado.options.options.ssh_key_name, KeyName=tornado.options.options.ssh_key_name,
instance_type=tornado.options.options.instance_type, InstanceType=tornado.options.options.instance_type,
security_groups=['default']) SecurityGroups=['default'])
logging.info('waiting for instances to launch...') logging.info('waiting for instances to launch...')
while all(i.state != 'running' for i in run.instances): while any(i.state['Name'] != 'running' for i in instances):
waiting_for = [i.id for i in run.instances if i.state != 'running'] waiting_for = [i.id for i in instances if i.state['Name'] != 'running']
logging.info('... sleeping for 5s (waiting for %s)', ', '.join(waiting_f or)) logging.info('... sleeping for 5s (waiting for %s)', ', '.join(waiting_f or))
time.sleep(5) time.sleep(5)
for instance in run.instances: for instance in instances:
instance.update() instance.load()
for instance in run.instances: for instance in instances:
if not instance.tags: if not instance.tags:
conn.create_tags([instance.id], {'nsq_bench': '1'}) instance.create_tags(Tags=[{'Key': 'nsq_bench', 'Value': '1'}])
hosts = [(i.id, i.public_dns_name) for i in run.instances]
try: try:
c = 0 c = 0
for id, addr in hosts: for i in instances:
c += 1 c += 1
logging.info('(%d) bootstrapping %s (%s)', c, addr, id) logging.info('(%d) bootstrapping %s (%s)', c, i.public_dns_name, i.i
_bootstrap(addr) d)
_bootstrap(i.public_dns_name)
except Exception: except Exception:
logging.exception('bootstrap failed') logging.exception('bootstrap failed')
decomm() decomm()
def run(): def run():
hosts = _find_hosts() instances = _find_instances()
logging.info('launching nsqd on %d host(s)', tornado.options.options.nsqd_co unt) logging.info('launching nsqd on %d host(s)', tornado.options.options.nsqd_co unt)
nsqd_chans = [] nsqd_chans = []
nsqd_hosts = hosts[:tornado.options.options.nsqd_count] nsqd_hosts = instances[:tornado.options.options.nsqd_count]
for id, addr in nsqd_hosts: for instance in nsqd_hosts:
try: try:
ssh_client = ssh_connect_with_retries(addr) ssh_client = ssh_connect_with_retries(instance.public_dns_name)
for cmd in [ for cmd in [
'sudo -S pkill -f nsqd', 'sudo -S pkill -f nsqd',
'sudo -S rm -f /mnt/nsq/*.dat', 'sudo -S rm -f /mnt/nsq/*.dat',
'GOMAXPROCS=32 ./go/src/github.com/nsqio/nsq/apps/nsqd/nsqd \ 'GOMAXPROCS=32 ./go/src/github.com/nsqio/nsq/apps/nsqd/nsqd \
--data-path=/mnt/nsq --mem-queue-size=10000000 --max-rdy --data-path=/mnt/nsq \
-count=%s' % ( --mem-queue-size=10000000 \
tornado.options.options.rdy --max-rdy-count=%s' % (tornado.options.options.rdy)]:
)]:
nsqd_chans.append((ssh_client, ssh_cmd_async(ssh_client, cmd))) nsqd_chans.append((ssh_client, ssh_cmd_async(ssh_client, cmd)))
except Exception: except Exception:
logging.exception('failed') logging.exception('failed')
nsqd_tcp_addrs = [h[1] for h in nsqd_hosts] nsqd_tcp_addrs = [i.public_dns_name for i in nsqd_hosts]
dt = datetime.datetime.utcnow() dt = datetime.datetime.utcnow()
deadline = dt + datetime.timedelta(seconds=30) deadline = dt + datetime.timedelta(seconds=30)
logging.info('launching %d producer(s) on %d host(s)', logging.info('launching %d producer(s) on %d host(s)',
tornado.options.options.nsqd_count * tornado.options.options.wo rker_count, tornado.options.options.nsqd_count * tornado.options.options.wo rker_count,
tornado.options.options.worker_count) tornado.options.options.worker_count)
worker_chans = [] worker_chans = []
producer_hosts = hosts[tornado.options.options.nsqd_count:] producer_hosts = instances[tornado.options.options.nsqd_count:]
for id, addr in producer_hosts: for instance in producer_hosts:
for nsqd_tcp_addr in nsqd_tcp_addrs: for nsqd_tcp_addr in nsqd_tcp_addrs:
topic = hashlib.md5(addr).hexdigest() topic = hashlib.md5(instance.public_dns_name.encode('utf-8')).hexdig est()
try: try:
ssh_client = ssh_connect_with_retries(addr) ssh_client = ssh_connect_with_retries(instance.public_dns_name)
for cmd in [ for cmd in [
'GOMAXPROCS=2 \ 'GOMAXPROCS=2 \
./go/src/github.com/nsqio/nsq/bench/bench_writer/ben ch_writer \ ./go/src/github.com/nsqio/nsq/bench/bench_writer/ben ch_writer \
--topic=%s --nsqd-tcp-address=%s:4150 --deadline=\'% s\' --size=%d' % ( --topic=%s --nsqd-tcp-address=%s:4150 --deadline=\'% s\' --size=%d' % (
topic, nsqd_tcp_addr, deadline.strftime('%Y-%m-%d %H :%M:%S'), topic, nsqd_tcp_addr, deadline.strftime('%Y-%m-%d %H :%M:%S'),
tornado.options.options.msg_size)]: tornado.options.options.msg_size)]:
worker_chans.append((ssh_client, ssh_cmd_async(ssh_client, c md))) worker_chans.append((ssh_client, ssh_cmd_async(ssh_client, c md)))
except Exception: except Exception:
logging.exception('failed') logging.exception('failed')
if tornado.options.options.mode == 'pubsub': if tornado.options.options.mode == 'pubsub':
logging.info('launching %d consumer(s) on %d host(s)', logging.info('launching %d consumer(s) on %d host(s)',
tornado.options.options.nsqd_count * tornado.options.option s.worker_count, tornado.options.options.nsqd_count * tornado.options.option s.worker_count,
tornado.options.options.worker_count) tornado.options.options.worker_count)
consumer_hosts = hosts[tornado.options.options.nsqd_count:] consumer_hosts = instances[tornado.options.options.nsqd_count:]
for id, addr in consumer_hosts: for instance in consumer_hosts:
for nsqd_tcp_addr in nsqd_tcp_addrs: for nsqd_tcp_addr in nsqd_tcp_addrs:
topic = hashlib.md5(addr).hexdigest() topic = hashlib.md5(instance.public_dns_name.encode('utf-8')).he xdigest()
try: try:
ssh_client = ssh_connect_with_retries(addr) ssh_client = ssh_connect_with_retries(instance.public_dns_na me)
for cmd in [ for cmd in [
'GOMAXPROCS=8 \ 'GOMAXPROCS=8 \
./go/src/github.com/nsqio/nsq/bench/bench_reader /bench_reader \ ./go/src/github.com/nsqio/nsq/bench/bench_reader /bench_reader \
--topic=%s --nsqd-tcp-address=%s:4150 --deadline =\'%s\' --size=%d \ --topic=%s --nsqd-tcp-address=%s:4150 --deadline =\'%s\' --size=%d \
--rdy=%d' % ( --rdy=%d' % (
topic, nsqd_tcp_addr, deadline.strftime('%Y-%m-% d %H:%M:%S'), topic, nsqd_tcp_addr, deadline.strftime('%Y-%m-% d %H:%M:%S'),
tornado.options.options.msg_size, tornado.option s.options.rdy)]: tornado.options.options.msg_size, tornado.option s.options.rdy)]:
worker_chans.append((ssh_client, ssh_cmd_async(ssh_clien t, cmd))) worker_chans.append((ssh_client, ssh_cmd_async(ssh_clien t, cmd)))
except Exception: except Exception:
logging.exception('failed') logging.exception('failed')
skipping to change at line 219 skipping to change at line 226
'ops': [] 'ops': []
} }
} }
while worker_chans: while worker_chans:
for ssh_client, chan in worker_chans[:]: for ssh_client, chan in worker_chans[:]:
if chan.recv_ready(): if chan.recv_ready():
sys.stdout.write(chan.recv(4096)) sys.stdout.write(chan.recv(4096))
sys.stdout.flush() sys.stdout.flush()
continue continue
if chan.recv_stderr_ready(): if chan.recv_stderr_ready():
line = chan.recv_stderr(4096) line = chan.recv_stderr(4096).decode('utf-8')
if 'duration:' in line: if 'duration:' in line:
kind = line.split(' ')[0][1:-1] kind = line.split(' ')[0][1:-1]
parts = line.rsplit('duration:')[1].split('-') parts = line.rsplit('duration:')[1].split('-')
stats[kind]['durations'].append(float(parts[0].strip()[:-1]) ) stats[kind]['durations'].append(float(parts[0].strip()[:-1]) )
stats[kind]['mbytes'].append(float(parts[1].strip()[:-4])) stats[kind]['mbytes'].append(float(parts[1].strip()[:-4]))
stats[kind]['ops'].append(float(parts[2].strip()[:-5])) stats[kind]['ops'].append(float(parts[2].strip()[:-5]))
sys.stdout.write(line) sys.stdout.write(line)
sys.stdout.flush() sys.stdout.flush()
continue continue
if chan.exit_status_ready(): if chan.exit_status_ready():
skipping to change at line 248 skipping to change at line 255
total_mb = sum(data['mbytes']) total_mb = sum(data['mbytes'])
total_ops = sum(data['ops']) total_ops = sum(data['ops'])
logging.info('[%s] %fs - %fmb/s - %fops/s - %fus/op', logging.info('[%s] %fs - %fmb/s - %fops/s - %fus/op',
kind, max_duration, total_mb, total_ops, kind, max_duration, total_mb, total_ops,
max_duration / total_ops * 1000 * 1000) max_duration / total_ops * 1000 * 1000)
for ssh_client, chan in nsqd_chans: for ssh_client, chan in nsqd_chans:
chan.close() chan.close()
def _find_hosts(): def _find_instances():
conn = connect_to_ec2() session = get_session()
reservations = conn.get_all_instances() ec2 = session.resource('ec2')
instances = [inst for res in reservations for inst in res.instances] return [i for i in ec2.instances.all() if
i.state['Name'] == 'running' and any(t['Key'] == 'nsq_bench' for t i
hosts = [] n i.tags)]
for instance in instances:
if not instance.tags or instance.state != 'running':
continue
if 'nsq_bench' in instance.tags:
hosts.append((instance.id, instance.public_dns_name))
return hosts
def decomm(): def decomm():
conn = connect_to_ec2() instances = _find_instances()
hosts = _find_hosts() logging.info('terminating instances %s' % ','.join(i.id for i in instances))
host_ids = [h[0] for h in hosts] for instance in instances:
logging.info('terminating instances %s' % ','.join(host_ids)) instance.terminate()
conn.terminate_instances(host_ids)
if __name__ == '__main__': if __name__ == '__main__':
tornado.options.define('region', type=str, default='us-east-1', tornado.options.define('region', type=str, default='us-east-1',
help='EC2 region to launch instances') help='EC2 region to launch instances')
tornado.options.define('nsqd_count', type=int, default=3, tornado.options.define('nsqd_count', type=int, default=3,
help='how many nsqd instances to launch') help='how many nsqd instances to launch')
tornado.options.define('worker_count', type=int, default=3, tornado.options.define('worker_count', type=int, default=3,
help='how many worker instances to launch') help='how many worker instances to launch')
tornado.options.define('access_key', type=str, default='', # ubuntu 18.04 HVM instance store us-east-1
help='AWS account access key') tornado.options.define('ami', type=str, default='ami-0938f2289b3fa3f5b',
tornado.options.define('secret_key', type=str, default='',
help='AWS account secret key')
tornado.options.define('ami', type=str, default='ami-48fd2120',
help='AMI ID') help='AMI ID')
tornado.options.define('ssh_key_name', type=str, default='default', tornado.options.define('ssh_key_name', type=str, default='default',
help='SSH key name') help='SSH key name')
tornado.options.define('instance_type', type=str, default='c3.2xlarge', tornado.options.define('instance_type', type=str, default='c3.2xlarge',
help='EC2 instance type') help='EC2 instance type')
tornado.options.define('msg_size', type=int, default=200, tornado.options.define('msg_size', type=int, default=200,
help='size of message') help='size of message')
tornado.options.define('rdy', type=int, default=10000, tornado.options.define('rdy', type=int, default=10000,
help='RDY count to use for bench_reader') help='RDY count to use for bench_reader')
tornado.options.define('mode', type=str, default='pubsub', tornado.options.define('mode', type=str, default='pubsub',
help='the benchmark mode (pub, pubsub)') help='the benchmark mode (pub, pubsub)')
tornado.options.define('commit', type=str, default='master', tornado.options.define('commit', type=str, default='master',
help='the git commit') help='the git commit')
tornado.options.define('golang_version', type=str, default='1.5.1', tornado.options.define('golang_version', type=str, default='1.14.3',
help='the go version') help='the go version')
tornado.options.parse_command_line() tornado.options.parse_command_line()
logging.getLogger('paramiko').setLevel(logging.WARNING) logging.getLogger('paramiko').setLevel(logging.WARNING)
warnings.simplefilter('ignore') warnings.simplefilter('ignore')
cmd_name = sys.argv[-1] cmd_name = sys.argv[-1]
cmd_map = { cmd_map = {
'bootstrap': bootstrap, 'bootstrap': bootstrap,
'run': run, 'run': run,
 End of changes. 29 change blocks. 
82 lines changed or deleted 82 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)