461036555083e1d91c19eb8c2e2287b83f7d209b
[ais.git] / bin / inputs / run.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3
4 '''
5 This is the main runner for AIS inputs.
6 '''
7
8 from __future__ import division
9 #import os
10 import sys
11 import logging, logging.handlers
12 from select import select
13 from time import time as get_timestamp
14
15 from ais.inputs.config import peers_get_config
16 from ais.inputs.stats import STATS_RATE
17 from ais.inputs.common import refresh_all_stats, Source
18 from ais.inputs.udp import UdpService
19 from ais.inputs.serialin import SerialService
20 from ais.inputs.tcpout import TcpOutService
21 from ais.gpsdec import AivdmProcessor
22
23
def _setup_logging(debug):
    '''
    Configure the root logger to write to a daily-rotating file.

    debug: when true the root logger level is DEBUG, otherwise INFO.

    The handler writes to /var/log/ais/daemon, rotates every day, keeps
    7 backups and uses UTC for rollover.
    '''
    rootlogger = logging.getLogger('')
    rootlogger.setLevel(logging.DEBUG if debug else logging.INFO)

    # utc=True requires python 2.6 or later
    handler = logging.handlers.TimedRotatingFileHandler(
        '/var/log/ais/daemon', when='D', backupCount=7, utc=True)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
    rootlogger.addHandler(handler)


def _collect_service_descs(config, args):
    '''
    Build the set of service descriptions that need to be launched.

    config: mapping of source id4 -> settings, as returned by
        peers_get_config().
    args: optional list of id4; when not empty, only these sources are
        considered.

    Returns a set of tuples whose first item is the protocol name ('Udp',
    'Serial' or 'TcpOut') and whose remaining items are the positional
    arguments for the matching *Service class. Using a set means two
    sources yielding the same tuple only start one service.
    '''
    all_service_desc = set()
    # items() iterates the same pairs as iteritems() and is also
    # available on python 3
    for id4, settings in config.items():
        if args and id4 not in args:
            continue
        try:
            configin = settings['in']
        except KeyError:
            # source without input definition: nothing to listen to
            continue
        Source(id4) # make sure the source/stats is created
        protocol = configin[0]
        if protocol == 'Udp':
            # no id4 here: the description only depends on the config values
            service_desc = (protocol, configin[1], configin[2])
        elif protocol == 'Serial':
            service_desc = (protocol, id4, configin[1])
        elif protocol == 'TcpOut':
            service_desc = (protocol, id4, configin[1], configin[2])
        else:
            logging.error('Unsupported protocol %s', protocol)
            continue
        all_service_desc.add(service_desc)
    return all_service_desc


def _start_services(all_service_desc):
    '''
    Instantiate one service per description and return them as a set.

    The class is looked up by name in this module's globals:
    '<protocol>Service' (UdpService, SerialService, TcpOutService are
    imported at the top of the file).
    '''
    all_services = set()
    for service_desc in all_service_desc:
        service_class = globals()[service_desc[0] + 'Service']
        logging.debug('-'*80)
        logging.debug('Starting %s%s', service_class, service_desc[1:])
        all_services.add(service_class(*service_desc[1:]))
    logging.debug('-'*80)
    return all_services


def _process_channel(options, timestamp, from_, channel):
    '''
    Read every pending line of a channel and dispatch it.

    Each line is counted in the source statistics, optionally echoed to
    stdout (options.stdout), dumped through the source logger, optionally
    decoded by an AivdmProcessor (options.todb) and forwarded to the
    source out peers.
    '''
    for line in channel.ais_line_reader():
        channel.source.stats.nlines += 1

        if options.stdout:
            sys.stdout.write(line+'\r\n')

        # dump the line to file
        channel.source.logger.log_line(timestamp, from_, line)

        # process the line internally (WIP)
        if options.todb:
            # the processor is created lazily, once per channel
            try:
                proc = channel.aivdm_processor
            except AttributeError:
                channel.aivdm_processor = AivdmProcessor()
                proc = channel.aivdm_processor
            proc.process_aivdm_line(line, channel.source.id4)

        # forward the line
        channel.source.outpeers.send_line(line)

    # we don't want any stdout buffering
    if options.stdout:
        sys.stdout.flush()


def mainloop(options, args):
    '''
    Start all the configured input services and process their data forever.

    options: parsed command line options; the attributes debug, stdout and
        todb are used here.
    args: optional list of source id4 restricting which sources are started.

    Returns None immediately when the configuration contains no source.
    Otherwise this never returns normally: KeyboardInterrupt is logged
    and re-raised.
    '''
    _setup_logging(options.debug)

    try:
        config = peers_get_config()
        if not config:
            logging.critical('No source definition. Check your config file.')
            return

        all_service_desc = _collect_service_descs(config, args)
        logging.info('all_service_desc=%s', all_service_desc)

        all_services = _start_services(all_service_desc)

        last_stat = get_timestamp()
        while True:
            # wake up at least every STATS_RATE seconds so that statistics
            # get refreshed even when there is no incoming data
            active_services = select(all_services, (), (), STATS_RATE)[0]
            logging.debug('active_services: %s', active_services)
            for service in active_services:
                from_, channel = service.get_activity()
                timestamp = get_timestamp()
                if channel is not None:
                    _process_channel(options, timestamp, from_, channel)

            # log statistics every STATS_RATE seconds
            timestamp = get_timestamp()
            if timestamp - last_stat > STATS_RATE:
                refresh_all_stats()
                last_stat = timestamp

    except KeyboardInterrupt:
        logging.critical('Received SIGINT. Shuting down.')
        raise
138
def main():
    '''
    Command line entry point: parse the options and run mainloop().

    Positional arguments are source identifiers, passed through to
    mainloop() as args. With --background the process is daemonized
    first, with stdout and stderr both redirected to
    /var/log/ais/daemon.stderr.
    '''
    from optparse import OptionParser
    parser = OptionParser('%prog [options] SOURCE*')
    parser.add_option('-d', '--debug',
        help='Debug mode',
        action='store_true', dest='debug', default=False)
    parser.add_option('--stdout',
        help="Print incoming packets to stdout",
        action='store_true', dest='stdout', default=False)
    parser.add_option('--background',
        help='Run in the background',
        action='store_true', dest='daemonize', default=False)
    parser.add_option('--db',
        help='Process packets locally',
        action='store_true', dest='todb', default=False)
    options, args = parser.parse_args()

    if not options.daemonize:
        mainloop(options, args)
        return

    import os
    from daemon import DaemonContext

    # The third positional argument of file()/open() is a buffer size, not
    # permission bits, so the 0664 previously passed there was only setting
    # a 436 byte buffer. Create the file with os.open() so that the mode
    # really is the (umask filtered) permission, and make the stream line
    # buffered so that error messages reach the file promptly.
    # O_RDWR | O_CREAT | O_TRUNC matches the previous 'w+' open mode.
    stderr_fd = os.open('/var/log/ais/daemon.stderr',
                        os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o664)
    stderr = os.fdopen(stderr_fd, 'w+', 1)
    with DaemonContext(stdout=stderr, stderr=stderr, umask=0o002):
        mainloop(options, args)
166
# Only start the inputs daemon when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()