Added --tcp-listeners-port option to daemon in order to allow a global inter-source...
authorJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Fri, 22 Apr 2011 05:27:18 +0000 (05:27 +0000)
committerJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Fri, 22 Apr 2011 05:27:18 +0000 (05:27 +0000)
bin/inputs/outpeer.py
bin/inputs/run.py

index 7d1b673ea46c2bfe4358044ad31836fa60f01d9c..7d0b031c5a325e623b5cee7d1f09c4e76e001f46 100644 (file)
@@ -10,7 +10,7 @@ import socket
 
 __all__ = [ 'OutPeers', 'outpeers_from_config' ]
 
-class OutPeer:
+class UdpOutPeer:
     '''
     UDP output feed.
     '''
@@ -123,6 +123,6 @@ def outpeers_from_config(list_from_config):
     for str_config in list_from_config:
         assert str_config.startswith('udp:'), \
                'Unsupported output peer protocol %s' % str_config
-        peer = OutPeer(str_config[4:])
+        peer = UdpOutPeer(str_config[4:])
         outpeers.peers.append(peer)
     return outpeers
index 662f9e9e0f764d8f25d683428c189d35efa7513b..2ac942f1072f796ce9c6205f6d3237fd033e5986 100644 (file)
@@ -21,6 +21,74 @@ from ais.inputs.tcpout import TcpOutService
 from ais.gpsdec import AivdmProcessor
 
 
+import socket
+import struct
+from threading import Thread, Lock
+from time import sleep
+from ais.ntools import formataddr
+output_tcpin_listeners = set()
+SERVER_LINGER_TIME = 5
+
+class OutputTcpInListener:
+    def __init__(self, conn, addr, port):
+        self.sock = conn
+        self.sock.setblocking(0)
+        self.addr = addr
+        self.port = port
+        logging.debug('Set linger time for outputtcpinlistener socket to %s seconds', SERVER_LINGER_TIME)
+        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+                              struct.pack('ii', 1, SERVER_LINGER_TIME))
+
+    def send_line(self, line):
+        global output_tcpin_listeners_mutex
+        try:
+            self.sock.send(line+'\r\n')
+        except socket.error, err:
+            if err.errno == 32: # Broken pipe
+                logging.info('listener %s:%s disconnect', formataddr(self.addr), self.port)
+                output_tcpin_listeners_mutex.acquire()
+                output_tcpin_listeners.remove(self)
+                output_tcpin_listeners_mutex.release()
+            else:
+                logging.error('socket.send() to listener %s:%s failed %s', formataddr(self.addr), port, repr(err))
+
+
+class OutputTcpInListen(Thread):
+    def __init__(self, port):
+        Thread.__init__(self)
+        self.port = port
+        self.sock = None
+
+    def run(self):
+        self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+        self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
+        while True:
+            try:
+                self.sock.bind(('', self.port))
+                break
+            except socket.error, err:
+                if err.errno == 98: # Address already in use
+                    logging.error('TCP port %s already in use. Waiting...', self.port)
+                    sleep(1)
+                else:
+                    raise
+        logging.info('Bound to socket on port %s for listeners', self.port)
+        logging.debug('Set linger time for outtcplisten socket to %s seconds', SERVER_LINGER_TIME)
+        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+                              struct.pack('ii', 1, SERVER_LINGER_TIME))
+        self.sock.listen(2)
+        while True:
+            try:
+                conn, addr = self.sock.accept()
+            except socket.error, err:
+                if err.errno == 22: # Invalid argument
+                    logging.info('TcpInListen socket was closed. Exiting thread.')
+                    return
+                raise
+            logging.info('Incoming listener %s:%s', formataddr(addr[0]), addr[1])
+            output_tcpin_listeners.add(OutputTcpInListener(conn, addr[0], addr[1]))
+
+
 def mainloop(options, args):
     if options.debug:
         loglevel = logging.DEBUG
@@ -52,6 +120,12 @@ def mainloop(options, args):
 
     #rootlogger.addHandler(loghandler)
     
+    if options.tcp_listeners_port:
+        global output_tcpin_listeners_mutex
+        outtcplisten = OutputTcpInListen(options.tcp_listeners_port)
+        output_tcpin_listeners_mutex = Lock()
+        outtcplisten.start()
+
     try:
         config = peers_get_config()
         if len(config) == 0:
@@ -119,6 +193,13 @@ def mainloop(options, args):
 
                         # forward the line
                         channel.source.outpeers.send_line(line)
+                        if options.tcp_listeners_port:
+                            # make a copy of the set, because its size may change during iteration (RuntimeError)
+                            output_tcpin_listeners_mutex.acquire()
+                            tmp_listeners = [ listener for listener in output_tcpin_listeners ]
+                            output_tcpin_listeners_mutex.release()
+                            for listener in tmp_listeners:
+                                listener.send_line(line)
                   
                     # we don't want any stdout buffering
                     if options.stdout:
@@ -130,11 +211,20 @@ def mainloop(options, args):
                 refresh_all_stats()
                 last_stat = timestamp
                         
-
-
     except KeyboardInterrupt:
         logging.critical('Received SIGINT. Shuting down.')
-        raise
+
+    if options.tcp_listeners_port:
+        if outtcplisten.sock:
+            logging.info('Shuting down outtcplisten socket')
+            outtcplisten.sock.shutdown(socket.SHUT_RDWR)
+        output_tcpin_listeners_mutex.acquire()
+        tmp_listeners = [ listener for listener in output_tcpin_listeners ]
+        output_tcpin_listeners_mutex.release()
+        for listener in tmp_listeners:
+            logging.info('Shuting down listener socket %s:%s', formataddr(listener.addr), listener.port)
+            listener.sock.setblocking(1)
+            listener.sock.shutdown(socket.SHUT_RDWR)
 
 def main():
     from optparse import OptionParser
@@ -151,6 +241,9 @@ def main():
     parser.add_option('--db',
         help='Process packets locally',
         action='store_true', dest='todb', default=False)
+    parser.add_option('--tcp-listeners-port',
+        help='TCP global listener port',
+        action='store', type=int, dest='tcp_listeners_port')
     options, args = parser.parse_args()