Rationalized output udp forwarding:
authorJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Thu, 3 Jun 2010 22:59:08 +0000 (22:59 +0000)
committerJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Thu, 3 Jun 2010 22:59:08 +0000 (22:59 +0000)
- Code factorisation
- IP error resilient
- Support IPv4 and IPv6 including change from DNS resolution on the fly

bin/inputs/common.py
bin/inputs/outpeer.py [new file with mode: 0644]
bin/inputs/serialin.py
bin/inputs/tcpout.py
bin/inputs/udp.py

index 4c7de4048022d6eacd9d0830760aba11e766c953..386bf8cc0cea0f551794984d134902c11601e119 100644 (file)
@@ -8,7 +8,7 @@ __all__ = [
     'formataddr',
     'aivdm_log_line',
     'InStatsRate', 'InStats',
-    'ais_line_reader' ]
+    'ais_line_reader']
 
 import os.path
 import sys
@@ -17,7 +17,7 @@ from datetime import datetime
 from time import time as get_timestamp
 
 import rrdtool
-       
+
 
 NMEA_DIR = '/var/lib/ais/nmea'
 STATS_DIR = '/var/lib/ais/stats'
@@ -129,13 +129,13 @@ class InStatsRate:
         Catches 'No such file or directory' and create it if necessary.
         '''
         logging.info('%.1f packets/s - %.1f AIVDM lines/s - '
-                            '%.0f bps payload - %.0f bps internet',
-                                        self.rpackets, self.rlines,
-                                        8*self.rbytes, 8*self.rbytes_ethernet)
+                     '%.0f bps payload - %.0f bps internet',
+                     self.rpackets, self.rlines,
+                     8*self.rbytes, 8*self.rbytes_ethernet)
         logging.debug('rrdtool <- %s:%s:%s:%s:%s' \
-                            % (int(self.timestamp), 
-                                           self.rpackets, self.rlines, 
-                                               self.rbytes, self.rbytes_ethernet))
+                     % (int(self.timestamp), 
+                        self.rpackets, self.rlines, 
+                        self.rbytes, self.rbytes_ethernet))
         try:
             self._save(filename)
         except rrdtool.error, err:
@@ -212,3 +212,5 @@ def ais_line_reader(source):
             continue # ignore empty line
 
         yield line
+
+
diff --git a/bin/inputs/outpeer.py b/bin/inputs/outpeer.py
new file mode 100644 (file)
index 0000000..e754bf6
--- /dev/null
@@ -0,0 +1,120 @@
+# -*- coding: utf-8 -*-
+'''
+UDP out peers module
+'''
+
+__all__ = [ 'OutPeers' ]
+
+
+import logging
+from time import time as get_timestamp
+import socket
+
+class OutPeer:
+    '''
+    UDP output feed.
+    '''
+    def __init__(self, str_addrport):
+        colpos = str_addrport.rfind(':')
+        if colpos == 1:
+            assert colpos != 1, \
+                'Missing column in host:port string for %s' % str_addrport
+        self.host = str_addrport[:colpos]
+        if self.host.startswith('['):
+            assert self.host[-1] == ']'
+            self.host = self.host[1:-1]
+        self.port = int(str_addrport[colpos+1:])
+        self.possibles_addrinfo = None
+        self.sock = None
+        self.addr = None
+        self.timestamp_getaddrinfo = 0
+        self._reinit_socket()
+
+
+    def _reinit_socket(self):
+        '''
+        Remove last known addrinfo for that host:port, if any.
+        If addrinfo is empty, fetch a new one.
+        Initialise self.sock and self.addr
+        '''
+        while True:
+            if self.possibles_addrinfo:
+                # remove current (0th) addrinfo
+                del self.possibles_addrinfo[0]
+
+            if not self.possibles_addrinfo:
+                timestamp = get_timestamp()
+                if timestamp > self.timestamp_getaddrinfo + 60:
+                    try:
+                        logging.debug('Calling socket.getaddrinfo for %s:%s',
+                            self.host, self.port)
+                        self.possibles_addrinfo = socket.getaddrinfo(self.host,
+                            self.port, socket.AF_UNSPEC, socket.SOCK_DGRAM)
+                        logging.info('%s possibles addrinfo for %s:%s.', 
+                            len(self.possibles_addrinfo), self.host, self.port)
+                        logging.debug('possibles_addrinfo: %s ',
+                            self.possibles_addrinfo)
+                    except socket.gaierror, err:
+                        logging.error("Can't resolve %s:%s: %s."
+                            "Will retry in a minute.",
+                            self.host, self.port, err)
+                        return False
+                    finally:
+                        self.timestamp_getaddrinfo = timestamp
+                    if not self.possibles_addrinfo:
+                        logging.error("Can't resolve %s:%s."
+                            "Will retry in a minute.",
+                            self.host, self.port)
+                        return False
+                else:
+                    return False # waiting 
+
+            addrinfo = self.possibles_addrinfo[0]
+            logging.info('Prepared socket to %s', addrinfo)
+            af, socktype, proto = addrinfo[:3]
+            self.addr = addrinfo[4]
+            try:
+                self.sock = socket.socket(af, socktype, proto)
+                return True
+            except socket.error, msg:
+                logging.error("Can't open socket to %s: %s", self.addr, msg)
+        
+    def send_line(self, line):
+        '''
+        Sends a AIVDM line to that peer.
+        CRLF will be added automatically.
+        '''
+        while self.sock or self._reinit_socket():
+            try:
+                self.sock.sendto(line+'\r\n', self.addr)
+                return
+            except socket.error, err:
+                logging.error('socket.sendto(%s) failed: %s', self.addr, err)
+                self.sock = None
+
+
+class OutPeers:
+    '''
+    A group of UDP peers for output.
+    '''
+    def __init__(self, list_from_optparse):
+        '''
+        Just give the list for strings from optparse.
+        '''
+        self.peers = {}
+        self.sock = None
+        for str_addrport in list_from_optparse:
+            peer = OutPeer(str_addrport)
+            self.peers[peer] = 0
+
+    def send_line(self, line):
+        '''
+        Send a line to all the output peers registered.
+        '''
+        logging.debug('OUT %s', repr(line+'\r\n'))
+        for outpeer, stats in self.peers.iteritems():
+            #logging.debug('OUT %s:%s %s',
+            #              formataddr(outpeer[0]), outpeer[1],
+            #              repr(line+'\r\n'))
+            outpeer.send_line(line)
+            self.peers[outpeer] += 1
index 9857687870a0f2c247d279b5e97127ef9a2e9b81..693b8eedafb2c25260f567dab355257ea5a85232 100755 (executable)
@@ -5,10 +5,9 @@ import sys
 from time import time as get_timestamp
 import serial
 import logging
-import socket
 
-from common import STATS_RATE, aivdm_log_line, formataddr, InStats, \
-     ais_line_reader
+from common import STATS_RATE, aivdm_log_line, InStats, ais_line_reader
+from outpeer import OutPeers
 
 
 class SerialSource:
@@ -45,6 +44,9 @@ class SerialSource:
             logging.debug('IN %s %s', self.name, repr(data))
             self.data += data
 
+#class SerialNamedSource:
+#    def __init__(self):
+#        pass
 
 def main():
     from optparse import OptionParser
@@ -70,17 +72,7 @@ def main():
     logging.basicConfig(level=loglevel,
                         format='%(asctime)s %(levelname)s %(message)s')
 
-    outpeers = {}
-    for peer in options.peers:
-        colpos = peer.rfind(':')
-        if colpos == 1:
-            print >> sys.stderr, 'Missing column in host:port string'
-        host = peer[:colpos]
-        if host.startswith('['):
-            assert host[-1] == ']'
-            host = host[1:-1]
-        port = int(peer[colpos+1:])
-        outpeers[(host, port)] = 0
+    outpeers = OutPeers(options.peers)
 
     if options.id4:
         assert len(options.id4)==4, 'ID4 must be 4 characters long.'
@@ -95,10 +87,6 @@ def main():
 
     source = SerialSource(options.id4, serialname)
 
-    # shipplotter doesn't support ipv6 with wine:
-    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-    #sock.bind(('::1', serverport))
-
     last_stat = get_timestamp()
     try:
         while True:
@@ -112,19 +100,7 @@ def main():
                     aivdm_log_line(source.id4, timestamp, serialname, line)
 
                 # forward the line
-                logging.debug('OUT %s', repr(line+'\r\n'))
-                for outpeer, stats in outpeers.iteritems():
-                    logging.debug('OUT %s:%s %s',
-                                  formataddr(outpeer[0]), outpeer[1],
-                                  repr(line+'\r\n'))
-                    # FIXME: don't resolve hostname every time!
-                    try:
-                        sock.sendto(line+'\r\n', outpeer)
-                    except socket.error, err:
-                        # ignore socket.error: (101, 'Network is unreachable')
-                        # FIXME: don't flood with these messages....
-                        logging.error('socket.sendto failed: %s', err)
-                    outpeers[outpeer] += 1
+                outpeers.send_line(line)
 
                 if options.stdout:
                     sys.stdout.write(line+'\r\n')
index 2f957fbc85944ba808932b7014154e8310472524..41b964f225e71049ad3ac00f958a51866d04f4f2 100755 (executable)
@@ -6,6 +6,7 @@ from time import time as get_timestamp
 import socket
 
 from common import STATS_RATE, aivdm_log_line, InStats, ais_line_reader
+from outpeer import OutPeers
 
 TCP_HEADER_SIZE = 52
 
@@ -50,6 +51,9 @@ def main():
     parser.add_option('-d', '--debug',
         help="debug mode",
         action='store_true', dest='debug', default=False)
+    parser.add_option('-f', '--forward',
+        help="forward information to this peer.\nEg [::ffff:12.23.34.45]:1234",
+        action='append', dest='peers', default=[])
     parser.add_option('--id4',
         help='4 letter string for identifying the source.',
         action='store', type='str', dest='id4', default=None)
@@ -62,7 +66,10 @@ def main():
         loglevel = logging.DEBUG
     else:
         loglevel = logging.INFO
-    logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
+    logging.basicConfig(level=loglevel,
+        format='%(asctime)s %(levelname)s %(message)s')
+
+    outpeers = OutPeers(options.peers)
 
     if options.id4:
         assert len(options.id4)==4, 'ID4 must be 4 characters long.'
@@ -86,7 +93,14 @@ def main():
                 # dump the line to file
                 if source.id4:
                     aivdm_log_line(source.id4, timestamp, from_, line)
-            
+                
+                # forward the line
+                outpeers.send_line(line)
+
+            # we don't want any stdout buffering
+            if options.stdout:
+                sys.stdout.flush()
+                
             # log statistics every STATS_RATE seconds
             if timestamp - last_stat > STATS_RATE:
                 rates = source.stats.get_rates()
index 642e5d62c0a13f349352a4736b3c0a79be1e673f..a5769da2fff07d6e87ac9fa94b1cea9107c235cf 100755 (executable)
@@ -7,6 +7,7 @@ from time import time as get_timestamp
 import socket
 
 from common import STATS_RATE, aivdm_log_line, formataddr, InStats, InStatsRate, ais_line_reader
+from outpeer import OutPeers
 
 UDP_HEADER_SIZE = 28
 DEFAULT_MTU = 1500
@@ -82,17 +83,7 @@ def main():
     logging.basicConfig(level=loglevel,
         format='%(asctime)s %(levelname)s %(message)s')
 
-    outpeers = {}
-    for peer in options.peers:
-        colpos = peer.rfind(':')
-        if colpos == 1:
-            print >> sys.stderr, 'Missing column in host:port string'
-        host = peer[:colpos]
-        if host.startswith('['):
-            assert host[-1] == ']'
-            host = host[1:-1]
-        port = int(peer[colpos+1:])
-        outpeers[(host, port)] = 0
+    outpeers = OutPeers(options.peers)
 
     if options.id4:
         assert len(options.id4)==4, 'ID4 must be 4 characters long.'
@@ -140,13 +131,7 @@ def main():
                         aivdm_log_line(options.id4, timestamp, from_, line)
                 
                     # forward the line
-                    logging.debug('OUT %s', repr(line+'\r\n'))
-                    for outpeer, stats in outpeers.iteritems():
-                        #logging.debug('OUT %s:%s %s',
-                        #              formataddr(outpeer[0]), outpeer[1],
-                        #              repr(line+'\r\n'))
-                        sock.sendto(line+'\r\n', outpeer)
-                        outpeers[outpeer] += 1
+                    outpeers.send_line(line)
 
                 # we don't want any stdout buffering
                 if options.stdout: