-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmonitor.py
218 lines (164 loc) · 5.62 KB
/
monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
#!/usr/bin/env python3.5
# coding: utf-8
'''Show streaming graph of database.'''
import io
import os
import datetime
import logging
from dbinfo import dsn
import pandas
import psycopg2
from flask import Flask, jsonify, render_template, request, send_file
from threading import Thread
import time
log = logging.getLogger()
log.info("Starting application")
root = os.path.dirname(__file__)
sleep = 2 # timestep for reading database
app = Flask(__name__)
values = [[], [], []] # data, last values (with coords), coords
# Open database
class Database():
def __init__(self, dsn):
self.dsn = dsn
def __enter__(self):
self._connect()
return self
def __exit__(self, exc_type, exc_value, traceback):
self._disconnect()
def _connect(self):
self.conn = psycopg2.connect(self.dsn)
self.curs = self.conn.cursor()
log.debug("connection database ok")
def _disconnect(self):
self.curs.close()
self.conn.close()
log.debug("connection database closed")
def execute(self, sql):
self._connect()
log.debug("running SQL: %s" % sql)
self.curs.execute(sql)
self.conn.commit()
self._disconnect()
def execute_and_fetch_all(self, sql):
self._connect()
log.debug("running SQL: %s" % sql)
self.curs.execute(sql)
res = self.curs.fetchall()
self._disconnect()
return res
db = Database(dsn)
def dt2jst(dt):
""" Convert python 'datetime' object to javascript 'time' object.
:param dt: datetime object.
:return dt: javascript time as int.
"""
# with x 1000 to convert to javascript time
return int(
time.mktime(
time.strptime(
dt.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S')))
def prettydt(dt):
""" Convert datetime string.
:param dt: datetime stdring (%Y-%m-%d %H:%M:%S format).
:return: datetime string (%d/%m/%Y %H:%M:%S format).
"""
pydt = datetime.datetime.strptime(
dt.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S')
return pydt.strftime('%d/%m/%Y %H:%M:%S')
def poll_data():
""" Read data from the database and save them into list.
"""
while True:
# Read the last 100 rows
try:
res = db.execute_and_fetch_all("""
SELECT dt, value1, value2, value3, lon, lat
FROM qa.data
ORDER BY dt DESC LIMIT 100
""")
res = res[::-1]
except psycopg2.OperationalError:
log.error("error: cannot read database !")
time.sleep(sleep)
continue
if not res:
time.sleep(sleep)
continue
# list of (time, value) for the chart
data1 = [(dt2jst(dt) * 1000, float(value1))
for dt, value1, value2, value3, lon, lat in res]
data2 = [(dt2jst(dt) * 1000, float(value2))
for dt, value1, value2, value3, lon, lat in res]
data3 = [(dt2jst(dt) * 1000, float(value3))
for dt, value1, value2, value3, lon, lat in res]
# last record (and convert datetime in a pretty format)
# it will be use to add a marker inside map and show last value in a
# tooltip.
last = list(res[-1])
last[0] = prettydt(last[0])
last[1:] = [float(e) for e in last[1:]]
# list of coords (lat, lon) for polyline
coords = [(float(lat), float(lon))
for dt, value1, value2, value3, lon, lat in res]
# save data
values[0], values[1], values[2] = [data1, data2, data3], last, coords
# wait
time.sleep(sleep)
@app.route('/')
def home():
""" Homepage. """
return render_template('index.html', sleepms=sleep * 1000)
@app.route('/data')
def data():
""" Export data in JSON format. """
return jsonify(values=values[0], last=values[1], coords=values[2])
@app.route('/import')
def import_data():
""" Import data into database. """
lon = request.args.get('lon')
lat = request.args.get('lat')
value1 = request.args.get('value1', 'NULL')
value2 = request.args.get('value2', 'NULL')
value3 = request.args.get('value3', 'NULL')
now = datetime.datetime.utcnow()
if lon is None:
return jsonify(status='error', message='pas de données lon'), 400
if lat is None:
return jsonify(status='error', message='pas de données lat'), 400
sql = ("INSERT INTO qa.data (dt, value1, value2, value3, lon, lat) "
"VALUES ('{now:%Y-%m-%d %H:%M:%S}', {value1}, {value2}, {value3}, "
"{lon}, {lat})").format(**locals())
db.execute(sql)
return jsonify(status='ok')
@app.route('/export.csv')
def export_data():
""" Export data. """
sql = "SELECT dt, value1, value2, value3, lon, lat FROM qa.data ORDER BY dt"
with Database(dsn) as db:
df = pandas.read_sql(sql, db.conn)
f = io.BytesIO()
writer = pandas.ExcelWriter(f, engine='xlsxwriter')
df.to_excel(writer, sheet_name='data')
writer.close()
f.seek(0)
return send_file(f, attachment_filename="data.xlsx",
as_attachment=True)
@app.route('/clean')
def clean():
""" Clean database. """
sql = "DELETE FROM qa.data"
db.execute(sql)
return jsonify(status='ok')
@app.route('/doc')
def doc():
with open(os.path.join(root, 'HOWTO.md'), 'rb') as f:
txt = f.read().decode('utf-8')
return "<pre>" + txt + "</pre>"
# Run a thread to read data
thr = Thread(target=poll_data)
thr.daemon = True
thr.start()
if __name__ == '__main__':
# Start application
app.run(host='0.0.0.0', port=9877, debug=True)