Fix for marinetraffic web change
[ais.git] / bin / ntools.py
1 # -*- coding: utf-8 -*-
2 from __future__ import division
3
4 import os
5 import calendar
6
7 __all__ = [
8     'IPV4_IN_IPV6_PREFIX',
9     'datetime_to_timestamp',
10     'read_cfg',
11     'clean_ais_charset',
12     'clean_alnum',
13     'clean_alnum_unicode',
14     'open_with_mkdirs',
15     'logliner',
16     'dumpsource',
17     'xml_escape',
18     'alarm',
19     'str_split_column_ipv6',
20     'formataddr',
21     'LatLonFormatError',
22     'clean_latitude',
23     'clean_longitude',
24     ]
25
26 IPV4_IN_IPV6_PREFIX = '::ffff:'
27
28 def datetime_to_timestamp(dt):
29     return calendar.timegm(dt.utctimetuple())
30
31 def read_cfg(filename):
32     '''
33     Function that reads a file in the form
34     key=value
35     and returns the resulting dictionary
36     '''
37     cfg = {}
38     for line in file(filename).readlines():
39         line = line.rstrip('\r\n\0')
40         line = unicode(line, 'utf-8')
41         if line.startswith(u'#'):
42             continue # skip comments
43         spl = line.split(u'=', 1)
44         if len(spl) == 2:
45             cfg[spl[0]] = spl[1]
46         else:
47             cfg[spl[0]] = None
48     return cfg
49
50
51 def clean_ais_charset(txt):
52     assert isinstance(txt, str)
53     result = ''
54     for c in txt:
55         oc = ord(c)
56         if oc < 32 or oc > 95:
57             result += ''
58         else:
59             result += c
60     return result
61
62 def clean_alnum(txt):
63     assert isinstance(txt, str)
64     result = ''
65     for c in txt:
66         if ( c >= '0' and c <= '9' ) or ( c >= 'A' and c <= 'Z' ):
67             result += c
68     return result
69
70 def clean_alnum_unicode(txt):
71     assert isinstance(txt, unicode)
72     return unicode(clean_alnum(txt.encode('ascii7', 'replace')))
73     
74     
75 def open_with_mkdirs(filename, mode):
76     try:
77         return file(filename, mode)
78     except IOError, ioerr:
79         # FIXME only if doesn't exists ...
80         #print 'Creating directory', os.path.dirname(filename)
81         os.makedirs(os.path.dirname(filename))
82         return file(filename, mode)
83
84
85 # log file source
86 def logliner(filename):
87     for line in file(filename).readlines():
88         yield line
89
90
91 # debug/display wraper source
92 def dumpsource(source):
93     for line in source:
94         while line and line[-1] in '\r\n\0':
95             line = line[:-1]
96         print "INPUT", line
97         yield line
98
99 def xml_escape(txt):
100     return txt.replace(u'&', u'&amp;').replace(u'<', u'&lt;')
101
102 def alarm():
103     os.system('touch /home/nirgal/kod/ais/alarm &')
104
105
106 def str_split_column_ipv6(txt):
107     '''
108     Helper function that will split a column separated string of tokens.
109     It will take care not to ignore : in [] for ipv6 support.
110     '''
111     res = []
112     in_bracket = False
113     iprev = None
114     for i in range(len(txt)):
115         if not in_bracket:
116             if txt[i] == ':':
117                 res.append(txt[iprev:i])
118                 iprev = i+1
119             elif txt[i] == '[':
120                 in_bracket = True
121         else: # in bracket
122             if txt[i] == ']':
123                 in_bracket = False
124     res.append(txt[iprev:])
125     return res
126
127
128 def formataddr(addr):
129     if addr.startswith(IPV4_IN_IPV6_PREFIX):
130         return addr[7:]
131     elif ':' in addr:
132         return '['+addr+']'
133     else:
134         return addr
135
136
137
138
139 class LatLonFormatError(ValueError):
140     pass
141
142
143 def clean_latitude(data):
144     '''
145     Convert a string latitude into a float.
146     Raises LatLonFormatError on error.
147     '''
148     data = data.replace(u"''", u'"') # common mistake
149     data = data.replace(u' ', u'') # remove spaces
150     sides = u'SN'
151     if not data:
152         return None
153     tmp, side = data[:-1], data[-1]
154     if side == sides[0]:
155         side = -1
156     elif side == sides[1]:
157         side = 1
158     else:
159         raise LatLonFormatError(u'Last character must be either %s or %s.', sides[0], sides[1])
160     spl = tmp.split(u'°')
161     if len(spl) == 1:
162         raise LatLonFormatError(u'You need to use the ° character.')
163     d, tmp = spl
164     try:
165         d = float(d)
166     except ValueError:
167         raise LatLonFormatError(u'Degrees must be an number. It\'s %s.', d)
168     spl = tmp.split(u"'", 1)
169     if len(spl) == 1:
170         # no ' sign: ok only if there is nothing but the side after °
171         # we don't accept seconds if there is no minutes:
172         # It might be an entry mistake
173         tmp = spl[0]
174         if len(tmp) == 0:
175             m = s = 0
176         else:
177             raise LatLonFormatError(u'You must use the \' character between ° and %s.', data[-1])
178     else:
179         m, tmp = spl
180         try:
181             m = float(m)
182         except ValueError:
183             raise LatLonFormatError(u'Minutes must be an number. It\'s %s.', m)
184         if len(tmp) == 0:
185             s = 0
186         else:
187             if tmp[-1] != '"':
188                 raise LatLonFormatError(u'You must use the " character between seconds and %s.', data[-1])
189             s = tmp[:-1]
190             try:
191                 s = float(s)
192             except ValueError:
193                 raise LatLonFormatError(u'Seconds must be an number. It\'s %s.', s)
194     data = side * ( d + m / 60 + s / 3600)
195
196     if data < -90 or data > 90:
197         raise LatLonFormatError(u'%s in not in -90..90 range', data)
198     return data
199
200
201
202 def clean_longitude(data):
203     '''
204     Convert a string latitude into a float.
205     Raises LatLonFormatError on error.
206     '''
207     data = data.replace(u"''", u'"') # common mistake
208     data = data.replace(u' ', u'') # remove spaces
209     sides = u'WE'
210     if not data:
211         return None
212     tmp, side = data[:-1], data[-1]
213     if side == sides[0]:
214         side = -1
215     elif side == sides[1]:
216         side = 1
217     else:
218         raise LatLonFormatError(u'Last character must be either %s or %s.', sides[0], sides[1])
219     spl = tmp.split(u'°')
220     if len(spl) == 1:
221         raise LatLonFormatError(u'You need to use the ° character.')
222     d, tmp = spl
223     try:
224         d = float(d)
225     except ValueError:
226         raise LatLonFormatError(u'Degrees must be an number. It\'s %s.', d)
227     spl = tmp.split(u"'", 1)
228     if len(spl) == 1:
229         # no ' sign: ok only if there is nothing but the side after °
230         # we don't accept seconds if there is no minutes:
231         # It might be an entry mistake
232         tmp = spl[0]
233         if len(tmp) == 0:
234             m = s = 0
235         else:
236             raise LatLonFormatError(u'You must use the \' character between ° and %s.', data[-1])
237     else:
238         m, tmp = spl
239         try:
240             m = float(m)
241         except ValueError:
242             raise LatLonFormatError(u'Minutes must be an number. It\'s %s.', m)
243         if len(tmp) == 0:
244             s = 0
245         else:
246             if tmp[-1] != '"':
247                 raise LatLonFormatError(u'You must use the " character between seconds and %s.', data[-1])
248             s = tmp[:-1]
249             try:
250                 s = float(s)
251             except ValueError:
252                 raise LatLonFormatError(u'Seconds must be an number. It\'s %s.', s)
253     data = side * ( d + m / 60 + s / 3600)
254
255     if data < -180 or data > 180:
256         raise LatLonFormatError(u'%s in not in -180..180 range', data)
257     return data
258
259 #if __name__ == '__main__':
260 #    for test in ('12:34:56:78',
261 #                 ':12::56:',
262 #                 '1:2:3:abc:[bd:ef::]:45',
263 #                 '1:2:3:abc:[bd:ef:]:]:45'):
264 #        print test, str_split_column_ipv6(test)