Functional inputs.run multiservice launcher. But it still mixes all the raw streams...
[ais.git] / bin / inputs / outpeer.py
1 # -*- coding: utf-8 -*-
2 '''
3 UDP out peers module
4 '''
5
6 __all__ = [ 'OutPeers', 'outpeers_from_config' ]
7
8
9 import logging
10 from time import time as get_timestamp
11 import socket
12
13 class OutPeer:
14     '''
15     UDP output feed.
16     '''
17     def __init__(self, str_addrport):
18         colpos = str_addrport.rfind(':')
19         if colpos == 1:
20             assert colpos != 1, \
21                 'Missing column in host:port string for %s' % str_addrport
22         self.host = str_addrport[:colpos]
23         if self.host.startswith('['):
24             assert self.host[-1] == ']'
25             self.host = self.host[1:-1]
26         self.port = int(str_addrport[colpos+1:])
27         self.possibles_addrinfo = None
28         self.sock = None
29         self.addr = None
30         self.timestamp_getaddrinfo = 0
31         self._reinit_socket()
32
33     def __repr__(self):
34         return '%s:%s' % (self.host, self.port)
35
36     def _reinit_socket(self):
37         '''
38         Remove last known addrinfo for that host:port, if any.
39         If addrinfo is empty, fetch a new one.
40         Initialise self.sock and self.addr
41         '''
42         while True:
43             if self.possibles_addrinfo:
44                 # remove current (0th) addrinfo
45                 del self.possibles_addrinfo[0]
46
47             if not self.possibles_addrinfo:
48                 timestamp = get_timestamp()
49                 if timestamp > self.timestamp_getaddrinfo + 60:
50                     try:
51                         logging.debug('Calling socket.getaddrinfo for %s:%s',
52                             self.host, self.port)
53                         self.possibles_addrinfo = socket.getaddrinfo(self.host,
54                             self.port, socket.AF_UNSPEC, socket.SOCK_DGRAM)
55                         logging.info('%s possibles addrinfo for %s:%s.', 
56                             len(self.possibles_addrinfo), self.host, self.port)
57                         logging.debug('possibles_addrinfo: %s ',
58                             self.possibles_addrinfo)
59                     except socket.gaierror, err:
60                         logging.error("Can't resolve %s:%s: %s. "
61                             "Will retry in a minute.",
62                             self.host, self.port, err)
63                         return False
64                     finally:
65                         self.timestamp_getaddrinfo = timestamp
66                     if not self.possibles_addrinfo:
67                         logging.error("Can't resolve %s:%s. "
68                             "Will retry in a minute.",
69                             self.host, self.port)
70                         return False
71                 else:
72                     return False # waiting 
73
74             addrinfo = self.possibles_addrinfo[0]
75             logging.info('Prepared socket to %s', addrinfo)
76             af_, socktype, proto = addrinfo[:3]
77             self.addr = addrinfo[4]
78             try:
79                 self.sock = socket.socket(af_, socktype, proto)
80                 return True
81             except socket.error, msg:
82                 logging.error("Can't open socket to %s: %s", self.addr, msg)
83         
84     def send_line(self, line):
85         '''
86         Sends a AIVDM line to that peer.
87         CRLF will be added automatically.
88         '''
89         while self.sock or self._reinit_socket():
90             try:
91                 self.sock.sendto(line+'\r\n', self.addr)
92                 return
93             except socket.error, err:
94                 logging.error('socket.sendto(%s) failed: %s', self.addr, err)
95                 self.sock = None
96
97
98 class OutPeers:
99     '''
100     A group of UDP peers for output.
101     '''
102     def __init__(self):
103         self.peers = []
104         self.sock = None
105
106     def __repr__(self):
107         return '(' + ', '.join([ repr(peer) for peer in self.peers]) + ')'
108         
109     def send_line(self, line):
110         '''
111         Send a line to all the output peers registered.
112         '''
113         for outpeer in self.peers:
114             logging.debug('OUT %s %s', outpeer, repr(line))
115             outpeer.send_line(line)
116
117 def outpeers_from_config(list_from_config):
118     '''
119     Just give the list of strings from config.
120     Returns an OutPeers object.
121     '''
122     outpeers = OutPeers()
123     for str_config in list_from_config:
124         assert str_config.startswith('udp:'), \
125                'Unsupported output peer protocol %s' % str_config
126         peer = OutPeer(str_config[4:])
127         outpeers.peers.append(peer)
128     return outpeers