'formataddr',
'aivdm_log_line',
'InStatsRate', 'InStats',
- 'ais_line_reader' ]
+ 'ais_line_reader']
import os.path
import sys
from time import time as get_timestamp
import rrdtool
-
+
NMEA_DIR = '/var/lib/ais/nmea'
STATS_DIR = '/var/lib/ais/stats'
Catches 'No such file or directory' and create it if necessary.
'''
logging.info('%.1f packets/s - %.1f AIVDM lines/s - '
- '%.0f bps payload - %.0f bps internet',
- self.rpackets, self.rlines,
- 8*self.rbytes, 8*self.rbytes_ethernet)
+ '%.0f bps payload - %.0f bps internet',
+ self.rpackets, self.rlines,
+ 8*self.rbytes, 8*self.rbytes_ethernet)
logging.debug('rrdtool <- %s:%s:%s:%s:%s' \
- % (int(self.timestamp),
- self.rpackets, self.rlines,
- self.rbytes, self.rbytes_ethernet))
+ % (int(self.timestamp),
+ self.rpackets, self.rlines,
+ self.rbytes, self.rbytes_ethernet))
try:
self._save(filename)
except rrdtool.error, err:
continue # ignore empty line
yield line
+
+
--- /dev/null
+# -*- coding: utf-8 -*-
+'''
+UDP out peers module
+'''
+
+__all__ = [ 'OutPeers' ]
+
+
+import logging
+from time import time as get_timestamp
+import socket
+
+class OutPeer:
+ '''
+ UDP output feed.
+ '''
+ def __init__(self, str_addrport):
+ colpos = str_addrport.rfind(':')
+ if colpos == 1:
+ assert colpos != 1, \
+ 'Missing column in host:port string for %s' % str_addrport
+ self.host = str_addrport[:colpos]
+ if self.host.startswith('['):
+ assert self.host[-1] == ']'
+ self.host = self.host[1:-1]
+ self.port = int(str_addrport[colpos+1:])
+ self.possibles_addrinfo = None
+ self.sock = None
+ self.addr = None
+ self.timestamp_getaddrinfo = 0
+ self._reinit_socket()
+
+
+ def _reinit_socket(self):
+ '''
+ Remove last known addrinfo for that host:port, if any.
+ If addrinfo is empty, fetch a new one.
+ Initialise self.sock and self.addr
+ '''
+ while True:
+ if self.possibles_addrinfo:
+ # remove current (0th) addrinfo
+ del self.possibles_addrinfo[0]
+
+ if not self.possibles_addrinfo:
+ timestamp = get_timestamp()
+ if timestamp > self.timestamp_getaddrinfo + 60:
+ try:
+ logging.debug('Calling socket.getaddrinfo for %s:%s',
+ self.host, self.port)
+ self.possibles_addrinfo = socket.getaddrinfo(self.host,
+ self.port, socket.AF_UNSPEC, socket.SOCK_DGRAM)
+ logging.info('%s possibles addrinfo for %s:%s.',
+ len(self.possibles_addrinfo), self.host, self.port)
+ logging.debug('possibles_addrinfo: %s ',
+ self.possibles_addrinfo)
+ except socket.gaierror, err:
+ logging.error("Can't resolve %s:%s: %s."
+ "Will retry in a minute.",
+ self.host, self.port, err)
+ return False
+ finally:
+ self.timestamp_getaddrinfo = timestamp
+ if not self.possibles_addrinfo:
+ logging.error("Can't resolve %s:%s."
+ "Will retry in a minute.",
+ self.host, self.port)
+ return False
+ else:
+ return False # waiting
+
+ addrinfo = self.possibles_addrinfo[0]
+ logging.info('Prepared socket to %s', addrinfo)
+ af, socktype, proto = addrinfo[:3]
+ self.addr = addrinfo[4]
+ try:
+ self.sock = socket.socket(af, socktype, proto)
+ return True
+ except socket.error, msg:
+ logging.error("Can't open socket to %s: %s", self.addr, msg)
+
+ def send_line(self, line):
+ '''
+ Sends a AIVDM line to that peer.
+ CRLF will be added automatically.
+ '''
+ while self.sock or self._reinit_socket():
+ try:
+ self.sock.sendto(line+'\r\n', self.addr)
+ return
+ except socket.error, err:
+ logging.error('socket.sendto(%s) failed: %s', self.addr, err)
+ self.sock = None
+
+
+class OutPeers:
+ '''
+ A group of UDP peers for output.
+ '''
+ def __init__(self, list_from_optparse):
+ '''
+ Just give the list for strings from optparse.
+ '''
+ self.peers = {}
+ self.sock = None
+ for str_addrport in list_from_optparse:
+ peer = OutPeer(str_addrport)
+ self.peers[peer] = 0
+
+ def send_line(self, line):
+ '''
+ Send a line to all the output peers registered.
+ '''
+ logging.debug('OUT %s', repr(line+'\r\n'))
+ for outpeer, stats in self.peers.iteritems():
+ #logging.debug('OUT %s:%s %s',
+ # formataddr(outpeer[0]), outpeer[1],
+ # repr(line+'\r\n'))
+ outpeer.send_line(line)
+ self.peers[outpeer] += 1
from time import time as get_timestamp
import serial
import logging
-import socket
-from common import STATS_RATE, aivdm_log_line, formataddr, InStats, \
- ais_line_reader
+from common import STATS_RATE, aivdm_log_line, InStats, ais_line_reader
+from outpeer import OutPeers
class SerialSource:
logging.debug('IN %s %s', self.name, repr(data))
self.data += data
+#class SerialNamedSource:
+# def __init__(self):
+# pass
def main():
from optparse import OptionParser
logging.basicConfig(level=loglevel,
format='%(asctime)s %(levelname)s %(message)s')
- outpeers = {}
- for peer in options.peers:
- colpos = peer.rfind(':')
- if colpos == 1:
- print >> sys.stderr, 'Missing column in host:port string'
- host = peer[:colpos]
- if host.startswith('['):
- assert host[-1] == ']'
- host = host[1:-1]
- port = int(peer[colpos+1:])
- outpeers[(host, port)] = 0
+ outpeers = OutPeers(options.peers)
if options.id4:
assert len(options.id4)==4, 'ID4 must be 4 characters long.'
source = SerialSource(options.id4, serialname)
- # shipplotter doesn't support ipv6 with wine:
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- #sock.bind(('::1', serverport))
-
last_stat = get_timestamp()
try:
while True:
aivdm_log_line(source.id4, timestamp, serialname, line)
# forward the line
- logging.debug('OUT %s', repr(line+'\r\n'))
- for outpeer, stats in outpeers.iteritems():
- logging.debug('OUT %s:%s %s',
- formataddr(outpeer[0]), outpeer[1],
- repr(line+'\r\n'))
- # FIXME: don't resolve hostname every time!
- try:
- sock.sendto(line+'\r\n', outpeer)
- except socket.error, err:
- # ignore socket.error: (101, 'Network is unreachable')
- # FIXME: don't flood with these messages....
- logging.error('socket.sendto failed: %s', err)
- outpeers[outpeer] += 1
+ outpeers.send_line(line)
if options.stdout:
sys.stdout.write(line+'\r\n')
import socket
from common import STATS_RATE, aivdm_log_line, InStats, ais_line_reader
+from outpeer import OutPeers
TCP_HEADER_SIZE = 52
parser.add_option('-d', '--debug',
help="debug mode",
action='store_true', dest='debug', default=False)
+ parser.add_option('-f', '--forward',
+ help="forward information to this peer.\nEg [::ffff:12.23.34.45]:1234",
+ action='append', dest='peers', default=[])
parser.add_option('--id4',
help='4 letter string for identifying the source.',
action='store', type='str', dest='id4', default=None)
loglevel = logging.DEBUG
else:
loglevel = logging.INFO
- logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
+ logging.basicConfig(level=loglevel,
+ format='%(asctime)s %(levelname)s %(message)s')
+
+ outpeers = OutPeers(options.peers)
if options.id4:
assert len(options.id4)==4, 'ID4 must be 4 characters long.'
# dump the line to file
if source.id4:
aivdm_log_line(source.id4, timestamp, from_, line)
-
+
+ # forward the line
+ outpeers.send_line(line)
+
+ # we don't want any stdout buffering
+ if options.stdout:
+ sys.stdout.flush()
+
# log statistics every STATS_RATE seconds
if timestamp - last_stat > STATS_RATE:
rates = source.stats.get_rates()
import socket
from common import STATS_RATE, aivdm_log_line, formataddr, InStats, InStatsRate, ais_line_reader
+from outpeer import OutPeers
UDP_HEADER_SIZE = 28
DEFAULT_MTU = 1500
logging.basicConfig(level=loglevel,
format='%(asctime)s %(levelname)s %(message)s')
- outpeers = {}
- for peer in options.peers:
- colpos = peer.rfind(':')
- if colpos == 1:
- print >> sys.stderr, 'Missing column in host:port string'
- host = peer[:colpos]
- if host.startswith('['):
- assert host[-1] == ']'
- host = host[1:-1]
- port = int(peer[colpos+1:])
- outpeers[(host, port)] = 0
+ outpeers = OutPeers(options.peers)
if options.id4:
assert len(options.id4)==4, 'ID4 must be 4 characters long.'
aivdm_log_line(options.id4, timestamp, from_, line)
# forward the line
- logging.debug('OUT %s', repr(line+'\r\n'))
- for outpeer, stats in outpeers.iteritems():
- #logging.debug('OUT %s:%s %s',
- # formataddr(outpeer[0]), outpeer[1],
- # repr(line+'\r\n'))
- sock.sendto(line+'\r\n', outpeer)
- outpeers[outpeer] += 1
+ outpeers.send_line(line)
# we don't want any stdout buffering
if options.stdout: