from ais.gpsdec import AivdmProcessor
+import socket
+import struct
+from threading import Thread, Lock
+from time import sleep
+from ais.ntools import formataddr
+output_tcpin_listeners = set()
+SERVER_LINGER_TIME = 5
+
+class OutputTcpInListener:
+ def __init__(self, conn, addr, port):
+ self.sock = conn
+ self.sock.setblocking(0)
+ self.addr = addr
+ self.port = port
+ logging.debug('Set linger time for outputtcpinlistener socket to %s seconds', SERVER_LINGER_TIME)
+ self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+ struct.pack('ii', 1, SERVER_LINGER_TIME))
+
+ def send_line(self, line):
+ global output_tcpin_listeners_mutex
+ try:
+ self.sock.send(line+'\r\n')
+ except socket.error, err:
+ if err.errno == 32: # Broken pipe
+ logging.info('listener %s:%s disconnect', formataddr(self.addr), self.port)
+ output_tcpin_listeners_mutex.acquire()
+ output_tcpin_listeners.remove(self)
+ output_tcpin_listeners_mutex.release()
+ else:
+ logging.error('socket.send() to listener %s:%s failed %s', formataddr(self.addr), port, repr(err))
+
+
+class OutputTcpInListen(Thread):
+ def __init__(self, port):
+ Thread.__init__(self)
+ self.port = port
+ self.sock = None
+
+ def run(self):
+ self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
+ while True:
+ try:
+ self.sock.bind(('', self.port))
+ break
+ except socket.error, err:
+ if err.errno == 98: # Address already in use
+ logging.error('TCP port %s already in use. Waiting...', self.port)
+ sleep(1)
+ else:
+ raise
+ logging.info('Bound to socket on port %s for listeners', self.port)
+ logging.debug('Set linger time for outtcplisten socket to %s seconds', SERVER_LINGER_TIME)
+ self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+ struct.pack('ii', 1, SERVER_LINGER_TIME))
+ self.sock.listen(2)
+ while True:
+ try:
+ conn, addr = self.sock.accept()
+ except socket.error, err:
+ if err.errno == 22: # Invalid argument
+ logging.info('TcpInListen socket was closed. Exiting thread.')
+ return
+ raise
+ logging.info('Incoming listener %s:%s', formataddr(addr[0]), addr[1])
+ output_tcpin_listeners.add(OutputTcpInListener(conn, addr[0], addr[1]))
+
+
def mainloop(options, args):
if options.debug:
loglevel = logging.DEBUG
#rootlogger.addHandler(loghandler)
+ if options.tcp_listeners_port:
+ global output_tcpin_listeners_mutex
+ outtcplisten = OutputTcpInListen(options.tcp_listeners_port)
+ output_tcpin_listeners_mutex = Lock()
+ outtcplisten.start()
+
try:
config = peers_get_config()
if len(config) == 0:
# forward the line
channel.source.outpeers.send_line(line)
+ if options.tcp_listeners_port:
+ # make a copy of the set, because its size may change during iteration (RuntimeError)
+ output_tcpin_listeners_mutex.acquire()
+ tmp_listeners = [ listener for listener in output_tcpin_listeners ]
+ output_tcpin_listeners_mutex.release()
+ for listener in tmp_listeners:
+ listener.send_line(line)
# we don't want any stdout buffering
if options.stdout:
refresh_all_stats()
last_stat = timestamp
-
-
except KeyboardInterrupt:
logging.critical('Received SIGINT. Shuting down.')
- raise
+
+ if options.tcp_listeners_port:
+ if outtcplisten.sock:
+ logging.info('Shuting down outtcplisten socket')
+ outtcplisten.sock.shutdown(socket.SHUT_RDWR)
+ output_tcpin_listeners_mutex.acquire()
+ tmp_listeners = [ listener for listener in output_tcpin_listeners ]
+ output_tcpin_listeners_mutex.release()
+ for listener in tmp_listeners:
+ logging.info('Shuting down listener socket %s:%s', formataddr(listener.addr), listener.port)
+ listener.sock.setblocking(1)
+ listener.sock.shutdown(socket.SHUT_RDWR)
def main():
from optparse import OptionParser
parser.add_option('--db',
help='Process packets locally',
action='store_true', dest='todb', default=False)
+ parser.add_option('--tcp-listeners-port',
+ help='TCP global listener port',
+ action='store', type=int, dest='tcp_listeners_port')
options, args = parser.parse_args()