Functional inputs.run multiservice launcher. But it still mixes all the raw streams...
authorJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Sat, 12 Jun 2010 19:40:46 +0000 (19:40 +0000)
committerJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Sat, 12 Jun 2010 19:40:46 +0000 (19:40 +0000)
Added __repr__ for services.
Removed stdout parameter from service.
Added a geter for normalized SOURCE in peers.
Change get_activity from a yielder to a simple return statement.

bin/inputs/common.py
bin/inputs/outpeer.py
bin/inputs/peers.py
bin/inputs/run.py
bin/inputs/serialin.py
bin/inputs/tcpout.py
bin/inputs/udp.py
bin/inputs/virtual.py

index 152aea6e39bf76ca63470f3adc5cc6036696c159..40ca9841fca61d7e2075962a2ea70ecb0202082a 100644 (file)
@@ -16,7 +16,7 @@ import logging
 from datetime import datetime
 
 from ais.inputs.stats import InStats
-from ais.inputs.peers import SOURCES
+from ais.inputs.peers import peers_get_config
 from ais.inputs.outpeer import outpeers_from_config
 
 
@@ -72,8 +72,9 @@ class Source:
         __all_sources__[id4] = self
         self.stats = InStats(id4)
         self.logger = SourceLogger(id4)
-        logging.debug('sources: %s', SOURCES)
-        config_outpeers = SOURCES[id4].get('out', ())
+        sources_config = peers_get_config()
+        logging.debug('sources: %s', sources_config)
+        config_outpeers = sources_config[id4].get('out', ())
         self.outpeers = outpeers_from_config(config_outpeers)
         logging.info('Outpeers for %s: %s', id4, self.outpeers)
 
index 6fce1264af312408601f8888248f9ff5903c1e5d..0df030af9e8f70a96b5d56bb6b3cec220fa7a8c5 100644 (file)
@@ -104,13 +104,14 @@ class OutPeers:
         self.sock = None
 
     def __repr__(self):
-        return '(' + ','.join([ repr(peer) for peer in self.peers]) + ')'
+        return '(' + ', '.join([ repr(peer) for peer in self.peers]) + ')'
         
     def send_line(self, line):
         '''
         Send a line to all the output peers registered.
         '''
         for outpeer in self.peers:
+            logging.debug('OUT %s %s', outpeer, repr(line))
             outpeer.send_line(line)
 
 def outpeers_from_config(list_from_config):
index 1f1f5cc7153fb4987f4bf6ada2ccee03c5b7f3e0..15e7211db94b4178f462b7ed67266c8c3003dab5 100644 (file)
@@ -3,21 +3,25 @@
 Peers definition
 '''
 
+__all__ = [
+    'peers_get_config',
+    'source_get_infoin']
+
+#TODO: rename this to config.py or config reader or whatever
+
 import logging
 import pprint
 from ais.ntools import str_split_column_ipv6
 from ais.inputs.config import SOURCES
-#from ais.inputs.udp import UdpService
-#from ais.inputs.serialin import SerialService
 
-UDPIN_HLPORT = {}
 __source_normalized__ = False
-
-# SOURCES normalization & checks
-def _normalize_sources():
+def peers_get_config():
+    '''
+    SOURCES normalization & checks
+    '''
     global __source_normalized__
     if __source_normalized__:
-        return
+        return SOURCES
     logging.debug('SOURCES (raw):\n%s', pprint.pformat(SOURCES))
     for id4, settings in SOURCES.iteritems():
         assert len(id4) == 4, \
@@ -66,16 +70,19 @@ def _normalize_sources():
                     id4
                 port_remote = int(port_remote)
          
-            if port_local not in UDPIN_HLPORT:
-                UDPIN_HLPORT[port_local] = {}
-            UDPIN_HLPORT[port_local][host_remote, port_remote] \
-                 = id4, source_port_discreminate
             SOURCES[id4]['in'] = ('Udp', host_local, port_local, host_remote, port_remote, source_port_discreminate)
+
         elif protocol == 'serial':
             assert len(parameters) == 1, \
                 "SOURCES['%s']['in'] serial: protocol requires 1 parameter." % \
                 id4
-            SOURCES[id4]['in'] = ('Serial', parameters[0])
+            SOURCES[id4]['in'] = tuple(['Serial'] + parameters)
+
+        elif protocol == 'tcpout':
+            assert len(parameters) == 2, \
+                "SOURCES['%s']['in'] tcpout: protocol requires 2 parameters." % \
+                id4
+            SOURCES[id4]['in'] = tuple(['TcpOut'] + parameters)
 
         else:
             logging.error(
@@ -83,7 +90,8 @@ def _normalize_sources():
                 protocol, id4, in_)
     __source_normalized__ = True
     logging.debug('SOURCES (normalised):\n%s', pprint.pformat(SOURCES))
-    logging.debug('UDPIN_HLPORT:\n%s', pprint.pformat(UDPIN_HLPORT))
+    return SOURCES
+
 
 
 def source_get_infoin(*args):
@@ -93,10 +101,8 @@ def source_get_infoin(*args):
       might return [ ['SRC1', '12.23.34.45', '', False],
                      ['SRC2', '98.87.76.65', '', True]]
     '''
-    if not __source_normalized__:
-        _normalize_sources()
     result = []
-    for id4, source_info in SOURCES.iteritems():
+    for id4, source_info in peers_get_config().iteritems():
         rawin = source_info.get('in', None)
         if not rawin:
             continue
index 25cee021166b8b303472392f4711e283580ece2a..a5abe6ed57533fb8f5534eeb94a39b7a57213702 100755 (executable)
@@ -5,14 +5,29 @@
 This is the main runner for AIS inputs.
 '''
 
+import sys
 import logging
+from select import select
+from time import time as get_timestamp
+
+from ais.inputs.peers import peers_get_config
+from ais.inputs.stats import STATS_RATE
+from ais.inputs.common import refresh_all_stats
+from ais.inputs.virtual import ais_line_reader
+from ais.inputs.udp import UdpService
+from ais.inputs.serialin import SerialService
+from ais.inputs.tcpout import TcpOutService
+
 
 def main():
     from optparse import OptionParser
-    parser = OptionParser()
+    parser = OptionParser('%prog [options] SOURCE*')
     parser.add_option('-d', '--debug',
         help="debug mode",
         action='store_true', dest='debug', default=False)
+    parser.add_option('--stdout',
+        help="Print incoming packets to stdout",
+        action='store_true', dest='stdout', default=False)
     options, args = parser.parse_args()
 
     if options.debug:
@@ -23,8 +38,76 @@ def main():
         format='%(asctime)s %(levelname)s %(message)s')
     
     try:
-        # service.run()
-        pass
+        config = peers_get_config()
+        if len(config) == 0:
+            logging.critical('No source definition. Check your config file.')
+            return
+
+        # build a list of services that needs to be launched
+        all_service_desc = set()
+        for id4, settings in config.iteritems():
+            if args and id4 not in args:
+                continue
+            try:
+                configin = settings['in']
+            except KeyError:
+                continue
+            if configin[0] == 'Udp':
+                service_desc =  ( configin[0], configin[2] )
+                all_service_desc.add(service_desc)
+            elif configin[0] == 'Serial':
+                service_desc = ( configin[0], id4, configin[1] )
+                all_service_desc.add(service_desc)
+            elif configin[0] == 'TcpOut':
+                service_desc = ( configin[0], id4, configin[1], configin[2] ) 
+                all_service_desc.add(service_desc)
+            else:
+                logging.error('Unsupported protocol %s', configin[0])
+
+        logging.info('all_service_desc=%s', all_service_desc)
+        
+        all_services = set()
+        for service_desc in all_service_desc:
+            service_class_name = service_desc[0] + 'Service'
+            service_class = globals()[service_class_name]
+            logging.debug('-'*80)
+            logging.debug('Starting %s%s', service_class, service_desc[1:])
+            service = service_class(*service_desc[1:])
+            all_services.add(service)
+        logging.debug('-'*80)
+        
+        last_stat = get_timestamp()
+        while True:
+            active_services = select(all_services, (), ())[0]
+            logging.debug('active_services: %s', active_services)
+            for service in active_services:
+                from_, channel = service.get_activity()
+                timestamp = get_timestamp()
+                if channel is not None:
+                    for line in ais_line_reader(channel):
+                        channel.source.stats.nlines += 1
+                    
+                        if options.stdout:
+                            sys.stdout.write(line+'\r\n')
+
+                        # dump the line to file
+                        channel.source.logger.log_line(timestamp, from_, line)
+
+                        # forward the line
+                        channel.source.outpeers.send_line(line)
+                  
+                    # we don't want any stdout buffering
+                    if options.stdout:
+                        sys.stdout.flush()
+
+                # log statistics every STATS_RATE seconds
+                timestamp = get_timestamp()       
+                if timestamp - last_stat > STATS_RATE:
+                    refresh_all_stats()
+                    last_stat = timestamp
+                        
+
+
     except KeyboardInterrupt:
         logging.critical('Received SIGINT. Shuting down.')
 
index cd0b8f84d409edf06fd12cac945b36787f0e4548..f7b7c9be6fbda8645f77b1789cc25f1e917dff1a 100755 (executable)
@@ -60,11 +60,14 @@ class SerialService(Service):
     '''
     Service that listen to a ttyS device like /dev/ttyUSB0
     '''
-    def __init__(self, stdout, id4, serialname, speed=DEFAULT_SPEED):
+    def __init__(self, id4, serialname, speed=DEFAULT_SPEED):
         # TODO: wait for the request device to be available
-        Service.__init__(self, stdout)
+        #Service.__init__(self, stdout)
         self.channel = SerialChannel(id4, serialname, speed)
 
+    def __repr__(self):
+        return 'SerialService<%s>' % self.channel.name
+
     def fileno(self):
         '''
         Returns file descriptor of underlying socket, for os.select().
@@ -73,12 +76,11 @@ class SerialService(Service):
 
     def get_activity(self):
         '''
-        Pool the serial channel and yields when ready.
+        Pool the serial channel and returns when ready.
         '''
         channel = self.channel
-        while True:
-            channel.fill_buffer()
-            yield channel.name, channel
+        channel.fill_buffer()
+        return channel.name, channel
     
 
 def main():
@@ -113,9 +115,9 @@ def main():
     else:
         serialname = args[0]
 
-    service = SerialService(options.stdout, options.id4, serialname)
+    service = SerialService(options.id4, serialname)
     try:
-        service.run()   
+        service.run(options.stdout)
     except KeyboardInterrupt:
         logging.critical('Received SIGINT. Shuting down.')
 
index 7447036467947689e0d9e9ecdd50e941fb22f9ff..c58ac58a278e5f7d7e37b3a9784740e162d21acf 100755 (executable)
@@ -62,10 +62,14 @@ class TcpOutService(Service):
     '''
     Service that connects to a TCP server and listen for data there.
     '''
-    def __init__(self, stdout, id4, hostname, port):
-        Service.__init__(self, stdout)
+    def __init__(self, id4, hostname, port):
+        #Service.__init__(self, stdout)
         self.channel = TcpOutChannel(id4, hostname, port)
         
+    def __repr__(self):
+        return 'TcpOutService<%s,%s>' \
+            % (self.channel.hostname, self.channel.port)
+
     def fileno(self):
         '''
         Returns file descriptor of underlying socket, for os.select().
@@ -74,12 +78,11 @@ class TcpOutService(Service):
 
     def get_activity(self):
         '''
-        Pool the channel and yields when ready.
+        Pool the channel and returns information when ready.
         '''
         channel = self.channel
-        while True:
-            channel.fill_buffer()
-            yield channel.name, channel
+        channel.fill_buffer()
+        return channel.name, channel
     
 
 def main():
@@ -110,9 +113,9 @@ def main():
 
     hostname, port = args[0].split(':', 1)
     port = int(port)
-    service = TcpOutService(options.stdout, options.id4, hostname, port)
+    service = TcpOutService(options.id4, hostname, port)
     try:
-        service.run()
+        service.run(options.stdout)
     except KeyboardInterrupt:
         logging.critical('Received SIGINT. Shuting down.')
 
index 9779ccdf2c437e2b88d0a424145a9eb8210b3f66..21fd97aa89ef581e644aab9d86822b0d0de15ae9 100755 (executable)
@@ -41,8 +41,8 @@ class UdpService(Service):
     #TODO: Check there is not too many ports with sport_discriminate==True
     MAX_SPORT_DISCRIMINATE = 100
 
-    def __init__(self, stdout, serverport):
-        Service.__init__(self, stdout)
+    def __init__(self, serverport):
+        #Service.__init__(self, stdout)
         self.serverport = serverport
         self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
         self.sock.settimeout(STATS_RATE)
@@ -70,6 +70,8 @@ class UdpService(Service):
         if len(infoin) == 0:
             logging.error('No matching source. All information will be discarded')
 
+    def __repr__(self):
+        return 'UdpService<%s>' % self.serverport
 
     def fileno(self):
         '''
@@ -146,22 +148,21 @@ class UdpService(Service):
 
 
     def get_activity(self):
-        while True:
-            try:
-                data, recv_from_info = self.sock.recvfrom(DEFAULT_MTU)
-            except socket.timeout:
-                return
-            channel = self._get_channel_by_addr(recv_from_info)
-            if channel is None:
-                # Unauthorized
-                return
-            channel.source.stats.npackets += 1
-            channel.source.stats.nbytes += len(data)
-            channel.source.stats.nbytes_ethernet += len(data) + UDP_HEADER_SIZE
-            from_ = '%s:%s' % (formataddr(channel.addr), recv_from_info[1])
-            logging.debug('IN %s %s', from_, repr(data))
-            channel.data += data
-            yield from_, channel
+        try:
+            data, recv_from_info = self.sock.recvfrom(DEFAULT_MTU)
+        except socket.timeout:
+            return
+        channel = self._get_channel_by_addr(recv_from_info)
+        if channel is None:
+            # Unauthorized
+            return None, None
+        channel.source.stats.npackets += 1
+        channel.source.stats.nbytes += len(data)
+        channel.source.stats.nbytes_ethernet += len(data) + UDP_HEADER_SIZE
+        from_ = '%s:%s' % (formataddr(channel.addr), recv_from_info[1])
+        logging.debug('IN %s %s', from_, repr(data))
+        channel.data += data
+        return from_, channel
 
 
 def main():
@@ -195,10 +196,10 @@ def main():
         sys.exit(1)
     
     serverport = int(args[0])
-    service = UdpService(options.stdout, serverport)
+    service = UdpService(serverport)
 
     try:
-        service.run()   
+        service.run(options.stdout)
     except KeyboardInterrupt:
         logging.critical('Received SIGINT. Shuting down.')
 
index c9ce876a1538b290a59a7e78acbdee405e10337a..dee532228216e5b42aa323869857bafb14e8bee9 100644 (file)
@@ -22,11 +22,11 @@ class Service:
     This is a pure virtual class, an interface, that need to be subclassed.
     Example services are TcpOutService, UdpService, and SerialService.
     '''
-    def __init__(self, stdout):
-        '''
-        Set stdout = True to get a raw output suitable for piping.
-        '''
-        self.stdout = stdout
+    #def __init__(self, stdout):
+    #    '''
+    #    Set stdout = True to get a raw output suitable for piping.
+    #    '''
+    #    self.stdout = stdout
         
 
     def fileno(self):
@@ -39,27 +39,28 @@ class Service:
     def get_activity(self):
         '''
         All services must implement such a function.
-        It should yield tupples (from_, channel)
+        It should return a tuple (from_, channel)
         from_ is a low-level id of the source.
         channels contain a "data" buffer with bytes waiting to be processed,
           and a reference to a source.
         '''
         raise NotImplemented
 
-    def run(self):
+    def run(self, stdout):
         '''
         Service main loop.
         It shoudn't exit short of receiving SIGINT.
         '''
         last_stat = get_timestamp()
         while True:
-            for from_, channel in self.get_activity():
-                timestamp = get_timestamp()       
+            from_, channel = self.get_activity()
+            timestamp = get_timestamp()       
+            if channel is not None:
                 for line in ais_line_reader(channel):
                     channel.source.stats.nlines += 1
 
                     # write to stdout if asked
-                    if self.stdout:
+                    if stdout:
                         sys.stdout.write(line+'\r\n')
 
                     # dump the line to file
@@ -69,21 +70,14 @@ class Service:
                     channel.source.outpeers.send_line(line)
                   
                 # we don't want any stdout buffering
-                if self.stdout:
+                if stdout:
                     sys.stdout.flush()
 
-                # log statistics every STATS_RATE seconds
-                if timestamp - last_stat > STATS_RATE:
-                    refresh_all_stats()
-                    last_stat = timestamp
-
-            else:
-                # A timeout occured
-                # log statistics every STATS_RATE seconds
-                timestamp = get_timestamp()       
-                if timestamp - last_stat > STATS_RATE:
-                    refresh_all_stats()
-                    last_stat = timestamp
+            # log statistics every STATS_RATE seconds
+            if timestamp - last_stat > STATS_RATE:
+                refresh_all_stats()
+                last_stat = timestamp
+
 
 
 #TODO: create an interface Channel and put ais_line_reader in