5ca1a12884f593e0962c13f239c34db796c3b478
[ais.git] / bin / inputs / udp.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3
4 '''
5 Module for receiving AIVDM data from UDP.
6 '''
7
8 from __future__ import division
9 import logging
10 import socket
11
12 from ais.ntools import formataddr
13 from ais.inputs.common import DEFAULT_MTU, get_source_by_id4
14 from ais.inputs.virtual import Channel, Service
15 from ais.inputs.config import source_get_infoin
16
17 UDP_HEADER_SIZE = 28
18 UDP_RCVBUF = 1024*1024
19
20
21 class UdpChannel(Channel):
22     def __init__(self, id4, addr, port=None):
23         Channel.__init__(self)
24         self.id4 = id4
25         self.source = get_source_by_id4(id4)
26         self.addr = addr
27         self.port = port
28
29     def __repr__(self):
30         result = formataddr(self.addr)
31         if self.port:
32             result += ':'+str(self.port)
33         return result
34
35
36 class UdpService(Service):
37     '''
38     Service that open an UDP socket and listen for AIVDM data on it.
39     '''
40
41     #TODO: Check there is not too many ports with sport_discriminate==True
42     MAX_SPORT_DISCRIMINATE = 100
43
44     def __init__(self, serverhost, serverport):
45         #Service.__init__(self, stdout)
46         self.serverport = serverport
47         self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
48         #http://bugs.debian.org/cgi-bin/bugreport.cgi?bug=560238 :
49         self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
50         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, UDP_RCVBUF)
51         socksize = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
52         logging.info('Socket buffer size is %s bytes', socksize)
53         self.sock.bind(('', serverport))
54         self.channels = {} # channelid is (addr,port) or (addr,None)
55         logging.info('Listening on UDP port %s.', serverport)
56         
57         self.rhostrport2id4sdisc = {}
58         infoin = source_get_infoin('Udp', '', serverport)
59         for id4, raddr, rport, sport_disc in infoin:
60             self.rhostrport2id4sdisc[(raddr, rport)] = id4, sport_disc
61             if not raddr:
62                 raddr = '*'
63             if not rport:
64                 rport = '*'
65             if sport_disc:
66                 sport_disc_display = 'WITH'
67             else:
68                 sport_disc_display = 'WITHOUT'
69
70             logging.info('Mapped %s:%s to source %s'
71                          ' %s source port discrimination.',
72                          raddr, rport, id4, sport_disc_display)
73
74         if len(infoin) == 0:
75             logging.error('No matching source.'
76                           ' All information will be discarded.')
77
78     def __repr__(self):
79         return 'UdpService<%s>' % self.serverport
80
81     def fileno(self):
82         '''
83         Returns file descriptor of underlying socket, for os.select().
84         '''
85         return self.sock.fileno()
86
87     def recvinfo_to_id4sdisc(self, recv_from_info):
88         '''
89         Identify a datagram from the recv information.
90         Returns its source id4 and boolean source_port_discrimination.
91         If source_port_discrimination, (ip1, port1) and (ip1, port2) will have
92         separated decoding channels.
93         '''
94         addr, port = recv_from_info[:2]
95         addr = formataddr(addr)
96         rhostrport2id4sdisc = self.rhostrport2id4sdisc
97         id4, sdisc = rhostrport2id4sdisc.get((addr, port), (None, None))
98         if id4:
99             logging.debug('ID4 full match. Source discrimination=%d.', sdisc)
100             return id4, sdisc
101         id4, sdisc = rhostrport2id4sdisc.get((addr, ''), (None, None))
102         if id4:
103             logging.debug('ID4 host match. Source discrimination=%d.', sdisc)
104             return id4, sdisc
105         id4, sdisc = rhostrport2id4sdisc.get(('', port), (None, None))
106         if id4:
107             logging.debug('ID4 port match. Source discrimination=%d.', sdisc)
108             return id4, sdisc
109         id4, sdisc = rhostrport2id4sdisc.get(('', ''), (None, None))
110         if id4:
111             logging.debug('ID4 generic match. Source discrimination=%d.', sdisc)
112             return id4, sdisc
113         logging.warning("Can't find ID4 for datagram from %s:%s", addr, port)
114         return None, None
115     
116
117     def _get_channel_by_addr(self, recv_from_info):
118         '''
119         Returns an element of self.channels.
120         Adds one when necessary.
121         Normally just use the IP address that is recv_from_info[0].
122         '''
123         addr, port = recv_from_info[:2]
124         #addr, port, flowinfo, scopeid = recv_from_info
125         #if flowinfo != 0 or scopeid != 0:
126         #    logging.info('(DEBUG) Weird rcv_from_info: %s', recv_from_info)
127
128         channelid = addr, port
129         channel = self.channels.get(channelid, None)
130         if channel is not None:
131             return channel
132
133         channelid = addr
134         channel = self.channels.get(channelid, None)
135         if channel is not None:
136             return channel
137
138         id4, sport_discriminate = self.recvinfo_to_id4sdisc(recv_from_info)
139         if id4 is None:
140             logging.warning(
141                 'Rejected datagram from unauthorized source %s:%s',
142                 recv_from_info[0], recv_from_info[1])
143             return None
144             # TODO: Or we keep track of them low level?
145         if sport_discriminate:
146             channelid = addr, port
147             channel = UdpChannel(id4, addr, port)
148         else:
149             channelid = addr
150             channel = UdpChannel(id4, addr)
151         self.channels[channelid] = channel
152         logging.info('New connection from %s', channel)
153             
154         return channel
155
156
157     def get_activity(self):
158         data, recv_from_info = self.sock.recvfrom(DEFAULT_MTU)
159         from_ = '%s:%s' % (formataddr(recv_from_info[0]), recv_from_info[1])
160         logging.debug('IN %s %s', from_, repr(data))
161         channel = self._get_channel_by_addr(recv_from_info)
162         if channel is None:
163             # Unauthorized
164             return None, None
165         channel.source.stats.npackets += 1
166         channel.source.stats.nbytes += len(data)
167         channel.source.stats.nbytes_ethernet += len(data) + UDP_HEADER_SIZE
168         channel.data += data
169         return from_, channel