Ingest real time streaming data for time series analysis

This post describes how to ingest sample real-time streaming data from PubNub into a Postgres database with the TimescaleDB extension installed and enabled, for time series analysis. I wrote a small Node.js app to test and demonstrate that TimescaleDB keeps returning query results quickly while data is still being ingested into the same database. The data source is the Market Orders feed, an artificial data stream that provides the latest market orders for a fictitious marketplace. You can clone or download the GitHub repository from -> https://github.com/sfrechette/stream-sequelize-node.
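Between the feed and the database, each message has to become one row of the orders table defined below. Here is a minimal sketch of that step. It assumes each Market Orders message is a JSON object with `timestamp`, `trade_type`, `symbol`, `order_quantity` and `bid_price` fields, and that `timestamp` is in Unix epoch seconds. The field names, the timestamp unit and the helper `toOrderRow` are all assumptions for illustration, not code taken from the repository.

```javascript
// Hypothetical helper: turn one Market Orders message into a row for the
// orders table. The message field names and the epoch-seconds timestamp are
// assumptions about the feed payload, not taken from the repository.
const REQUIRED_FIELDS = ["timestamp", "trade_type", "symbol", "order_quantity", "bid_price"];

function toOrderRow(message) {
  // Every column in orders is NOT NULL, so reject messages with missing fields
  for (const field of REQUIRED_FIELDS) {
    if (message[field] === undefined || message[field] === null) {
      throw new Error(`missing field: ${field}`);
    }
  }
  const row = {
    order_time: Math.trunc(Number(message.timestamp)), // bigint column: whole numbers only
    trade_type: String(message.trade_type),
    symbol: String(message.symbol),
    order_quantity: Number(message.order_quantity),
    bid_price: Number(message.bid_price), // numeric fields may arrive as strings
  };
  // Reject values that would not fit the bigint / double precision columns
  for (const column of ["order_time", "order_quantity", "bid_price"]) {
    if (!Number.isFinite(row[column])) {
      throw new Error(`non-numeric value for ${column}`);
    }
  }
  return row;
}

// Example payload shaped like the assumed feed message
const sample = {
  timestamp: 1525000000,
  trade_type: "market",
  symbol: "Bespin Gas",
  order_quantity: 980,
  bid_price: "150.25",
};
console.log(toOrderRow(sample));
```

Validating before the insert keeps a malformed message from aborting the ingest on a NOT NULL violation. Whether to skip or log such messages is up to you.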

The Market Orders data feed generates on average 4 inserts per second. If you would like to ingest more inserts per second, I recommend the Sensor Network data feed, which generates on average 10 inserts per second.
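These rates translate into modest but steadily growing volumes. The sketch below is a back-of-the-envelope estimate of how many rows accumulate over an hour or a day, which helps when deciding how long to let the ingest run before querying. It assumes the averages quoted above hold steadily; real feeds fluctuate, so treat the numbers as rough.

```javascript
// Rough row counts for a feed producing a steady average number of inserts
// per second. Real feeds fluctuate, so treat these as estimates only.
const SECONDS_PER_HOUR = 60 * 60;
const SECONDS_PER_DAY = 24 * SECONDS_PER_HOUR;

function expectedRows(insertsPerSecond, seconds) {
  return insertsPerSecond * seconds;
}

// Average rates quoted for the two PubNub sample feeds
const feeds = { "Market Orders": 4, "Sensor Network": 10 };
for (const [name, rate] of Object.entries(feeds)) {
  console.log(
    `${name}: ~${expectedRows(rate, SECONDS_PER_HOUR)} rows/hour, ` +
      `~${expectedRows(rate, SECONDS_PER_DAY)} rows/day`
  );
}
```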


Prerequisites

Running this application assumes that you already have an instance of Postgres running with the TimescaleDB extension installed. If not, please refer to the following documentation -> Getting started, installing and setting up -> https://docs.timescale.com/v0.9/getting-started.

Once Postgres with the TimescaleDB extension is installed, you will need to create the database, enable the timescaledb extension in it, create the table schema and convert the table into a hypertable. Connect to Postgres with your preferred tool (I am using pgcli).

Issue the following SQL statements in your Postgres environment:

create database market_orders;

\c market_orders

create extension if not exists timescaledb cascade;

create table orders (
	order_time          bigint not null, 
	trade_type          text not null,
	symbol              text not null, 
	order_quantity      double precision not null, 
	bid_price           double precision not null
);

create index on orders(order_time desc);
create index on orders(trade_type, order_time desc);
create index on orders(symbol, order_time desc);
create index on orders(order_time desc, order_quantity);
create index on orders(order_time desc, bid_price);

-- 86400000000 is in usecs and is equal to 1 day
select create_hypertable('orders', 'order_time', chunk_time_interval => 86400000000);
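Because order_time is a bigint rather than a timestamp, TimescaleDB cannot know what one unit of the column means. It therefore interprets chunk_time_interval in the same units the column stores. The sample queries further down call to_timestamp(order_time), which treats its argument as seconds since the Unix epoch. If seconds is what the app actually writes, one day would be 86400 rather than 86400000000. At one-second resolution, 86400000000 covers roughly 2,700 years, which would put everything in a single chunk. The sketch below uses a hypothetical helper, `oneDayIn`, to compute the one-day interval for the usual epoch units. Which unit applies is an assumption you should check against the data the ingest script writes.

```javascript
// One day expressed in common epoch units. For an integer time column,
// chunk_time_interval must use the same unit as the stored values.
const ONE_DAY_SECONDS = 24 * 60 * 60;

const UNITS_PER_SECOND = {
  seconds: 1,
  milliseconds: 1000,
  microseconds: 1000000,
};

// Hypothetical helper: one day converted to the given unit
function oneDayIn(unit) {
  const factor = UNITS_PER_SECOND[unit];
  if (factor === undefined) {
    throw new Error(`unknown unit: ${unit}`);
  }
  return ONE_DAY_SECONDS * factor;
}

for (const unit of Object.keys(UNITS_PER_SECOND)) {
  console.log(`${unit}: chunk_time_interval => ${oneDayIn(unit)}`);
}
```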

Installation

Clone the repository:

git clone https://github.com/sfrechette/stream-sequelize-node.git

cd stream-sequelize-node

Next, install the dependencies with the following command:

npm install

You will then need to create a .env file in the root folder of the app, “stream-sequelize-node”, and add your configuration values. Copy and paste the following snippet into your .env file and fill in the connection details for your Postgres environment:

DB_SERVER_USER_NAME = ""
DB_SERVER_USER_PASSWORD = ""
DB_SERVER_HOST = ""
DB_SERVER_PORT = ""
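As a hedged illustration of how these four values fit together, the sketch below combines them into a standard PostgreSQL connection URI. The repository's own code may wire them up differently. The helper name `connectionUri`, the `market_orders` default (taken from the SQL above) and the assumption that the variables are already loaded into `process.env`, for example by a .env loader, are all illustrative. The user name and password are percent-encoded because characters such as `@` or `:` would otherwise break the URI.

```javascript
// Hypothetical helper: build a PostgreSQL connection URI from the .env values.
// Assumes the variables are already present in the env object (e.g. process.env).
const REQUIRED_SETTINGS = [
  "DB_SERVER_USER_NAME",
  "DB_SERVER_USER_PASSWORD",
  "DB_SERVER_HOST",
  "DB_SERVER_PORT",
];

function connectionUri(env = process.env, database = "market_orders") {
  // Empty strings count as missing, matching the blank template values
  const missing = REQUIRED_SETTINGS.filter((key) => !env[key]);
  if (missing.length > 0) {
    throw new Error(`missing settings: ${missing.join(", ")}`);
  }
  const user = encodeURIComponent(env.DB_SERVER_USER_NAME);
  const password = encodeURIComponent(env.DB_SERVER_USER_PASSWORD);
  return `postgres://${user}:${password}@${env.DB_SERVER_HOST}:${env.DB_SERVER_PORT}/${database}`;
}

// Placeholder credentials for illustration only
const example = {
  DB_SERVER_USER_NAME: "postgres",
  DB_SERVER_USER_PASSWORD: "p@ss:word",
  DB_SERVER_HOST: "localhost",
  DB_SERVER_PORT: "5432",
};
console.log(connectionUri(example));
```

Failing fast on empty values catches the common mistake of copying the template without filling it in.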

Usage

You are now ready to start ingesting data. Simply issue the following command:

node ingest.js
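At 4 to 10 messages per second, writing one row per message is fine. If you push a faster feed through the same table, one common option is to buffer messages and write each batch in a single multi-row INSERT. This is not necessarily what ingest.js does. The hypothetical `buildInsert` helper below only builds the SQL text and parameter array, using PostgreSQL's `$1, $2, ...` positional placeholders. The returned `{ text, values }` object has the shape that a client such as node-postgres (pg) accepts in `client.query`. The batch rows are made-up sample data.

```javascript
// Hypothetical helper: build one parameterized multi-row INSERT for the orders
// table from a batch of row objects keyed by column name.
const ORDER_COLUMNS = ["order_time", "trade_type", "symbol", "order_quantity", "bid_price"];

function buildInsert(rows) {
  if (rows.length === 0) {
    throw new Error("empty batch");
  }
  const values = [];
  const tuples = rows.map((row) => {
    const placeholders = ORDER_COLUMNS.map((column) => {
      values.push(row[column]);
      return `$${values.length}`; // PostgreSQL positional parameters: $1, $2, ...
    });
    return `(${placeholders.join(", ")})`;
  });
  const text = `INSERT INTO orders (${ORDER_COLUMNS.join(", ")}) VALUES ${tuples.join(", ")}`;
  return { text, values };
}

// Made-up batch for illustration
const batch = [
  { order_time: 1525000000, trade_type: "day", symbol: "Google", order_quantity: 500, bid_price: 1020.5 },
  { order_time: 1525000001, trade_type: "market", symbol: "Bespin Gas", order_quantity: 980, bid_price: 150.25 },
];
console.log(buildInsert(batch).text);
```

Keep batches well below PostgreSQL's limit of 65,535 bind parameters per statement. With five columns per row, that is 13,107 rows.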

Sample queries

Once you have been ingesting data for a while, run some queries to check that TimescaleDB keeps returning results quickly while data is still being ingested into the database. You can adjust the time_bucket intervals in the queries below to suit how much data you have. The more data, the better 😉

-- Average bid price in 5-minute intervals for Google day trades
select 	time_bucket('5 minutes', to_timestamp(order_time)) as five_min,
        avg(bid_price) as avg_bid_price
from 	orders
where	symbol = 'Google' and trade_type = 'day'
group by five_min 
order by five_min limit 10;

-- Min and max bid price in 2-minute intervals for each symbol
select 	time_bucket('2 minutes', to_timestamp(order_time)) as two_min,
        symbol, 
        min(bid_price) as min_bid_price,
        max(bid_price) as max_bid_price 
from 	orders
group by two_min, symbol
order by two_min desc limit 10;

-- Bespin Gas market orders with an order quantity greater than 975
select 	order_time,
        trade_type, 
        order_quantity, 
        bid_price
from 	orders
where	order_quantity > 975 and 
        symbol = 'Bespin Gas' and 
        trade_type = 'market' limit 20;
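To see what the first query computes, here is a small in-memory sketch of the same aggregation. The `timeBucket` function floors an epoch-seconds timestamp to the start of its bucket. For bucket widths that divide evenly into a day, such as the 2- and 5-minute buckets used here, this should give the same boundaries as TimescaleDB's time_bucket, whose default origin falls on a midnight UTC boundary. The helper names and sample rows are made up for illustration. As in the queries, order_time is assumed to hold Unix epoch seconds.

```javascript
// In-memory sketch of the first sample query:
//   time_bucket('5 minutes', to_timestamp(order_time)), avg(bid_price)
//   where symbol = 'Google' and trade_type = 'day'
// order_time is assumed to hold Unix epoch seconds.

function timeBucket(epochSeconds, bucketSeconds) {
  // Floor the timestamp to the start of the bucket that contains it
  return Math.floor(epochSeconds / bucketSeconds) * bucketSeconds;
}

function avgBidByBucket(rows, bucketSeconds, symbol, tradeType, limit = 10) {
  const groups = new Map(); // bucket start (epoch seconds) -> { sum, count }
  for (const row of rows) {
    if (row.symbol !== symbol || row.trade_type !== tradeType) continue; // WHERE
    const bucket = timeBucket(row.order_time, bucketSeconds);
    const group = groups.get(bucket) || { sum: 0, count: 0 };
    group.sum += row.bid_price;
    group.count += 1;
    groups.set(bucket, group); // GROUP BY bucket
  }
  return [...groups.entries()]
    .sort(([a], [b]) => a - b) // ORDER BY bucket ascending
    .slice(0, limit) // LIMIT
    .map(([bucket, { sum, count }]) => ({
      bucket_start: new Date(bucket * 1000).toISOString(),
      avg_bid_price: sum / count,
    }));
}

// Made-up rows for illustration
const rows = [
  { order_time: 1525000010, trade_type: "day", symbol: "Google", bid_price: 100 },
  { order_time: 1525000100, trade_type: "day", symbol: "Google", bid_price: 110 },
  { order_time: 1525000400, trade_type: "day", symbol: "Google", bid_price: 120 },
  { order_time: 1525000020, trade_type: "market", symbol: "Google", bid_price: 999 },
];
console.log(avgBidByBucket(rows, 5 * 60, "Google", "day"));
```

The database version does the same work with help from the hypertable and the (symbol, order_time desc) and (trade_type, order_time desc) indexes.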

Enjoy!
