jnosal

On programming and stuff

Locust.io with nanomsg? Easy peasy.

The Need

locust.io is a modern Python tool for load testing. I have a couple of small services running on nanomsg on a development server. Although nanomsg is still in beta, I was tempted to move them to production (basically because most of them do one thing and do it well). But before deploying I wanted to test my architecture, and for that I needed benchmarks.

It turns out that locust, which by default plays nicely with HTTP-based services, also provides a neat way to hook custom clients into its core features. So it's fairly easy to test XML-RPC, ZeroMQ, RabbitMQ or nanomsg based apps.

The code at the bottom is pretty straightforward: it's a simple nanomsg client that serves as an example, and all we have to do is fire locust's success and failure events from it so that locust can collect its metrics. 20-30 lines of simple magic and we're done!

Save the code as locustfile.py (the file locust looks for by default) and we can now run:

$ locust

and enjoy :-)

The Code

import json
import time
import nanomsg

from locust import Locust, events, task, TaskSet


class SteroidSocket(nanomsg.Socket):

    def send_json(self, msg, flags=0, **kwargs):
        msg = json.dumps(msg, **kwargs).encode('utf8')
        self.send(msg, flags)

    def recv_json(self, buf=None, flags=0):
        msg = self.recv(buf, flags)
        return json.loads(msg)


class NanomsgClient(object):
    socket_type = nanomsg.REQ
    default_send_timeout = 100

    def __init__(self, address, **kwargs):
        self.address = address
        self.setup()

    def get_socket(self):
        return SteroidSocket(self.socket_type)

    def setup(self):
        self.socket = self.get_socket()
        self.socket.connect(self.address)
        self.socket._set_send_timeout(self.default_send_timeout)

    def get(self, msg):
        start_time = time.time()
        try:
            self.socket.send_json(msg)
            result = self.socket.recv_json()
            print(result)
        except nanomsg.NanoMsgAPIError as e:
            total_time = int((time.time() - start_time) * 1000)
            events.request_failure.fire(
                request_type="nanomsg",
                name=msg.get('executable', ''),
                response_time=total_time,
                exception=e
            )
        else:
            total_time = int((time.time() - start_time) * 1000)
            events.request_success.fire(
                request_type="nanomsg",
                name=msg.get('executable', ''),
                response_time=total_time,
                response_length=0
            )

    def close(self):
        self.socket.close()


class NanomsgLocust(Locust):

    def __init__(self, *args, **kwargs):
        super(NanomsgLocust, self).__init__(*args, **kwargs)
        self.client = NanomsgClient(self.address)


class NanomsgUser(NanomsgLocust):
    address = "tcp://127.0.0.1:5001"
    min_wait = 100
    max_wait = 1000

    class task_set(TaskSet):

        @task(1)
        def ping(self):
            self.client.get({'executable': 'ping'})

        @task(1)
        def pong(self):
            self.client.get({'executable': 'pong'})
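
The timing-and-reporting pattern inside `get` does not depend on nanomsg at all, so it can be tried out without a server. Below is a minimal sketch of that pattern only. `timed_request`, `record_success` and `record_failure` are hypothetical names made up for this illustration; the two recorder functions merely stand in for locust's `events.request_success.fire` and `events.request_failure.fire`, and reporting `len(result)` as the response length is an assumption (the client above simply reports 0).

```python
import time


def timed_request(send, on_success, on_failure, name):
    """Call send(), measure elapsed milliseconds and report the outcome."""
    start = time.time()
    try:
        result = send()
    except Exception as exc:
        elapsed_ms = int((time.time() - start) * 1000)
        on_failure(name=name, response_time=elapsed_ms, exception=exc)
        return None
    elapsed_ms = int((time.time() - start) * 1000)
    on_success(name=name, response_time=elapsed_ms,
               response_length=len(result))
    return result


# Hypothetical stand-ins for locust's event hooks: they only record
# what was reported, so the pattern can be checked in isolation.
successes, failures = [], []


def record_success(**kwargs):
    successes.append(kwargs)


def record_failure(**kwargs):
    failures.append(kwargs)


def failing_send():
    # Simulates a request that never gets an answer.
    raise IOError("timed out")


reply = timed_request(lambda: b'{"ok": true}',
                      record_success, record_failure, "ping")
timed_request(failing_send, record_success, record_failure, "pong")
```

Any other transport (xml-rpc, zeromq, rabbitmq) could be wrapped the same way: only the `send` callable changes, while the reporting side stays identical.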
            

The Conclusion

Whenever you need to test a custom architecture, find single points of failure or simply experiment with your stack, use locust.io to put your code under pressure :-)
