Replace peer-based hidemmsi option by a global list of mmsi in /etc/ais/hidden
[ais.git] / bin / inputs / outpeer.py
1 # -*- coding: utf-8 -*-
2 '''
3 UDP out peers module
4 '''
5
6 from __future__ import division
7 import logging
8 from time import time as get_timestamp, sleep
9 import socket
10 from threading import Thread, Lock
11
12 from ais.ntools import formataddr
13 from ais.common import strmmsi_to_mmsi, mmsi_to_strmmsi
14 from ais.inputs.config import get_hidden_mmsi
15
16 __all__ = [ 'OutPeers', 'outpeers_from_config', 'TcpInOutPeer', 'TcpInServiceOut', 'tcpin_outpeers' ]
17
18 class UdpOutPeer:
19     '''
20     UDP output feed.
21     '''
22     def __init__(self, str_addrport, options):
23         colpos = str_addrport.rfind(':')
24         if colpos == 1:
25             assert colpos != 1, \
26                 'Missing column in host:port string for %s' % str_addrport
27         self.host = str_addrport[:colpos]
28         if self.host.startswith('['):
29             assert self.host[-1] == ']'
30             self.host = self.host[1:-1]
31         self.port = int(str_addrport[colpos+1:])
32         self.possibles_addrinfo = None
33         self.sock = None
34         self.addr = None
35         self.timestamp_getaddrinfo = 0
36         self._reinit_socket()
37
38         self.send_hidden = False
39         logging.warning('%s', options)
40         for option in options:
41             if option == 'send_hidden':
42                 self.send_hidden = True
43             else:
44                 loggin.critical('Unsupported option %s for UdpOutPeer' % option)
45                 raise RuntimeError('Unsupported option %s for UdpOutPeer' % option)
46
47     def __repr__(self):
48         return '%s:%s' % (self.host, self.port)
49
50     def _reinit_socket(self):
51         '''
52         Remove last known addrinfo for that host:port, if any.
53         If addrinfo is empty, fetch a new one.
54         Initialise self.sock and self.addr
55         '''
56         while True:
57             if self.possibles_addrinfo:
58                 # remove current (0th) addrinfo
59                 del self.possibles_addrinfo[0]
60
61             if not self.possibles_addrinfo:
62                 timestamp = get_timestamp()
63                 if timestamp > self.timestamp_getaddrinfo + 60:
64                     try:
65                         logging.debug('Calling socket.getaddrinfo for %s:%s',
66                             self.host, self.port)
67                         self.possibles_addrinfo = socket.getaddrinfo(self.host,
68                             self.port, socket.AF_UNSPEC, socket.SOCK_DGRAM)
69                         logging.info('%s possibles addrinfo for %s:%s.', 
70                             len(self.possibles_addrinfo), self.host, self.port)
71                         logging.debug('possibles_addrinfo: %s ',
72                             self.possibles_addrinfo)
73                     except socket.gaierror, err:
74                         logging.error("Can't resolve %s:%s: %s. "
75                             "Will retry in a minute.",
76                             self.host, self.port, err)
77                         return False
78                     finally:
79                         self.timestamp_getaddrinfo = timestamp
80                     if not self.possibles_addrinfo:
81                         logging.error("Can't resolve %s:%s. "
82                             "Will retry in a minute.",
83                             self.host, self.port)
84                         return False
85                 else:
86                     return False # waiting 
87
88             addrinfo = self.possibles_addrinfo[0]
89             logging.info('Prepared socket to %s', addrinfo)
90             af_, socktype, proto = addrinfo[:3]
91             self.addr = addrinfo[4]
92             try:
93                 self.sock = socket.socket(af_, socktype, proto)
94                 return True
95             except socket.error, msg:
96                 logging.error("Can't open socket to %s: %s", self.addr, msg)
97         
98     def send_line(self, line, decoded_info):
99         '''
100         Sends a AIVDM line to that peer.
101         CRLF will be added automatically.
102         '''
103         while self.sock or self._reinit_socket():
104             try:
105                 self.sock.sendto(line+'\r\n', self.addr)
106                 return
107             except socket.error, err:
108                 logging.error('socket.sendto(%s) failed: %s', self.addr, err)
109                 self.sock = None
110
111
112 class TcpInOutPeer:
113     def __init__(self, conn, addr, port):
114         self.sock = conn
115         self.sock.setblocking(0)
116         self.addr = addr
117         self.port = port
118         #logging.debug('Set linger time for outputtcpinlistener socket to %s seconds', SERVER_LINGER_TIME)
119         #self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
120         #                      struct.pack('ii', 1, SERVER_LINGER_TIME))
121
122     def send_line(self, line, decoded_info):
123         try:
124             self.sock.send(line+'\r\n')
125         except socket.error, err:
126             if err.errno == 32: # Broken pipe
127                 logging.info('TcpInOutPeer %s:%s disconnect', formataddr(self.addr), self.port)
128                 tcpin_outpeers.remove_peer(self)
129             else:
130                 logging.error('socket.send() to TcpInOutPeer %s:%s failed %s', formataddr(self.addr), port, repr(err))
131
132     def shutdown(self):
133         logging.info('Shuting down TcpInOutPeer socket %s:%s', formataddr(self.addr), self.port)
134         self.sock.setblocking(1)
135         self.sock.shutdown(socket.SHUT_RDWR)
136
137
138
139 class TcpInServiceOut(Thread):
140     '''
141     Service that listen to TCP port and create TcpInOutPeer on the fly.
142     '''
143     def __init__(self, port):
144         Thread.__init__(self)
145         self.port = port
146         self.sock = None
147
148     def run(self):
149         self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
150         self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
151         while True:
152             try:
153                 self.sock.bind(('', self.port))
154                 break
155             except socket.error, err:
156                 if err.errno == 98: # Address already in use
157                     logging.error('TCP port %s already in use. Waiting...', self.port)
158                     sleep(1)
159                 elif err.errno == 9: # Bad file descriptor
160                     logging.error('TcpInServiceOut socket is not opened. Can\'t bind. Exiting service.')
161                     return
162                 else:
163                     raise
164         logging.info('Bound to socket on port %s for listeners', self.port)
165         #logging.debug('Set linger time for tcpinoutpeerlisten socket to %s seconds', SERVER_LINGER_TIME)
166         #self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
167         #                      struct.pack('ii', 1, SERVER_LINGER_TIME))
168         self.sock.listen(2)
169         while True:
170             try:
171                 conn, addr = self.sock.accept()
172             except socket.error, err:
173                 if err.errno == 22: # Invalid argument
174                     logging.info('TcpInServiceOut socket was closed. Exiting thread.')
175                     return
176             logging.info('Incoming TcpInOutPeer %s:%s', formataddr(addr[0]), addr[1])
177             tcpin_outpeers.add_peer(TcpInOutPeer(conn, addr[0], addr[1]))
178     
179     def shutdown(self):
180         if self.sock:
181             try:
182                 logging.info('Shuting down tcpinoutpeerlisten socket')
183                 self.sock.shutdown(socket.SHUT_RDWR)
184             except socket.error, err:
185                 logging.info('Ignoring error %s while shuting down TcpInServiceOut socket.', err.errno)
186             self.sock.close()       
187
188 class OutPeers:
189     '''
190     A group of UDP peers for output.
191     This class is thread safe.
192     '''
193     def __init__(self):
194         self._peers = set()
195         self._lock = Lock()
196
197     def __repr__(self):
198         return '(' + ', '.join([ repr(peer) for peer in self._peers]) + ')'
199         
200     def add_peer(self, peer):
201         self._lock.acquire()
202         self._peers.add(peer)
203         self._lock.release()
204
205     def remove_peer(self, peer):
206         self._lock.acquire()
207         self._peers.remove(peer)
208         self._lock.release()
209
210     def safe_iter(self):
211         # make a copy of the set, because its size may change during iteration (RuntimeError)
212         self._lock.acquire()
213         tmp_peers = [ peer for peer in self._peers ]
214         self._lock.release()
215         for peer in tmp_peers:
216             yield peer
217
218     def send_line(self, line, decoded_info):
219         '''
220         Send a line to all the output peers registered.
221         '''
222         hidden_mmsi = get_hidden_mmsi()
223         #logging.info('hiddenmmsi: %s', get_hidden_mmsi())
224         for outpeer in self.safe_iter():
225             if not decoded_info:
226                 continue
227             try:
228                 send_hidden = outpeer.send_hidden
229             except AttributeError:
230                 send_hidden = False
231             if decoded_info != True and decoded_info.mmsi in hidden_mmsi and not send_hidden:
232                 logging.info('Hiding line for mmsi=%s', decoded_info.mmsi)
233                 continue
234             
235             logging.debug('OUT %s %s', outpeer, repr(line))
236             outpeer.send_line(line, decoded_info)
237
238 def outpeers_from_config(list_from_config):
239     '''
240     Just give the list of strings from config.
241     Returns an OutPeers object.
242     '''
243     outpeers = OutPeers()
244     for config_with_options in list_from_config:
245         str_config = config_with_options[0]
246         if len(config_with_options):
247             options = config_with_options[1:]
248         else:
249             options = []
250         assert str_config.startswith('udp:'), \
251                'Unsupported output peer protocol %s' % str_config
252         peer = UdpOutPeer(str_config[4:], options)
253         outpeers.add_peer(peer)
254     return outpeers
255
256
257 tcpin_outpeers = OutPeers()