089b20c330bdefd095aee3e4265de37817121cdf
[ais.git] / bin / inputs / outpeer.py
1 # -*- coding: utf-8 -*-
2 '''
3 UDP out peers module
4 '''
5
6 from __future__ import division
7 import logging
8 from time import time as get_timestamp, sleep
9 import socket
10 from threading import Thread, Lock
11
12 from ais.ntools import formataddr
13
14 __all__ = [ 'OutPeers', 'outpeers_from_config', 'TcpInOutPeer', 'TcpInServiceOut', 'tcpin_outpeers' ]
15
16 class UdpOutPeer:
17     '''
18     UDP output feed.
19     '''
20     def __init__(self, str_addrport):
21         colpos = str_addrport.rfind(':')
22         if colpos == 1:
23             assert colpos != 1, \
24                 'Missing column in host:port string for %s' % str_addrport
25         self.host = str_addrport[:colpos]
26         if self.host.startswith('['):
27             assert self.host[-1] == ']'
28             self.host = self.host[1:-1]
29         self.port = int(str_addrport[colpos+1:])
30         self.possibles_addrinfo = None
31         self.sock = None
32         self.addr = None
33         self.timestamp_getaddrinfo = 0
34         self._reinit_socket()
35
36     def __repr__(self):
37         return '%s:%s' % (self.host, self.port)
38
39     def _reinit_socket(self):
40         '''
41         Remove last known addrinfo for that host:port, if any.
42         If addrinfo is empty, fetch a new one.
43         Initialise self.sock and self.addr
44         '''
45         while True:
46             if self.possibles_addrinfo:
47                 # remove current (0th) addrinfo
48                 del self.possibles_addrinfo[0]
49
50             if not self.possibles_addrinfo:
51                 timestamp = get_timestamp()
52                 if timestamp > self.timestamp_getaddrinfo + 60:
53                     try:
54                         logging.debug('Calling socket.getaddrinfo for %s:%s',
55                             self.host, self.port)
56                         self.possibles_addrinfo = socket.getaddrinfo(self.host,
57                             self.port, socket.AF_UNSPEC, socket.SOCK_DGRAM)
58                         logging.info('%s possibles addrinfo for %s:%s.', 
59                             len(self.possibles_addrinfo), self.host, self.port)
60                         logging.debug('possibles_addrinfo: %s ',
61                             self.possibles_addrinfo)
62                     except socket.gaierror, err:
63                         logging.error("Can't resolve %s:%s: %s. "
64                             "Will retry in a minute.",
65                             self.host, self.port, err)
66                         return False
67                     finally:
68                         self.timestamp_getaddrinfo = timestamp
69                     if not self.possibles_addrinfo:
70                         logging.error("Can't resolve %s:%s. "
71                             "Will retry in a minute.",
72                             self.host, self.port)
73                         return False
74                 else:
75                     return False # waiting 
76
77             addrinfo = self.possibles_addrinfo[0]
78             logging.info('Prepared socket to %s', addrinfo)
79             af_, socktype, proto = addrinfo[:3]
80             self.addr = addrinfo[4]
81             try:
82                 self.sock = socket.socket(af_, socktype, proto)
83                 return True
84             except socket.error, msg:
85                 logging.error("Can't open socket to %s: %s", self.addr, msg)
86         
87     def send_line(self, line):
88         '''
89         Sends a AIVDM line to that peer.
90         CRLF will be added automatically.
91         '''
92         while self.sock or self._reinit_socket():
93             try:
94                 self.sock.sendto(line+'\r\n', self.addr)
95                 return
96             except socket.error, err:
97                 logging.error('socket.sendto(%s) failed: %s', self.addr, err)
98                 self.sock = None
99
100
101 class TcpInOutPeer:
102     def __init__(self, conn, addr, port):
103         self.sock = conn
104         self.sock.setblocking(0)
105         self.addr = addr
106         self.port = port
107         #logging.debug('Set linger time for outputtcpinlistener socket to %s seconds', SERVER_LINGER_TIME)
108         #self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
109         #                      struct.pack('ii', 1, SERVER_LINGER_TIME))
110
111     def send_line(self, line):
112         try:
113             self.sock.send(line+'\r\n')
114         except socket.error, err:
115             if err.errno == 32: # Broken pipe
116                 logging.info('TcpInOutPeer %s:%s disconnect', formataddr(self.addr), self.port)
117                 tcpin_outpeers.remove_peer(self)
118             else:
119                 logging.error('socket.send() to TcpInOutPeer %s:%s failed %s', formataddr(self.addr), port, repr(err))
120
121     def shutdown(self):
122         logging.info('Shuting down TcpInOutPeer socket %s:%s', formataddr(self.addr), self.port)
123         self.sock.setblocking(1)
124         self.sock.shutdown(socket.SHUT_RDWR)
125
126
127
128 class TcpInServiceOut(Thread):
129     '''
130     Service that listen to TCP port and create TcpInOutPeer on the fly.
131     '''
132     def __init__(self, port):
133         Thread.__init__(self)
134         self.port = port
135         self.sock = None
136
137     def run(self):
138         self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
139         self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
140         while True:
141             try:
142                 self.sock.bind(('', self.port))
143                 break
144             except socket.error, err:
145                 if err.errno == 98: # Address already in use
146                     logging.error('TCP port %s already in use. Waiting...', self.port)
147                     sleep(1)
148                 elif err.errno == 9: # Bad file descriptor
149                     logging.error('TcpInServiceOut socket is not opened. Can\'t bind. Exiting service.')
150                     return
151                 else:
152                     raise
153         logging.info('Bound to socket on port %s for listeners', self.port)
154         #logging.debug('Set linger time for tcpinoutpeerlisten socket to %s seconds', SERVER_LINGER_TIME)
155         #self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
156         #                      struct.pack('ii', 1, SERVER_LINGER_TIME))
157         self.sock.listen(2)
158         while True:
159             try:
160                 conn, addr = self.sock.accept()
161             except socket.error, err:
162                 if err.errno == 22: # Invalid argument
163                     logging.info('TcpInServiceOut socket was closed. Exiting thread.')
164                     return
165             logging.info('Incoming TcpInOutPeer %s:%s', formataddr(addr[0]), addr[1])
166             tcpin_outpeers.add_peer(TcpInOutPeer(conn, addr[0], addr[1]))
167     
168     def shutdown(self):
169         if self.sock:
170             try:
171                 logging.info('Shuting down tcpinoutpeerlisten socket')
172                 self.sock.shutdown(socket.SHUT_RDWR)
173             except socket.error, err:
174                 logging.info('Ignoring error %s while shuting down TcpInServiceOut socket.', err.errno)
175             self.sock.close()       
176
177 class OutPeers:
178     '''
179     A group of UDP peers for output.
180     This class is thread safe.
181     '''
182     def __init__(self):
183         self._peers = set()
184         self._lock = Lock()
185
186     def __repr__(self):
187         return '(' + ', '.join([ repr(peer) for peer in self._peers]) + ')'
188         
189     def add_peer(self, peer):
190         self._lock.acquire()
191         self._peers.add(peer)
192         self._lock.release()
193
194     def remove_peer(self, peer):
195         self._lock.acquire()
196         self._peers.remove(peer)
197         self._lock.release()
198
199     def safe_iter(self):
200         # make a copy of the set, because its size may change during iteration (RuntimeError)
201         self._lock.acquire()
202         tmp_peers = [ peer for peer in self._peers ]
203         self._lock.release()
204         for peer in tmp_peers:
205             yield peer
206
207     def send_line(self, line):
208         '''
209         Send a line to all the output peers registered.
210         '''
211         for outpeer in self.safe_iter():
212             logging.debug('OUT %s %s', outpeer, repr(line))
213             outpeer.send_line(line)
214
215 def outpeers_from_config(list_from_config):
216     '''
217     Just give the list of strings from config.
218     Returns an OutPeers object.
219     '''
220     outpeers = OutPeers()
221     for str_config in list_from_config:
222         assert str_config.startswith('udp:'), \
223                'Unsupported output peer protocol %s' % str_config
224         peer = UdpOutPeer(str_config[4:])
225         outpeers.add_peer(peer)
226     return outpeers
227
228
229 tcpin_outpeers = OutPeers()