Source code for zeroless.zeroless

"""
The Zeroless module API.

.. data:: log

A global Logger object. To use it, just add an Handler object
and set an appropriate logging level.
"""

import zmq
import logging

from time import sleep
from copy import deepcopy
from warnings import warn
from functools import partial

log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())

def _check_valid_port_range(port):
    if port < 1024 or port > 65535:
        error = 'Port {0} is invalid, choose one between 1024 and 65535'
        error = error.format(port)
        log.error(error)
        raise ValueError(error)

def _check_valid_num_connections(socket_type, num_connections):
    if socket_type == zmq.PAIR and num_connections > 1:
        error = 'A client cannot connect more than once in the PAIR pattern'
        log.error(error)
        raise RuntimeError(error)

def _connect_zmq_sock(sock, ip, port):
    log.info('Connecting to {0} on port {1}'.format(ip, port))
    sock.connect('tcp://' + ip + ':' + str(port))

def _disconnect_zmq_sock(sock, ip, port):
    log.info('Disconnecting from {0} on port {1}'.format(ip, port))

    try:
        sock.disconnect('tcp://' + ip + ':' + str(port))
    except zmq.ZMQError:
        error = 'There was no connection to {0} on port {1}'.format(ip, port)
        log.exception(error)
        raise ValueError(error)

def _bind_zmq_sock(sock, port):
    log.info('Binding on port {0}'.format(port))

    try:
        sock.bind('tcp://*:' + str(port))
    except zmq.ZMQError:
        error = 'Port {0} is already in use'.format(port)
        log.exception(error)
        raise ValueError(error)

def _recv(sock):
    while True:
        frames = sock.recv_multipart()
        log.debug('Receiving: {0}'.format(frames))
        yield frames if len(frames) > 1 else frames[0]

[docs]class Sock: def __init__(self): pass def __sock(self, pattern): sock = zmq.Context().instance().socket(pattern) self._setup(sock) log.info('Ready...') return sock def __send_function(self, sock, topic=None, embed_topic=False): def _send(*data): log.debug('Sending: {0}'.format(data)) try: sock.send_multipart(data) except TypeError: error = 'Data must be bytes, so try again' log.exception(error) raise TypeError(error) if sock.socket_type == zmq.PUB and embed_topic: if topic: return partial(_send, topic) else: return partial(_send, b'') return _send def __recv_generator(self, sock): return _recv(sock) # PubSub pattern
[docs] def pub(self, topic=b'', embed_topic=False): """ Returns a callable that can be used to transmit a message, with a given ``topic``, in a publisher-subscriber fashion. Note that the sender function has a ``print`` like signature, with an infinite number of arguments. Each one being a part of the complete message. :param topic: the topic that will be published to (default=b'') :type topic: bytes :param embed_topic: set to embed the topic into published messages (default=False) :type embed_topic bool :rtype: function """ if not isinstance(topic, bytes): error = 'Topic must be bytes' log.error(error) raise TypeError(error) sock = self.__sock(zmq.PUB) return self.__send_function(sock, topic, embed_topic)
[docs] def sub(self, topics=(b'',)): """ Returns an iterable that can be used to iterate over incoming messages, that were published with one of the topics specified in ``topics``. Note that the iterable returns as many parts as sent by subscribed publishers. :param topics: a list of topics to subscribe to (default=b'') :type topics: list of bytes :rtype: generator """ sock = self.__sock(zmq.SUB) for topic in topics: if not isinstance(topic, bytes): error = 'Topics must be a list of bytes' log.error(error) raise TypeError(error) sock.setsockopt(zmq.SUBSCRIBE, topic) return self.__recv_generator(sock) # PushPull pattern
[docs] def push(self): """ Returns a callable that can be used to transmit a message in a push-pull fashion. Note that the sender function has a ``print`` like signature, with an infinite number of arguments. Each one being a part of the complete message. :rtype: function """ sock = self.__sock(zmq.PUSH) return self.__send_function(sock)
[docs] def pull(self): """ Returns an iterable that can be used to iterate over incoming messages, that were pushed by a push socket. Note that the iterable returns as many parts as sent by pushers. :rtype: generator """ sock = self.__sock(zmq.PULL) return self.__recv_generator(sock) # ReqRep pattern
[docs] def request(self): """ Returns a callable and an iterable respectively. Those can be used to both transmit a message and/or iterate over incoming messages, that were replied by a reply socket. Note that the iterable returns as many parts as sent by repliers. Also, the sender function has a ``print`` like signature, with an infinite number of arguments. Each one being a part of the complete message. :rtype: (function, generator) """ sock = self.__sock(zmq.REQ) return self.__send_function(sock), self.__recv_generator(sock)
[docs] def reply(self): """ Returns a callable and an iterable respectively. Those can be used to both transmit a message and/or iterate over incoming messages, that were requested by a request socket. Note that the iterable returns as many parts as sent by requesters. Also, the sender function has a ``print`` like signature, with an infinite number of arguments. Each one being a part of the complete message. :rtype: (function, generator) """ sock = self.__sock(zmq.REP) return self.__send_function(sock), self.__recv_generator(sock) # Pair pattern
[docs] def pair(self): """ Returns a callable and an iterable respectively. Those can be used to both transmit a message and/or iterate over incoming messages, that were sent by a pair socket. Note that the iterable returns as many parts as sent by a pair. Also, the sender function has a ``print`` like signature, with an infinite number of arguments. Each one being a part of the complete message. :rtype: (function, generator) """ sock = self.__sock(zmq.PAIR) return self.__send_function(sock), self.__recv_generator(sock)
def _setup(self, sock): raise NotImplementedError()
[docs]class Client(Sock): """ A client that can connect to a set of servers. """ def __init__(self): """ Constructor of the Client. """ self._sock = None self._is_ready = False self._addresses = [] Sock.__init__(self) def _setup(self, sock): self._sock = sock self._is_ready = True _check_valid_num_connections(self._sock.socket_type, len(self._addresses)) for ip, port in self._addresses: _connect_zmq_sock(self._sock, ip, port) @property def addresses(self): """ Returns a tuple containing all the connected addresses. Each address is a tuple with an ip address and a port. :rtype: (addresses) """ return tuple(self._addresses)
[docs] def connect(self, ip, port): """ Connects to a server at the specified ip and port. :param ip: an IP address :type ip: str or unicode :param port: port number from 1024 up to 65535 :type port: int """ _check_valid_port_range(port) address = (ip, port) if address in self._addresses: error = 'Already connected to {0} on port {1}'.format(ip, port) log.exception(error) raise ValueError(error) self._addresses.append(address) if self._is_ready: _check_valid_num_connections(self._sock.socket_type, len(self._addresses)) _connect_zmq_sock(self._sock, ip, port)
[docs] def connect_local(self, port): """ Connects to a server in localhost at the specified port. :param port: port number from 1024 up to 65535 :type port: int """ self.connect('127.0.0.1', port)
[docs] def disconnect(self, ip, port): """ Disconnects from a server at the specified ip and port. :param ip: an IP address :type ip: str or unicode :param port: port number from 1024 up to 65535 :type port: int """ _check_valid_port_range(port) address = (ip, port) try: self._addresses.remove(address) except ValueError: error = 'There was no connection to {0} on port {1}'.format(ip, port) log.exception(error) raise ValueError(error) if self._is_ready: _disconnect_zmq_sock(self._sock, ip, port)
[docs] def disconnect_local(self, port): """ Disconnects from a server in localhost at the specified port. :param port: port number from 1024 up to 65535 :type port: int """ self.disconnect('127.0.0.1', port)
[docs] def disconnect_all(self): """ Disconnects from all connected servers. """ addresses = deepcopy(self._addresses) for ip, port in addresses: self.disconnect(ip, port)
[docs]class Server(Sock): """ A server that clients can connect to. """ def __init__(self, port): """ Constructor of the Server. :param port: port number from 1024 up to 65535 :type port: int """ _check_valid_port_range(port) self._port = port Sock.__init__(self) def _setup(self, sock): if sock.socket_type == zmq.SUB: warning = 'SUB sockets that bind will not get any message before ' warning += 'they first ask for via the provided generator, so ' warning += 'prefer to bind PUB sockets if missing some messages ' warning += 'is not an option' warn(warning) _bind_zmq_sock(sock, self._port) @property def port(self): """ Returns the port. :rtype: int """ return self._port