1 # -*- coding: utf-8 -*-
6 from __future__ import division
8 from time import time as get_timestamp, sleep
10 from threading import Thread, Lock
12 from ais.ntools import formataddr
14 __all__ = [ 'OutPeers', 'outpeers_from_config', 'TcpInOutPeer', 'TcpInServiceOut', 'tcpin_outpeers' ]
20 def __init__(self, str_addrport):
21 colpos = str_addrport.rfind(':')
24 'Missing column in host:port string for %s' % str_addrport
25 self.host = str_addrport[:colpos]
26 if self.host.startswith('['):
27 assert self.host[-1] == ']'
28 self.host = self.host[1:-1]
29 self.port = int(str_addrport[colpos+1:])
30 self.possibles_addrinfo = None
33 self.timestamp_getaddrinfo = 0
37 return '%s:%s' % (self.host, self.port)
39 def _reinit_socket(self):
41 Remove last known addrinfo for that host:port, if any.
42 If addrinfo is empty, fetch a new one.
43 Initialise self.sock and self.addr
46 if self.possibles_addrinfo:
47 # remove current (0th) addrinfo
48 del self.possibles_addrinfo[0]
50 if not self.possibles_addrinfo:
51 timestamp = get_timestamp()
52 if timestamp > self.timestamp_getaddrinfo + 60:
54 logging.debug('Calling socket.getaddrinfo for %s:%s',
56 self.possibles_addrinfo = socket.getaddrinfo(self.host,
57 self.port, socket.AF_UNSPEC, socket.SOCK_DGRAM)
58 logging.info('%s possibles addrinfo for %s:%s.',
59 len(self.possibles_addrinfo), self.host, self.port)
60 logging.debug('possibles_addrinfo: %s ',
61 self.possibles_addrinfo)
62 except socket.gaierror, err:
63 logging.error("Can't resolve %s:%s: %s. "
64 "Will retry in a minute.",
65 self.host, self.port, err)
68 self.timestamp_getaddrinfo = timestamp
69 if not self.possibles_addrinfo:
70 logging.error("Can't resolve %s:%s. "
71 "Will retry in a minute.",
75 return False # waiting
77 addrinfo = self.possibles_addrinfo[0]
78 logging.info('Prepared socket to %s', addrinfo)
79 af_, socktype, proto = addrinfo[:3]
80 self.addr = addrinfo[4]
82 self.sock = socket.socket(af_, socktype, proto)
84 except socket.error, msg:
85 logging.error("Can't open socket to %s: %s", self.addr, msg)
87 def send_line(self, line):
89 Sends a AIVDM line to that peer.
90 CRLF will be added automatically.
92 while self.sock or self._reinit_socket():
94 self.sock.sendto(line+'\r\n', self.addr)
96 except socket.error, err:
97 logging.error('socket.sendto(%s) failed: %s', self.addr, err)
102 def __init__(self, conn, addr, port):
104 self.sock.setblocking(0)
107 #logging.debug('Set linger time for outputtcpinlistener socket to %s seconds', SERVER_LINGER_TIME)
108 #self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
109 # struct.pack('ii', 1, SERVER_LINGER_TIME))
111 def send_line(self, line):
113 self.sock.send(line+'\r\n')
114 except socket.error, err:
115 if err.errno == 32: # Broken pipe
116 logging.info('TcpInOutPeer %s:%s disconnect', formataddr(self.addr), self.port)
117 tcpin_outpeers.remove_peer(self)
119 logging.error('socket.send() to TcpInOutPeer %s:%s failed %s', formataddr(self.addr), port, repr(err))
122 logging.info('Shuting down TcpInOutPeer socket %s:%s', formataddr(self.addr), self.port)
123 self.sock.setblocking(1)
124 self.sock.shutdown(socket.SHUT_RDWR)
128 class TcpInServiceOut(Thread):
130 Service that listen to TCP port and create TcpInOutPeer on the fly.
132 def __init__(self, port):
133 Thread.__init__(self)
138 self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
139 self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
142 self.sock.bind(('', self.port))
144 except socket.error, err:
145 if err.errno == 98: # Address already in use
146 logging.error('TCP port %s already in use. Waiting...', self.port)
148 elif err.errno == 9: # Bad file descriptor
149 logging.error('TcpInServiceOut socket is not opened. Can\'t bind. Exiting service.')
153 logging.info('Bound to socket on port %s for listeners', self.port)
154 #logging.debug('Set linger time for tcpinoutpeerlisten socket to %s seconds', SERVER_LINGER_TIME)
155 #self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
156 # struct.pack('ii', 1, SERVER_LINGER_TIME))
160 conn, addr = self.sock.accept()
161 except socket.error, err:
162 if err.errno == 22: # Invalid argument
163 logging.info('TcpInServiceOut socket was closed. Exiting thread.')
165 logging.info('Incoming TcpInOutPeer %s:%s', formataddr(addr[0]), addr[1])
166 tcpin_outpeers.add_peer(TcpInOutPeer(conn, addr[0], addr[1]))
171 logging.info('Shuting down tcpinoutpeerlisten socket')
172 self.sock.shutdown(socket.SHUT_RDWR)
173 except socket.error, err:
174 logging.info('Ignoring error %s while shuting down TcpInServiceOut socket.', err.errno)
179 A group of UDP peers for output.
180 This class is thread safe.
187 return '(' + ', '.join([ repr(peer) for peer in self._peers]) + ')'
189 def add_peer(self, peer):
191 self._peers.add(peer)
194 def remove_peer(self, peer):
196 self._peers.remove(peer)
200 # make a copy of the set, because its size may change during iteration (RuntimeError)
202 tmp_peers = [ peer for peer in self._peers ]
204 for peer in tmp_peers:
207 def send_line(self, line):
209 Send a line to all the output peers registered.
211 for outpeer in self.safe_iter():
212 logging.debug('OUT %s %s', outpeer, repr(line))
213 outpeer.send_line(line)
215 def outpeers_from_config(list_from_config):
217 Just give the list of strings from config.
218 Returns an OutPeers object.
220 outpeers = OutPeers()
221 for str_config in list_from_config:
222 assert str_config.startswith('udp:'), \
223 'Unsupported output peer protocol %s' % str_config
224 peer = UdpOutPeer(str_config[4:])
225 outpeers.add_peer(peer)
229 tcpin_outpeers = OutPeers()