from __future__ import division
import logging
-from time import time as get_timestamp
+from time import time as get_timestamp, sleep
import socket
+from threading import Thread, Lock
-__all__ = [ 'OutPeers', 'outpeers_from_config' ]
+from ais.ntools import formataddr
-class OutPeer:
+__all__ = [ 'OutPeers', 'outpeers_from_config', 'TcpInOutPeer', 'TcpInServiceOut', 'tcpin_outpeers' ]
+
+class UdpOutPeer:
'''
UDP output feed.
'''
self.sock = None
+class TcpInOutPeer:
+ def __init__(self, conn, addr, port):
+ self.sock = conn
+ self.sock.setblocking(0)
+ self.addr = addr
+ self.port = port
+ #logging.debug('Set linger time for outputtcpinlistener socket to %s seconds', SERVER_LINGER_TIME)
+ #self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+ # struct.pack('ii', 1, SERVER_LINGER_TIME))
+
+ def send_line(self, line):
+ try:
+ self.sock.send(line+'\r\n')
+ except socket.error, err:
+ if err.errno == 32: # Broken pipe
+ logging.info('TcpInOutPeer %s:%s disconnect', formataddr(self.addr), self.port)
+ tcpin_outpeers.remove_peer(self)
+ else:
+ logging.error('socket.send() to TcpInOutPeer %s:%s failed %s', formataddr(self.addr), port, repr(err))
+
+ def shutdown(self):
+ logging.info('Shuting down TcpInOutPeer socket %s:%s', formataddr(self.addr), self.port)
+ self.sock.setblocking(1)
+ self.sock.shutdown(socket.SHUT_RDWR)
+
+
+
+class TcpInServiceOut(Thread):
+ '''
+ Service that listen to TCP port and create TcpInOutPeer on the fly.
+ '''
+ def __init__(self, port):
+ Thread.__init__(self)
+ self.port = port
+ self.sock = None
+
+ def run(self):
+ self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
+ while True:
+ try:
+ self.sock.bind(('', self.port))
+ break
+ except socket.error, err:
+ if err.errno == 98: # Address already in use
+ logging.error('TCP port %s already in use. Waiting...', self.port)
+ sleep(1)
+ elif err.errno == 9: # Bad file descriptor
+ logging.error('TcpInServiceOut socket is not opened. Can\'t bind. Exiting service.')
+ return
+ else:
+ raise
+ logging.info('Bound to socket on port %s for listeners', self.port)
+ #logging.debug('Set linger time for tcpinoutpeerlisten socket to %s seconds', SERVER_LINGER_TIME)
+ #self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+ # struct.pack('ii', 1, SERVER_LINGER_TIME))
+ self.sock.listen(2)
+ while True:
+ try:
+ conn, addr = self.sock.accept()
+ except socket.error, err:
+ if err.errno == 22: # Invalid argument
+ logging.info('TcpInServiceOut socket was closed. Exiting thread.')
+ return
+ logging.info('Incoming TcpInOutPeer %s:%s', formataddr(addr[0]), addr[1])
+ tcpin_outpeers.add_peer(TcpInOutPeer(conn, addr[0], addr[1]))
+
+ def shutdown(self):
+ if self.sock:
+ try:
+ logging.info('Shuting down tcpinoutpeerlisten socket')
+ self.sock.shutdown(socket.SHUT_RDWR)
+ except socket.error, err:
+ logging.info('Ignoring error %s while shuting down TcpInServiceOut socket.', err.errno)
+ self.sock.close()
+
class OutPeers:
'''
A group of UDP peers for output.
+ This class is thread safe.
'''
def __init__(self):
- self.peers = []
- self.sock = None
+ self._peers = set()
+ self._lock = Lock()
def __repr__(self):
- return '(' + ', '.join([ repr(peer) for peer in self.peers]) + ')'
+ return '(' + ', '.join([ repr(peer) for peer in self._peers]) + ')'
+ def add_peer(self, peer):
+ self._lock.acquire()
+ self._peers.add(peer)
+ self._lock.release()
+
+ def remove_peer(self, peer):
+ self._lock.acquire()
+ self._peers.remove(peer)
+ self._lock.release()
+
+ def safe_iter(self):
+ # make a copy of the set, because its size may change during iteration (RuntimeError)
+ self._lock.acquire()
+ tmp_peers = [ peer for peer in self._peers ]
+ self._lock.release()
+ for peer in tmp_peers:
+ yield peer
+
def send_line(self, line):
'''
Send a line to all the output peers registered.
'''
- for outpeer in self.peers:
+ for outpeer in self.safe_iter():
logging.debug('OUT %s %s', outpeer, repr(line))
outpeer.send_line(line)
for str_config in list_from_config:
assert str_config.startswith('udp:'), \
'Unsupported output peer protocol %s' % str_config
- peer = OutPeer(str_config[4:])
- outpeers.peers.append(peer)
+ peer = UdpOutPeer(str_config[4:])
+ outpeers.add_peer(peer)
return outpeers
+
+
+tcpin_outpeers = OutPeers()