Added hidemmsi option for outpeers
[ais.git] / bin / inputs / outpeer.py
1 # -*- coding: utf-8 -*-
2 '''
3 UDP out peers module
4 '''
5
6 from __future__ import division
7 import logging
8 from time import time as get_timestamp, sleep
9 import socket
10 from threading import Thread, Lock
11
12 from ais.ntools import formataddr
13 from ais.common import strmmsi_to_mmsi
14
15 __all__ = [ 'OutPeers', 'outpeers_from_config', 'TcpInOutPeer', 'TcpInServiceOut', 'tcpin_outpeers' ]
16
17 class UdpOutPeer:
18     '''
19     UDP output feed.
20     '''
21     def __init__(self, str_addrport, options):
22         colpos = str_addrport.rfind(':')
23         if colpos == 1:
24             assert colpos != 1, \
25                 'Missing column in host:port string for %s' % str_addrport
26         self.host = str_addrport[:colpos]
27         if self.host.startswith('['):
28             assert self.host[-1] == ']'
29             self.host = self.host[1:-1]
30         self.port = int(str_addrport[colpos+1:])
31         self.possibles_addrinfo = None
32         self.sock = None
33         self.addr = None
34         self.timestamp_getaddrinfo = 0
35         self._reinit_socket()
36
37         self.hidemmsi = []
38         logging.warning('%s', options)
39         for option in options:
40             if option.startswith('hidemmsi:'):
41                 self.hidemmsi += option[len('hidemmsi:'):].split(',')
42         self.hidemmsi = [ strmmsi_to_mmsi(mmsi) for mmsi in self.hidemmsi ]
43         logging.warning('hidemmsi:%s', self.hidemmsi)
44
45     def __repr__(self):
46         return '%s:%s' % (self.host, self.port)
47
48     def _reinit_socket(self):
49         '''
50         Remove last known addrinfo for that host:port, if any.
51         If addrinfo is empty, fetch a new one.
52         Initialise self.sock and self.addr
53         '''
54         while True:
55             if self.possibles_addrinfo:
56                 # remove current (0th) addrinfo
57                 del self.possibles_addrinfo[0]
58
59             if not self.possibles_addrinfo:
60                 timestamp = get_timestamp()
61                 if timestamp > self.timestamp_getaddrinfo + 60:
62                     try:
63                         logging.debug('Calling socket.getaddrinfo for %s:%s',
64                             self.host, self.port)
65                         self.possibles_addrinfo = socket.getaddrinfo(self.host,
66                             self.port, socket.AF_UNSPEC, socket.SOCK_DGRAM)
67                         logging.info('%s possibles addrinfo for %s:%s.', 
68                             len(self.possibles_addrinfo), self.host, self.port)
69                         logging.debug('possibles_addrinfo: %s ',
70                             self.possibles_addrinfo)
71                     except socket.gaierror, err:
72                         logging.error("Can't resolve %s:%s: %s. "
73                             "Will retry in a minute.",
74                             self.host, self.port, err)
75                         return False
76                     finally:
77                         self.timestamp_getaddrinfo = timestamp
78                     if not self.possibles_addrinfo:
79                         logging.error("Can't resolve %s:%s. "
80                             "Will retry in a minute.",
81                             self.host, self.port)
82                         return False
83                 else:
84                     return False # waiting 
85
86             addrinfo = self.possibles_addrinfo[0]
87             logging.info('Prepared socket to %s', addrinfo)
88             af_, socktype, proto = addrinfo[:3]
89             self.addr = addrinfo[4]
90             try:
91                 self.sock = socket.socket(af_, socktype, proto)
92                 return True
93             except socket.error, msg:
94                 logging.error("Can't open socket to %s: %s", self.addr, msg)
95         
96     def send_line(self, line, decoded_info):
97         '''
98         Sends a AIVDM line to that peer.
99         CRLF will be added automatically.
100         '''
101         while self.sock or self._reinit_socket():
102             try:
103                 self.sock.sendto(line+'\r\n', self.addr)
104                 return
105             except socket.error, err:
106                 logging.error('socket.sendto(%s) failed: %s', self.addr, err)
107                 self.sock = None
108
109
110 class TcpInOutPeer:
111     def __init__(self, conn, addr, port):
112         self.sock = conn
113         self.sock.setblocking(0)
114         self.addr = addr
115         self.port = port
116         #logging.debug('Set linger time for outputtcpinlistener socket to %s seconds', SERVER_LINGER_TIME)
117         #self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
118         #                      struct.pack('ii', 1, SERVER_LINGER_TIME))
119
120     def send_line(self, line, decoded_info):
121         try:
122             self.sock.send(line+'\r\n')
123         except socket.error, err:
124             if err.errno == 32: # Broken pipe
125                 logging.info('TcpInOutPeer %s:%s disconnect', formataddr(self.addr), self.port)
126                 tcpin_outpeers.remove_peer(self)
127             else:
128                 logging.error('socket.send() to TcpInOutPeer %s:%s failed %s', formataddr(self.addr), port, repr(err))
129
130     def shutdown(self):
131         logging.info('Shuting down TcpInOutPeer socket %s:%s', formataddr(self.addr), self.port)
132         self.sock.setblocking(1)
133         self.sock.shutdown(socket.SHUT_RDWR)
134
135
136
137 class TcpInServiceOut(Thread):
138     '''
139     Service that listen to TCP port and create TcpInOutPeer on the fly.
140     '''
141     def __init__(self, port):
142         Thread.__init__(self)
143         self.port = port
144         self.sock = None
145
146     def run(self):
147         self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
148         self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
149         while True:
150             try:
151                 self.sock.bind(('', self.port))
152                 break
153             except socket.error, err:
154                 if err.errno == 98: # Address already in use
155                     logging.error('TCP port %s already in use. Waiting...', self.port)
156                     sleep(1)
157                 elif err.errno == 9: # Bad file descriptor
158                     logging.error('TcpInServiceOut socket is not opened. Can\'t bind. Exiting service.')
159                     return
160                 else:
161                     raise
162         logging.info('Bound to socket on port %s for listeners', self.port)
163         #logging.debug('Set linger time for tcpinoutpeerlisten socket to %s seconds', SERVER_LINGER_TIME)
164         #self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
165         #                      struct.pack('ii', 1, SERVER_LINGER_TIME))
166         self.sock.listen(2)
167         while True:
168             try:
169                 conn, addr = self.sock.accept()
170             except socket.error, err:
171                 if err.errno == 22: # Invalid argument
172                     logging.info('TcpInServiceOut socket was closed. Exiting thread.')
173                     return
174             logging.info('Incoming TcpInOutPeer %s:%s', formataddr(addr[0]), addr[1])
175             tcpin_outpeers.add_peer(TcpInOutPeer(conn, addr[0], addr[1]))
176     
177     def shutdown(self):
178         if self.sock:
179             try:
180                 logging.info('Shuting down tcpinoutpeerlisten socket')
181                 self.sock.shutdown(socket.SHUT_RDWR)
182             except socket.error, err:
183                 logging.info('Ignoring error %s while shuting down TcpInServiceOut socket.', err.errno)
184             self.sock.close()       
185
186 class OutPeers:
187     '''
188     A group of UDP peers for output.
189     This class is thread safe.
190     '''
191     def __init__(self):
192         self._peers = set()
193         self._lock = Lock()
194
195     def __repr__(self):
196         return '(' + ', '.join([ repr(peer) for peer in self._peers]) + ')'
197         
198     def add_peer(self, peer):
199         self._lock.acquire()
200         self._peers.add(peer)
201         self._lock.release()
202
203     def remove_peer(self, peer):
204         self._lock.acquire()
205         self._peers.remove(peer)
206         self._lock.release()
207
208     def safe_iter(self):
209         # make a copy of the set, because its size may change during iteration (RuntimeError)
210         self._lock.acquire()
211         tmp_peers = [ peer for peer in self._peers ]
212         self._lock.release()
213         for peer in tmp_peers:
214             yield peer
215
216     def send_line(self, line, decoded_info):
217         '''
218         Send a line to all the output peers registered.
219         '''
220         for outpeer in self.safe_iter():
221             if decoded_info and decoded_info != True:
222                 try:
223                     hidemmsi = outpeer.hidemmsi
224                 except AttributeError:
225                     hidemmsi = []
226                 if decoded_info.mmsi in hidemmsi:
227                     logging.info('Hiding line for mmsi=%s', decoded_info.mmsi)
228                     continue
229             logging.debug('OUT %s %s', outpeer, repr(line))
230             outpeer.send_line(line, decoded_info)
231
232 def outpeers_from_config(list_from_config):
233     '''
234     Just give the list of strings from config.
235     Returns an OutPeers object.
236     '''
237     outpeers = OutPeers()
238     for config_with_options in list_from_config:
239         str_config = config_with_options[0]
240         if len(config_with_options):
241             options = config_with_options[1:]
242         else:
243             options = []
244         assert str_config.startswith('udp:'), \
245                'Unsupported output peer protocol %s' % str_config
246         peer = UdpOutPeer(str_config[4:], options)
247         outpeers.add_peer(peer)
248     return outpeers
249
250
251 tcpin_outpeers = OutPeers()