In this example we will develop an Oficloud project that will read data from a channel and a) write that data to a CSV file or b) dump that data to a port where Apache Spark will be reading to obtain streaming data.
This way you can bring data in real time or batch mode to any data analytics software.
You can run that project locally on the machine that will perform the data analytics task, ensuring that your unencrypted data always stays on a computer controlled by you.
You can run that project on any OS supporting Node JS.
STEPS
var oficloud = require('./oficloud');
const fs = require('fs');
const username="MYEMAIL@mail.com";
const password="MUPASSWORD";
const channel="MYCHANNEL";
const filename='MYFILENAME.csv';
// Strip every character outside the printable ASCII range (0x20-0x7E).
// Quirk preserved from the original: null or empty-string input yields
// the boolean false rather than a cleaned string.
function remove_non_ascii(str) {
  if (str === null || str === '') {
    return false;
  }
  const text = str.toString();
  return text.replace(/[^\x20-\x7E]/g, '');
}
// Log in, subscribe to the channel, and append each temperature/humidity
// reading to the CSV file.
oficloud.login(username, password, function (res) {
  // res.e === -1 signals an authentication failure (per the check below).
  if (res.e == -1) {
    console.log("Invalid login");
    return;
  }
  console.log("login res: " + res.res);

  // Channel message handler: sanitize, parse as JSON, persist readings.
  function onmsg(msg) {
    //Process received messages here
    console.log("msg: " + remove_non_ascii(msg));
    try {
      msg = JSON.parse(remove_non_ascii(msg));
    } catch (e) {
      console.log("invalid msg: " + e);
      return;
    }
    // Only persist messages that actually carry a temperature reading.
    if (typeof msg.temperature !== "undefined") {
      var s = "" + msg.temperature + "," + msg.humidity + "\r\n";
      fs.appendFile(filename, s, function (err) {
        // BUGFIX: the original `throw err` inside this async callback cannot
        // be caught by any caller and would crash the whole process on a
        // transient write error; log the failure instead.
        if (err) console.error("append error: " + err);
      });
    }
  }

  oficloud.open_channel(channel, onmsg, function (err) {
    if (typeof err != "undefined") {
      console.log("open channel error: " + err);
      return;
    }
    //Do your stuff here once you are logged in and joined a channel
    //(...)
  });
});
This script will keep listening on the channel and appending the data it reads to the CSV file.
Alternatively, you can write a script that dumps all received data to a port where a real-time data analytics tool will be listening, e.g. dump the data to a TCP port where a Spark job is reading.
var oficloud = require('./oficloud');
var net = require('net');
const username="MYEMAIL@mail.com";
const password="MUPASSWORD";
const channel="MYCHANNEL";
const MYPORT=1234;
var client=null;
// Remove every non-printable-ASCII character from the input.
// Note: null and '' deliberately return false (not a string), matching
// how the rest of this example treats empty input.
function remove_non_ascii(input) {
  if (input === null) return false;
  if (input === '') return false;
  const nonPrintable = /[^\x20-\x7E]/g;
  return input.toString().replace(nonPrintable, '');
}
// Log in, connect a TCP socket to the local analytics listener, then
// subscribe to the channel and forward each reading over the socket.
oficloud.login(username, password, function (res) {
  // res.e === -1 signals an authentication failure (per the check below).
  if (res.e == -1) {
    console.log("Invalid login");
    return;
  }
  console.log("login res: " + res.res);

  // Channel message handler: sanitize, parse as JSON, forward readings.
  function onmsg(msg) {
    //Process received messages here
    console.log("msg: " + remove_non_ascii(msg));
    try {
      msg = JSON.parse(remove_non_ascii(msg));
    } catch (e) {
      console.log("invalid msg: " + e);
      return;
    }
    if (typeof msg.temperature !== "undefined") {
      var s = "" + msg.temperature + "," + msg.humidity + "\r\n";
      client.write(s);
    }
  }

  client = new net.Socket();
  // BUGFIX: without an 'error' listener, any socket error (e.g. the Spark
  // listener not running yet, or the connection dropping) is raised as an
  // unhandled 'error' event and terminates the Node process.
  client.on('error', function (err) {
    console.error("socket error: " + err);
  });
  client.connect(MYPORT, '127.0.0.1', function () {
    // Open the channel only once the TCP pipe is established, so that
    // client.write() in onmsg never runs before the socket is connected.
    oficloud.open_channel(channel, onmsg, function (err) {
      if (typeof err != "undefined") {
        console.log("open channel error: " + err);
        return;
      }
      //Do your stuff here once you are logged in and joined a channel
      //(...)
    });
  });
});
This script will keep listening on the channel and forwarding the received data to the TCP port.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SQLContext
import sys
import requests

# Port the Node.js script dumps channel data to (must match MYPORT there).
# BUGFIX: MYPORT was used below but never defined in this script, which
# would raise a NameError at startup.
MYPORT = 1234

# Create a Spark configuration.
conf = SparkConf()
conf.setAppName("ExampleStreamApp")
# Create a Spark context.
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# Create the streaming context with a batch interval of 2 seconds.
ssc = StreamingContext(sc, 2)
ssc.checkpoint("checkpoint_ExampleApp")
# Read the text stream from localhost:MYPORT.
dataStream = ssc.socketTextStream("localhost", MYPORT)
# here you can process your data