Moved TcpInOutpeer classes to outpeers
authorJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Fri, 22 Apr 2011 14:16:32 +0000 (14:16 +0000)
committerJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Fri, 22 Apr 2011 14:16:32 +0000 (14:16 +0000)
bin/inputs/outpeer.py
bin/inputs/run.py
bin/inputs/virtual.py

index 7d0b031c5a325e623b5cee7d1f09c4e76e001f46..eb3779dcad88f5b802f4b458bcd247f19865cffb 100644 (file)
@@ -5,10 +5,13 @@ UDP out peers module
 
 from __future__ import division
 import logging
-from time import time as get_timestamp
+from time import time as get_timestamp, sleep
 import socket
+from threading import Thread, Lock
 
-__all__ = [ 'OutPeers', 'outpeers_from_config' ]
+from ais.ntools import formataddr
+
+__all__ = [ 'OutPeers', 'outpeers_from_config', 'TcpInOutPeer', 'TcpInOutPeerListen', 'tcpin_outpeers' ]
 
 class UdpOutPeer:
     '''
@@ -95,22 +98,109 @@ class UdpOutPeer:
                 self.sock = None
 
 
+class TcpInOutPeer:
+    def __init__(self, conn, addr, port):
+        self.sock = conn
+        self.sock.setblocking(0)
+        self.addr = addr
+        self.port = port
+        #logging.debug('Set linger time for outputtcpinlistener socket to %s seconds', SERVER_LINGER_TIME)
+        #self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+        #                      struct.pack('ii', 1, SERVER_LINGER_TIME))
+
+    def send_line(self, line):
+        try:
+            self.sock.send(line+'\r\n')
+        except socket.error, err:
+            if err.errno == 32: # Broken pipe
+                logging.info('TcpInOutPeer %s:%s disconnect', formataddr(self.addr), self.port)
+                tcpin_outpeers.remove_peer(self)
+            else:
+                logging.error('socket.send() to TcpInOutPeer %s:%s failed %s', formataddr(self.addr), port, repr(err))
+
+    def shutdown(self):
+        logging.info('Shuting down TcpInOutPeer socket %s:%s', formataddr(self.addr), self.port)
+        self.sock.setblocking(1)
+        self.sock.shutdown(socket.SHUT_RDWR)
+
+
+
+class TcpInOutPeerListen(Thread):
+    def __init__(self, port):
+        Thread.__init__(self)
+        self.port = port
+        self.sock = None
+
+    def run(self):
+        self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+        self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
+        while True:
+            try:
+                self.sock.bind(('', self.port))
+                break
+            except socket.error, err:
+                if err.errno == 98: # Address already in use
+                    logging.error('TCP port %s already in use. Waiting...', self.port)
+                    sleep(1)
+                else:
+                    raise
+        logging.info('Bound to socket on port %s for listeners', self.port)
+        #logging.debug('Set linger time for tcpinoutpeerlisten socket to %s seconds', SERVER_LINGER_TIME)
+        #self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+        #                      struct.pack('ii', 1, SERVER_LINGER_TIME))
+        self.sock.listen(2)
+        while True:
+            try:
+                conn, addr = self.sock.accept()
+            except socket.error, err:
+                if err.errno == 22: # Invalid argument
+                    logging.info('TcpInListen socket was closed. Exiting thread.')
+                    return
+                raise
+            logging.info('Incoming TcpInOutPeer %s:%s', formataddr(addr[0]), addr[1])
+            tcpin_outpeers.add_peer(TcpInOutPeer(conn, addr[0], addr[1]))
+    
+    def shutdown(self):
+        if self.sock:
+            logging.info('Shuting down tcpinoutpeerlisten socket')
+            self.sock.shutdown(socket.SHUT_RDWR)
+        
+
 class OutPeers:
     '''
     A group of UDP peers for output.
+    This class is thread safe.
     '''
     def __init__(self):
-        self.peers = []
-        self.sock = None
+        self._peers = set()
+        self._lock = Lock()
 
     def __repr__(self):
-        return '(' + ', '.join([ repr(peer) for peer in self.peers]) + ')'
+        return '(' + ', '.join([ repr(peer) for peer in self._peers]) + ')'
         
+    def add_peer(self, peer):
+        self._lock.acquire()
+        self._peers.add(peer)
+        self._lock.release()
+
+    def remove_peer(self, peer):
+        self._lock.acquire()
+        self._peers.remove(peer)
+        self._lock.release()
+
+    def safe_iter(self):
+        # make a copy of the set, because its size may change during iteration (RuntimeError)
+        self._lock.acquire()
+        tmp_peers = [ peer for peer in self._peers ]
+        self._lock.release()
+        for peer in tmp_peers:
+            yield peer
+
     def send_line(self, line):
         '''
         Send a line to all the output peers registered.
         '''
-        for outpeer in self.peers:
+        for outpeer in self.safe_iter():
             logging.debug('OUT %s %s', outpeer, repr(line))
             outpeer.send_line(line)
 
@@ -124,5 +214,8 @@ def outpeers_from_config(list_from_config):
         assert str_config.startswith('udp:'), \
                'Unsupported output peer protocol %s' % str_config
         peer = UdpOutPeer(str_config[4:])
-        outpeers.peers.append(peer)
+        outpeers.add_peer(peer)
     return outpeers
+
+
+tcpin_outpeers = OutPeers()
index 2ac942f1072f796ce9c6205f6d3237fd033e5986..bdb208b468541e31befc0750a48bd61061b35847 100644 (file)
@@ -19,75 +19,7 @@ from ais.inputs.udp import UdpService
 from ais.inputs.serialin import SerialService
 from ais.inputs.tcpout import TcpOutService
 from ais.gpsdec import AivdmProcessor
-
-
-import socket
-import struct
-from threading import Thread, Lock
-from time import sleep
-from ais.ntools import formataddr
-output_tcpin_listeners = set()
-SERVER_LINGER_TIME = 5
-
-class OutputTcpInListener:
-    def __init__(self, conn, addr, port):
-        self.sock = conn
-        self.sock.setblocking(0)
-        self.addr = addr
-        self.port = port
-        logging.debug('Set linger time for outputtcpinlistener socket to %s seconds', SERVER_LINGER_TIME)
-        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
-                              struct.pack('ii', 1, SERVER_LINGER_TIME))
-
-    def send_line(self, line):
-        global output_tcpin_listeners_mutex
-        try:
-            self.sock.send(line+'\r\n')
-        except socket.error, err:
-            if err.errno == 32: # Broken pipe
-                logging.info('listener %s:%s disconnect', formataddr(self.addr), self.port)
-                output_tcpin_listeners_mutex.acquire()
-                output_tcpin_listeners.remove(self)
-                output_tcpin_listeners_mutex.release()
-            else:
-                logging.error('socket.send() to listener %s:%s failed %s', formataddr(self.addr), port, repr(err))
-
-
-class OutputTcpInListen(Thread):
-    def __init__(self, port):
-        Thread.__init__(self)
-        self.port = port
-        self.sock = None
-
-    def run(self):
-        self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
-        self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
-        while True:
-            try:
-                self.sock.bind(('', self.port))
-                break
-            except socket.error, err:
-                if err.errno == 98: # Address already in use
-                    logging.error('TCP port %s already in use. Waiting...', self.port)
-                    sleep(1)
-                else:
-                    raise
-        logging.info('Bound to socket on port %s for listeners', self.port)
-        logging.debug('Set linger time for outtcplisten socket to %s seconds', SERVER_LINGER_TIME)
-        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
-                              struct.pack('ii', 1, SERVER_LINGER_TIME))
-        self.sock.listen(2)
-        while True:
-            try:
-                conn, addr = self.sock.accept()
-            except socket.error, err:
-                if err.errno == 22: # Invalid argument
-                    logging.info('TcpInListen socket was closed. Exiting thread.')
-                    return
-                raise
-            logging.info('Incoming listener %s:%s', formataddr(addr[0]), addr[1])
-            output_tcpin_listeners.add(OutputTcpInListener(conn, addr[0], addr[1]))
-
+from ais.inputs.outpeer import TcpInOutPeerListen, tcpin_outpeers
 
 def mainloop(options, args):
     if options.debug:
@@ -121,10 +53,8 @@ def mainloop(options, args):
     #rootlogger.addHandler(loghandler)
     
     if options.tcp_listeners_port:
-        global output_tcpin_listeners_mutex
-        outtcplisten = OutputTcpInListen(options.tcp_listeners_port)
-        output_tcpin_listeners_mutex = Lock()
-        outtcplisten.start()
+        tcpinoutpeerlisten = TcpInOutPeerListen(options.tcp_listeners_port)
+        tcpinoutpeerlisten.start()
 
     try:
         config = peers_get_config()
@@ -193,13 +123,7 @@ def mainloop(options, args):
 
                         # forward the line
                         channel.source.outpeers.send_line(line)
-                        if options.tcp_listeners_port:
-                            # make a copy of the set, because its size may change during iteration (RuntimeError)
-                            output_tcpin_listeners_mutex.acquire()
-                            tmp_listeners = [ listener for listener in output_tcpin_listeners ]
-                            output_tcpin_listeners_mutex.release()
-                            for listener in tmp_listeners:
-                                listener.send_line(line)
+                        tcpin_outpeers.send_line(line)
                   
                     # we don't want any stdout buffering
                     if options.stdout:
@@ -215,16 +139,9 @@ def mainloop(options, args):
         logging.critical('Received SIGINT. Shuting down.')
 
     if options.tcp_listeners_port:
-        if outtcplisten.sock:
-            logging.info('Shuting down outtcplisten socket')
-            outtcplisten.sock.shutdown(socket.SHUT_RDWR)
-        output_tcpin_listeners_mutex.acquire()
-        tmp_listeners = [ listener for listener in output_tcpin_listeners ]
-        output_tcpin_listeners_mutex.release()
-        for listener in tmp_listeners:
-            logging.info('Shuting down listener socket %s:%s', formataddr(listener.addr), listener.port)
-            listener.sock.setblocking(1)
-            listener.sock.shutdown(socket.SHUT_RDWR)
+        tcpinoutpeerlisten.shutdown()
+        for outpeer in tcpin_outpeers.safe_iter():
+            outpeer.shutdown()
 
 def main():
     from optparse import OptionParser
@@ -242,7 +159,7 @@ def main():
         help='Process packets locally',
         action='store_true', dest='todb', default=False)
     parser.add_option('--tcp-listeners-port',
-        help='TCP global listener port',
+        help='TCP global OutPeer port',
         action='store', type=int, dest='tcp_listeners_port')
     options, args = parser.parse_args()
 
index f7675bd8355b91c5d0bac502547f317d79ecffb8..be257a9b8392375f03ab678e42e4328fbdbbd425 100644 (file)
@@ -87,6 +87,3 @@ class Channel:
                 continue # ignore empty line
     
             yield line
-
-
-