Functional inputs.run multiservice launcher. But it still mixes all the raw streams...
[ais.git] / bin / inputs / run.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3
4 '''
5 This is the main runner for AIS inputs.
6 '''
7
8 import sys
9 import logging
10 from select import select
11 from time import time as get_timestamp
12
13 from ais.inputs.peers import peers_get_config
14 from ais.inputs.stats import STATS_RATE
15 from ais.inputs.common import refresh_all_stats
16 from ais.inputs.virtual import ais_line_reader
17 from ais.inputs.udp import UdpService
18 from ais.inputs.serialin import SerialService
19 from ais.inputs.tcpout import TcpOutService
20
21
22 def main():
23     from optparse import OptionParser
24     parser = OptionParser('%prog [options] SOURCE*')
25     parser.add_option('-d', '--debug',
26         help="debug mode",
27         action='store_true', dest='debug', default=False)
28     parser.add_option('--stdout',
29         help="Print incoming packets to stdout",
30         action='store_true', dest='stdout', default=False)
31     options, args = parser.parse_args()
32
33     if options.debug:
34         loglevel = logging.DEBUG
35     else:
36         loglevel = logging.INFO
37     logging.basicConfig(level=loglevel,
38         format='%(asctime)s %(levelname)s %(message)s')
39     
40     try:
41         config = peers_get_config()
42         if len(config) == 0:
43             logging.critical('No source definition. Check your config file.')
44             return
45
46         # build a list of services that needs to be launched
47         all_service_desc = set()
48         for id4, settings in config.iteritems():
49             if args and id4 not in args:
50                 continue
51             try:
52                 configin = settings['in']
53             except KeyError:
54                 continue
55             if configin[0] == 'Udp':
56                 service_desc =  ( configin[0], configin[2] )
57                 all_service_desc.add(service_desc)
58             elif configin[0] == 'Serial':
59                 service_desc = ( configin[0], id4, configin[1] )
60                 all_service_desc.add(service_desc)
61             elif configin[0] == 'TcpOut':
62                 service_desc = ( configin[0], id4, configin[1], configin[2] ) 
63                 all_service_desc.add(service_desc)
64             else:
65                 logging.error('Unsupported protocol %s', configin[0])
66
67         logging.info('all_service_desc=%s', all_service_desc)
68         
69         all_services = set()
70         for service_desc in all_service_desc:
71             service_class_name = service_desc[0] + 'Service'
72             service_class = globals()[service_class_name]
73             logging.debug('-'*80)
74             logging.debug('Starting %s%s', service_class, service_desc[1:])
75             service = service_class(*service_desc[1:])
76             all_services.add(service)
77         logging.debug('-'*80)
78         
79         last_stat = get_timestamp()
80         while True:
81             active_services = select(all_services, (), ())[0]
82             logging.debug('active_services: %s', active_services)
83             for service in active_services:
84                 from_, channel = service.get_activity()
85                 timestamp = get_timestamp()
86                 if channel is not None:
87                     for line in ais_line_reader(channel):
88                         channel.source.stats.nlines += 1
89                     
90                         if options.stdout:
91                             sys.stdout.write(line+'\r\n')
92
93                         # dump the line to file
94                         channel.source.logger.log_line(timestamp, from_, line)
95
96                         # forward the line
97                         channel.source.outpeers.send_line(line)
98                   
99                     # we don't want any stdout buffering
100                     if options.stdout:
101                         sys.stdout.flush()
102
103                 # log statistics every STATS_RATE seconds
104                 timestamp = get_timestamp()       
105                 if timestamp - last_stat > STATS_RATE:
106                     refresh_all_stats()
107                     last_stat = timestamp
108                         
109
110
111     except KeyboardInterrupt:
112         logging.critical('Received SIGINT. Shuting down.')
113
114 if __name__ == '__main__':
115     main()