Functional inputs.run multiservice launcher. But it still mixes all the raw streams...
[ais.git] / bin / inputs / virtual.py
1 # -*- coding: utf-8 -*-
2 '''
3 AIS virtual bases for Channels and Services.
4 '''
5
6 __all__ = [
7     'Service',
8     ]
9 #
10 # Un Service c'est udp ou serialin ou tcpout
11 # Une source a un id4, donc des stats un et logger, mais pas de buffer
12 # Un canal a un buffer, et est associ√© √† une source
13
14 import sys
15 from time import time as get_timestamp
16 from ais.inputs.stats import STATS_RATE
17 from ais.inputs.common import refresh_all_stats
18
19 class Service:
20     '''
21     A service provides data for processing.
22     This is a pure virtual class, an interface, that need to be subclassed.
23     Example services are TcpOutService, UdpService, and SerialService.
24     '''
25     #def __init__(self, stdout):
26     #    '''
27     #    Set stdout = True to get a raw output suitable for piping.
28     #    '''
29     #    self.stdout = stdout
30         
31
32     def fileno(self):
33         '''
34         All services must implement such a function.
35         It should return a file descriptor suitable for os.select()
36         '''
37         raise NotImplemented
38
39     def get_activity(self):
40         '''
41         All services must implement such a function.
42         It should return a tuple (from_, channel)
43         from_ is a low-level id of the source.
44         channels contain a "data" buffer with bytes waiting to be processed,
45           and a reference to a source.
46         '''
47         raise NotImplemented
48
49     def run(self, stdout):
50         '''
51         Service main loop.
52         It shoudn't exit short of receiving SIGINT.
53         '''
54         last_stat = get_timestamp()
55         while True:
56             from_, channel = self.get_activity()
57             timestamp = get_timestamp()       
58             if channel is not None:
59                 for line in ais_line_reader(channel):
60                     channel.source.stats.nlines += 1
61
62                     # write to stdout if asked
63                     if stdout:
64                         sys.stdout.write(line+'\r\n')
65
66                     # dump the line to file
67                     channel.source.logger.log_line(timestamp, from_, line)
68
69                     # forward the line
70                     channel.source.outpeers.send_line(line)
71                   
72                 # we don't want any stdout buffering
73                 if stdout:
74                     sys.stdout.flush()
75
76             # log statistics every STATS_RATE seconds
77             if timestamp - last_stat > STATS_RATE:
78                 refresh_all_stats()
79                 last_stat = timestamp
80
81
82
83 #TODO: create an interface Channel and put ais_line_reader in
84 def ais_line_reader(channel):
85     """
86     parse channel.data and returns lines, without \r\n
87     """
88     data = channel.data
89     while True:
90         idx_line_end = sys.maxint
91         for sep in '\r\0\n':
92             idx = data.find(sep)
93             if idx != -1 and idx < idx_line_end:
94                 idx_line_end = idx
95         if idx_line_end == sys.maxint:
96             channel.data = data
97             break
98
99         ##logging.debug('data=%s idxll=%s', repr(data), idx_line_end)
100         #TODO improve me
101         line = data[:idx_line_end]
102         while data[idx_line_end] in '\r\n\0' and idx_line_end < len(data) - 1:
103             idx_line_end += 1
104         if idx_line_end == len(data)-1:
105             data = ''
106         else:
107             data = data[idx_line_end:]
108         #logging.debug('line=%s data=%s', repr(line), repr(data))
109
110         line = line.rstrip('\r\n\0')
111      
112         if not line:
113             continue # ignore empty line
114
115         yield line
116
117