-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstorage-api.js
More file actions
95 lines (75 loc) · 3.14 KB
/
storage-api.js
File metadata and controls
95 lines (75 loc) · 3.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Module dependencies: Node core first, then third-party packages.
var http = require('http');
var path = require('path');
var redis = require('redis');
var debugModule = require('debug');

// Select the section of config/config.json named after NODE_ENV
// ("production" when the variable is unset or empty).
var env = process.env.NODE_ENV || 'production';
var config = require(path.join(__dirname, 'config', 'config.json'))[env];

// The "storage" debug namespace is switched on only when logging is configured.
if (config.logging) {
    debugModule.enable('storage');
}
var debug = debugModule('storage');

// Numeric settings are parsed as base-10 integers.
var port = parseInt(config.port, 10);
var lastRatesNumber = parseInt(config.lastRatesNumber, 10);

/**
 * Reports errors emitted by the Redis client on the console.
 * @param {*} err Error passed to the client's 'error' event.
 */
function onRedisError(err) {
    console.log('Redis connection error: ' + err);
}

// Shared Redis client used by the request handler.
var client = redis.createClient(parseInt(config.redisPort, 10), config.redisHost);
client.on('error', onRedisError);
/**
 * Parses a JSON string without throwing.
 * @param {string} str Raw request body.
 * @returns {*} The parsed value, or null when the text is not valid JSON.
 */
function tryParseJson(str) {
    try {
        return JSON.parse(str);
    } catch (ex) {
        debug('Invalid JSON passed: %s', str);
        return null;
    }
}

/**
 * Converts a payload value to an integer.
 * @param {*} value Value taken from the request payload.
 * @returns {number} The base-10 integer, or 0 when the value is missing or
 *     does not parse to a finite number.
 */
function toInt(value) {
    var parsed = parseInt(value, 10);
    return isFinite(parsed) ? parsed : 0;
}

/**
 * Converts a payload value to a floating point number.
 * @param {*} value Value taken from the request payload.
 * @returns {number} The parsed number, or 0 when the value is missing or
 *     does not parse to a finite number.
 */
function toFloat(value) {
    var parsed = parseFloat(value);
    return isFinite(parsed) ? parsed : 0;
}

/**
 * Tells whether a value is a non-null, non-array object.
 * @param {*} value Value to inspect.
 * @returns {boolean} True for objects that can be used as key/value records.
 */
function isRecord(value) {
    return value !== null && typeof value === 'object' && !Array.isArray(value);
}

/**
 * Queues the Redis commands for one posted batch on the given transaction.
 * Nothing is sent here; the caller runs the transaction with exec().
 * @param {Object} multi Transaction object returned by client.multi().
 * @param {Object} data Parsed request payload; `messages`, `countries` and
 *     `flows` are all optional.
 * @param {number} maxRates How many of the most recent rates to keep per
 *     currency pair.
 * @param {string} channel Channel on which the 'data' message is published.
 */
function queueBatch(multi, data, maxRates, channel) {
    // Anything that is not of the expected shape is treated as absent, so a
    // malformed payload cannot throw inside the request handler.
    var countries = isRecord(data.countries) ? data.countries : {};
    var flows = Array.isArray(data.flows) ? data.flows : [];

    multi.incrby('batch', 1);
    multi.incrby('messages', toInt(data.messages));

    // Object.keys() lists own keys only and, unlike calling
    // countries.hasOwnProperty(), keeps working when the payload itself
    // contains a key named "hasOwnProperty".
    Object.keys(countries).forEach(function (country) {
        multi.hincrby('countries', country.toUpperCase(), toInt(countries[country]));
    });

    flows.forEach(function (flow) {
        var wellFormed = isRecord(flow) &&
            typeof flow.source === 'string' && flow.source !== '' &&
            typeof flow.target === 'string' && flow.target !== '';
        if (!wellFormed) {
            console.log('Wrong format of flow: ' + JSON.stringify(flow));
            return;
        }

        var source = flow.source.toUpperCase();
        var target = flow.target.toUpperCase();
        var ratesKey = 'rates:' + source + ':' + target;

        multi.sadd('currencies', source, target);
        multi.hincrbyfloat('flows', source + ':from:' + target, toFloat(flow.from));
        multi.hincrbyfloat('flows', source + ':to:' + target, toFloat(flow.to));
        multi.rpush(ratesKey, toFloat(flow.rate));
        // RPUSH appends at the tail, so the newest rates are the last
        // elements. Trimming to [-maxRates, -1] keeps exactly those; trimming
        // to [0, maxRates - 1] would keep the oldest entries and drop every
        // new rate once the list is full.
        multi.ltrim(ratesKey, -maxRates, -1);
    });

    multi.publish(channel, 'data');
}

/**
 * Collects the body of a POST request and, once it has been received,
 * stores its contents in Redis in a single transaction.
 * @param {Object} request Incoming HTTP request.
 */
function handlePost(request) {
    var body = '';

    request.on('data', function (chunk) {
        body += chunk;
        // Too much POST data, kill the connection!
        if (body.length > 1e6) {
            request.connection.destroy();
        }
    });

    request.on('end', function () {
        var data = tryParseJson(body);
        if (data === null || typeof data !== 'object') {
            // Valid JSON that is not an object (e.g. a bare number or string)
            // carries nothing to store.
            if (data !== null) {
                debug('Ignoring non-object payload: %s', body);
            }
            return;
        }

        var multi = client.multi();
        queueBatch(multi, data, lastRatesNumber, config.redisChannel);
        multi.exec(config.logging ? redis.print : null);
    });
}

http.createServer(function (request, response) {
    if (request.method === 'POST') {
        handlePost(request);
    }
    // Every request is answered with 204 straight away; POST bodies are
    // processed afterwards and their outcome is not reported to the sender.
    response.writeHead(204);
    response.end();
}).listen(port, config.host);

console.log('Listening on http://' + config.host + ':' + port);