87545aeb3d2ce060a8906192bd220255cd51af17
[ais.git] / bin / inputs / run.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3
4 '''
5 This is the main runner for AIS inputs.
6 '''
7
8 from __future__ import division
9 #import os
10 import sys
11 import logging, logging.handlers
12 from select import select
13 from time import time as get_timestamp
14
15 from ais.inputs.config import peers_get_config
16 from ais.inputs.stats import STATS_RATE
17 from ais.inputs.common import refresh_all_stats, Source
18 from ais.inputs.udp import UdpServiceIn
19 from ais.inputs.serialin import SerialServiceIn
20 from ais.inputs.tcpout import TcpOutServiceIn
21 from ais.gpsdec import AivdmProcessor
22 from ais.inputs.outpeer import TcpInServiceOut, tcpin_outpeers
23
24 def mainloop(options, args):
25     if options.debug:
26         loglevel = logging.DEBUG
27     else:
28         loglevel = logging.INFO
29
30     rootlogger = logging.getLogger('')
31     rootlogger.setLevel(loglevel)
32     
33     # create file handler which logs even debug messages
34     #utc=True is python 2.6 specific
35     #fh = logging.handlers.TimedRotatingFileHandler('/var/log/ais/daemon', when='D', backupCount=7, utc=True)
36     fh = logging.handlers.TimedRotatingFileHandler('/var/log/ais/daemon', when='D', backupCount=7, utc=True)
37     #fh.setLevel(logging.DEBUG)
38
39     # create console handler with a higher log level
40     #ch = logging.StreamHandler()
41     #ch.setLevel(logging.ERROR)
42
43     # create formatter and add it to the handlers
44     #    format='%(asctime)s %(levelname)s %(message)s')
45     formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
46     fh.setFormatter(formatter)
47     #ch.setFormatter(formatter)
48
49     # add the handlers to the logger
50     rootlogger.addHandler(fh)
51     #rootlogger.addHandler(ch)
52
53     #rootlogger.addHandler(loghandler)
54     
55     if options.tcp_listeners_port:
56         tcpinoutpeerlisten = TcpInServiceOut(options.tcp_listeners_port)
57         tcpinoutpeerlisten.start()
58
59     try:
60         config = peers_get_config()
61         if len(config) == 0:
62             logging.critical('No source definition. Check your config file.')
63             return
64
65         # build a list of input services that needs to be launched
66         all_servicein_desc = set()
67         for id4, settings in config.iteritems():
68             if args and id4 not in args:
69                 continue
70             try:
71                 configin = settings['in']
72             except KeyError:
73                 continue
74             Source(id4) # make sure the source/stats is created
75             if configin[0] == 'Udp':
76                 servicein_desc =  ( configin[0], configin[1], configin[2] )
77             elif configin[0] == 'Serial':
78                 servicein_desc = ( configin[0], id4, configin[1] )
79             elif configin[0] == 'TcpOut':
80                 servicein_desc = ( configin[0], id4, configin[1], configin[2] ) 
81             else:
82                 logging.error('Unsupported protocol %s', configin[0])
83                 continue
84             all_servicein_desc.add(servicein_desc)
85
86         logging.info('all_servicein_desc=%s', all_servicein_desc)
87         
88         all_servicein = set()
89         for servicein_desc in all_servicein_desc:
90             service_class_name = servicein_desc[0] + 'ServiceIn'
91             service_class = globals()[service_class_name]
92             logging.debug('-'*80)
93             logging.debug('Starting %s%s', service_class, servicein_desc[1:])
94             service = service_class(*servicein_desc[1:])
95             all_servicein.add(service)
96         logging.debug('-'*80)
97         
98         last_stat = get_timestamp()
99         while True:
100             active_services = select(all_servicein, (), (), STATS_RATE)[0]
101             logging.debug('active_services: %s', active_services)
102             for service in active_services:
103                 from_, channel = service.get_activity()
104                 timestamp = get_timestamp()
105                 if channel is not None:
106                     for line in channel.ais_line_reader():
107                         channel.source.stats.nlines += 1
108                     
109                         if options.stdout:
110                             sys.stdout.write(line+'\r\n')
111
112                         # dump the line to file
113                         channel.source.logger.log_line(timestamp, from_, line)
114
115                         # process the line internally
116                         try:
117                             proc = channel.aivdm_processor
118                         except AttributeError:
119                             channel.aivdm_processor = AivdmProcessor(options.todb)
120                             proc = channel.aivdm_processor
121                         decoded_info = proc.process_aivdm_line(line, channel.source.id4)
122
123                         # forward the line
124                         if decoded_info:
125                             channel.source.outpeers.send_line(line, decoded_info)
126                             tcpin_outpeers.send_line(line, decoded_info)
127                   
128                     # we don't want any stdout buffering
129                     if options.stdout:
130                         sys.stdout.flush()
131
132             # log statistics every STATS_RATE seconds
133             timestamp = get_timestamp()       
134             if timestamp - last_stat > STATS_RATE:
135                 refresh_all_stats()
136                 last_stat = timestamp
137                         
138     except KeyboardInterrupt:
139         logging.critical('Received SIGINT. Shuting down.')
140
141     if options.tcp_listeners_port:
142         tcpinoutpeerlisten.shutdown()
143         for outpeer in tcpin_outpeers.safe_iter():
144             outpeer.shutdown()
145
146 def main():
147     from optparse import OptionParser
148     parser = OptionParser('%prog [options] SOURCE*')
149     parser.add_option('-d', '--debug',
150         help='Debug mode',
151         action='store_true', dest='debug', default=False)
152     parser.add_option('--stdout',
153         help="Print incoming packets to stdout",
154         action='store_true', dest='stdout', default=False)
155     parser.add_option('--background',
156         help='Run in the background',
157         action='store_true', dest='daemonize', default=False)
158     parser.add_option('--db',
159         help='Process packets locally',
160         action='store_true', dest='todb', default=False)
161     parser.add_option('--tcp-listeners-port',
162         help='TCP global OutPeer port',
163         action='store', type=int, dest='tcp_listeners_port')
164     options, args = parser.parse_args()
165
166
167     if options.daemonize:
168         from daemon import DaemonContext
169         #import lockfile
170         stderr = file('/var/log/ais/daemon.stderr', 'w+', 0664)
171         #pidfile = lockfile.FileLock('/var/run/ais/input.pid')
172         with DaemonContext(stdout=stderr, stderr=stderr, umask=002):#, pidfile=pidfile):
173             mainloop(options, args)
174     else:
175         mainloop(options, args)
176
177 if __name__ == '__main__':
178     main()