2 # -*- coding: utf-8 -*-
5 This is the main runner for AIS inputs.
8 from __future__ import division
11 import logging, logging.handlers
12 from select import select
13 from time import time as get_timestamp
15 from ais.inputs.config import peers_get_config
16 from ais.inputs.stats import STATS_RATE
17 from ais.inputs.common import refresh_all_stats, Source
18 from ais.inputs.udp import UdpServiceIn
19 from ais.inputs.serialin import SerialServiceIn
20 from ais.inputs.tcpout import TcpOutServiceIn
21 from ais.gpsdec import AivdmProcessor
22 from ais.inputs.outpeer import TcpInServiceOut, tcpin_outpeers
24 def mainloop(options, args):
# Run the AIS input services until interrupted.
#
# Visible steps: configure the root logger with a rotating file handler,
# optionally start a TcpInServiceOut, read the peer configuration, build and
# instantiate one input service per distinct descriptor, then wait on those
# services with select(), handling every received line (count it, log it,
# decode it, forward it to the out-peers).
#
# options: parsed command-line options; the visible code reads
#          options.tcp_listeners_port and options.todb.
# args:    source ids; when non-empty, each configured id4 is tested for
#          membership in it.
#
# NOTE(review): this listing is incomplete -- several control-flow lines
# (if/else/try/loop headers, some loop bodies) are not visible. The notes
# below mark each gap; confirm against the real file before relying on them.

# NOTE(review): both assignments are visible but the condition choosing
# between them is not; presumably DEBUG is selected by the --debug option.
26 loglevel = logging.DEBUG
28 loglevel = logging.INFO
30 rootlogger = logging.getLogger('')
31 rootlogger.setLevel(loglevel)
33 # create file handler which logs even debug messages
34 #utc=True is python 2.6 specific
35 #fh = logging.handlers.TimedRotatingFileHandler('/var/log/ais/daemon', when='D', backupCount=7, utc=True)
# Rotating log file at /var/log/ais/daemon: when='D' (day-based interval),
# up to 7 backups kept, UTC timestamps for rollover.
36 fh = logging.handlers.TimedRotatingFileHandler('/var/log/ais/daemon', when='D', backupCount=7, utc=True)
37 #fh.setLevel(logging.DEBUG)
39 # create console handler with a higher log level
40 #ch = logging.StreamHandler()
41 #ch.setLevel(logging.ERROR)
43 # create formatter and add it to the handlers
44 # format='%(asctime)s %(levelname)s %(message)s')
45 formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
46 fh.setFormatter(formatter)
47 #ch.setFormatter(formatter)
49 # add the handlers to the logger
50 rootlogger.addHandler(fh)
51 #rootlogger.addHandler(ch)
53 #rootlogger.addHandler(loghandler)
# Start a TcpInServiceOut on the requested port only when
# --tcp-listeners-port was given; tcpinoutpeerlisten is not bound otherwise.
55 if options.tcp_listeners_port:
56 tcpinoutpeerlisten = TcpInServiceOut(options.tcp_listeners_port)
57 tcpinoutpeerlisten.start()
60 config = peers_get_config()
# NOTE(review): the guard around this message, and whatever follows it, is
# not visible; presumably it only fires when config is empty -- confirm.
62 logging.critical('No source definition. Check your config file.')
65 # build a list of input services that need to be launched
# A set is used, so identical descriptors are only kept once.
66 all_servicein_desc = set()
67 for id4, settings in config.iteritems():
# NOTE(review): the statement under this `if` is not visible; presumably it
# skips sources that were not named on the command line.
68 if args and id4 not in args:
# configin is indexed positionally: [0] is the protocol name ('Udp',
# 'Serial' or 'TcpOut'), the following items are protocol-specific arguments.
71 configin = settings['in']
74 Source(id4) # make sure the source/stats is created
75 if configin[0] == 'Udp':
# Unlike the other two protocols, the Udp descriptor does not contain id4.
76 servicein_desc = ( configin[0], configin[1], configin[2] )
77 elif configin[0] == 'Serial':
78 servicein_desc = ( configin[0], id4, configin[1] )
79 elif configin[0] == 'TcpOut':
80 servicein_desc = ( configin[0], id4, configin[1], configin[2] )
# NOTE(review): the `else:` introducing this error, and any statement that
# follows it, are not visible.
82 logging.error('Unsupported protocol %s', configin[0])
84 all_servicein_desc.add(servicein_desc)
86 logging.info('all_servicein_desc=%s', all_servicein_desc)
# Instantiate each service: the class is looked up by name
# ('<protocol>ServiceIn') in this module's globals and called with the
# remaining descriptor items as positional arguments.
# NOTE(review): the creation of all_servicein is not visible.
89 for servicein_desc in all_servicein_desc:
90 service_class_name = servicein_desc[0] + 'ServiceIn'
91 service_class = globals()[service_class_name]
93 logging.debug('Starting %s%s', service_class, servicein_desc[1:])
94 service = service_class(*servicein_desc[1:])
95 all_servicein.add(service)
# NOTE(review): the `try:` matching the `except KeyboardInterrupt` below and
# the loop header around the select() call are not visible.
98 last_stat = get_timestamp()
# Wait for services that are ready for reading; STATS_RATE is passed as the
# select() timeout, so the call returns after that delay even without input.
100 active_services = select(all_servicein, (), (), STATS_RATE)[0]
101 logging.debug('active_services: %s', active_services)
102 for service in active_services:
103 from_, channel = service.get_activity()
104 timestamp = get_timestamp()
# get_activity() may return None as channel; such activity is ignored here.
105 if channel is not None:
106 for line in channel.ais_line_reader():
107 channel.source.stats.nlines += 1
# NOTE(review): presumably guarded by the --stdout option; the guard is not
# visible, and neither is an `import sys` among the visible imports.
110 sys.stdout.write(line+'\r\n')
112 # dump the line to file
113 channel.source.logger.log_line(timestamp, from_, line)
115 # process the line internally
# The AivdmProcessor is created on first use and stored on the channel: the
# AttributeError handler runs when channel.aivdm_processor does not exist
# yet (the `try:` line itself is not visible).
117 proc = channel.aivdm_processor
118 except AttributeError:
119 channel.aivdm_processor = AivdmProcessor(options.todb)
120 proc = channel.aivdm_processor
121 decoded_info = proc.process_aivdm_line(line, channel.source.id4)
# Forward the raw line together with its decoded form to the out-peers of
# this source and to tcpin_outpeers.
125 channel.source.outpeers.send_line(line, decoded_info)
126 tcpin_outpeers.send_line(line, decoded_info)
# NOTE(review): the statement this comment introduces is not visible.
128 # we don't want any stdout buffering
132 # log statistics every STATS_RATE seconds
133 timestamp = get_timestamp()
134 if timestamp - last_stat > STATS_RATE:
# NOTE(review): the statistics action itself is not visible; presumably the
# imported refresh_all_stats() is called here -- confirm.
136 last_stat = timestamp
138 except KeyboardInterrupt:
# NOTE(review): 'Shuting' in the message below is a typo for 'Shutting'.
139 logging.critical('Received SIGINT. Shuting down.')
# Stop the TcpInServiceOut started above, under the same condition.
141 if options.tcp_listeners_port:
142 tcpinoutpeerlisten.shutdown()
# NOTE(review): the body of this loop is not visible.
143 for outpeer in tcpin_outpeers.safe_iter():
# Command-line entry code: parse the options, then call mainloop(), inside a
# DaemonContext when --background was given.
# NOTE(review): the `def` line that owns these statements is not visible in
# this listing; presumably they are the body of a main() function -- confirm.
147 from optparse import OptionParser
# Positional arguments (SOURCE*) are passed to mainloop() as `args`.
148 parser = OptionParser('%prog [options] SOURCE*')
# NOTE(review): the help= line of --debug is not visible.
149 parser.add_option('-d', '--debug',
151 action='store_true', dest='debug', default=False)
152 parser.add_option('--stdout',
153 help="Print incoming packets to stdout",
154 action='store_true', dest='stdout', default=False)
# --background is stored as options.daemonize.
155 parser.add_option('--background',
156 help='Run in the background',
157 action='store_true', dest='daemonize', default=False)
# --db is stored as options.todb, which mainloop() passes to AivdmProcessor.
158 parser.add_option('--db',
159 help='Process packets locally',
160 action='store_true', dest='todb', default=False)
# NOTE(review): the help text below spells the file 'deamon.stderr' while the
# file actually opened further down is '/var/log/ais/daemon.stderr'.
161 parser.add_option('--log-stderr',
162 help='Log stderr output to /var/log/ais/deamon.stderr\n'
163 'Only works when --background',
164 action='store_true', dest='log_stderr', default=False)
# No default is given, so options.tcp_listeners_port is None unless set.
165 parser.add_option('--tcp-listeners-port',
166 help='TCP global OutPeer port',
167 action='store', type=int, dest='tcp_listeners_port')
168 options, args = parser.parse_args()
171 if options.daemonize:
# Imported here, so the daemon package is only needed with --background.
172 from daemon import DaemonContext
174 if options.log_stderr:
# NOTE(review): file() is the Python 2 built-in and its third positional
# argument is the buffer size, so 0664 (an octal literal) is not applied as
# permission bits here -- confirm the intent.
# NOTE(review): in the visible lines `stderr` is only assigned when
# options.log_stderr is set; a fallback assignment, if any, is not visible.
175 stderr = file('/var/log/ais/daemon.stderr', 'w+', 0664)
178 #pidfile = lockfile.FileLock('/var/run/ais/input.pid')
# Both stdout and stderr of the daemon are given the same `stderr` value;
# 002 is an octal literal passed as the daemon's umask.
179 with DaemonContext(stdout=stderr, stderr=stderr, umask=002):#, pidfile=pidfile):
180 mainloop(options, args)
# NOTE(review): presumably the `else:` branch (no --background); the `else:`
# line itself is not visible.
182 mainloop(options, args)
184 if __name__ == '__main__':