AWS Kinesis Stream (not Firehose)

I can successfully post data to an AWS Kinesis Firehose delivery stream, but I'm finding that the time between when the Imp posts the data and when it arrives in my Redshift DB (via an S3 bucket) is roughly five minutes, which is far too slow for the application I'm developing. How might I post data from my Imp to a Kinesis Stream instead, so that I can write my own .NET record-consumer application and process the data in real time?

I suspect you just need to talk to the correct AWS API. The delay is on Amazon's side, and sort of fits the applications they envisage for Firehose (lots of data, zero pre-configuration required).

Have you tried taking the Firehose library, changing the SERVICE string from "firehose" to "kinesis" and the TARGET_PREFIX from "Firehose_20150804" to "Kinesis_20131202"? That might "just work", as the PutRecord API looks, at first glance, about the same.
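In other words, only two statics in the library source should need to change (a sketch, assuming the published Firehose class defines them as statics, as the Kinesis class further down does):

    // Point the signed requests at the Kinesis Streams API instead of Firehose
    static SERVICE = "kinesis";                  // was "firehose"
    static TARGET_PREFIX = "Kinesis_20131202";   // was "Firehose_20150804"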

Thanks Hugo,

I had to make a couple of changes to the post body in the Firehose library as well, including adding the partition key and changing the format of the data string. I've got the code below working on the Imp side, and also a console application in Visual Studio pulling the data out. Happy days!

#require "AWSRequestV4.class.nut:1.0.1"

class AWSKinesisStream {

    static version = [1, 0, 1];

    static SERVICE = "kinesis";
    static TARGET_PREFIX = "Kinesis_20131202";

    // Size limits
    static MAX_DELIVERY_STREAM_NAME_LEN = 64;   // bytes
    static MAX_DATA_BLOB_LEN = 1024000;         // bytes
    static MAX_DATA_ARRAY_LEN = 500;            // elements

    _awsRequest = null;

    /**
     * @param {string} region
     * @param {string} accessKeyId
     * @param {string} secretAccessKey
     */
    constructor(region, accessKeyId, secretAccessKey) {
        if ("AWSRequestV4" in getroottable()) {
            _awsRequest = AWSRequestV4(SERVICE, region, accessKeyId, secretAccessKey);
        } else {
            throw("This class requires AWSRequestV4 - please make sure it is loaded.");
        }
    }

    /**
     * Put record onto the stream
     * @param {string} streamName
     * @param {string} partitionKey
     * @param {string|blob} data
     * @param {function} cb
     * @return {null}
     */
    function putRecord(streamName, partitionKey, data, cb) {
        // Validate input length
        if (streamName.len() > MAX_DELIVERY_STREAM_NAME_LEN) {
            server.error(format("Delivery stream name must be no more than %d characters.", MAX_DELIVERY_STREAM_NAME_LEN));
            return;
        }

        if (data.len() > MAX_DATA_BLOB_LEN) {
            server.error(format("Data blob length must be no more than %d bytes", MAX_DATA_BLOB_LEN));
            return;
        }

        local headers = {
            "X-Amz-Target": format("%s.PutRecord", TARGET_PREFIX)
        };

        local body = {
            "StreamName": streamName,
            "PartitionKey": partitionKey,
            "Data": http.base64encode(data)
        };

        _awsRequest.post("/", headers, http.jsonencode(body), cb);
    }
}
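For reference, here's a minimal agent-side usage sketch. The region, credentials, stream name and partition key below are placeholders, and the callback is assumed to receive the usual imp HTTP response table:

    // Placeholder credentials - substitute your own
    const REGION            = "us-east-1";
    const ACCESS_KEY_ID     = "YOUR_ACCESS_KEY_ID";
    const SECRET_ACCESS_KEY = "YOUR_SECRET_ACCESS_KEY";

    local kinesis = AWSKinesisStream(REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY);

    // Push one JSON-encoded reading onto the stream
    local reading = http.jsonencode({ "temp": 21.5 });
    kinesis.putRecord("my-test-stream", "partition-1", reading, function(response) {
        server.log("PutRecord returned HTTP " + response.statuscode);
    });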

Timing-wise, I can post a record into the stream and retrieve it back out in less than a second.

Excellent! We may just re-package this as a separate Kinesis library.

What sort of latency are you seeing with raw Kinesis?

I tried to set up a test where the Imp passes its timestamp and the consumer console .NET application prints that plus a timestamp when it picks up the record. It's showing around a 5s difference, but the data appears in the console in less than 1s. Perhaps there's just a small difference between the Imp's RTC and the clock on my laptop.

Here are the stream monitoring charts, if that's what you meant?

Sounds like it should be in the <100ms range - the console update may also be delayed.

To get a genuinely NTP-accurate timestamp, you can use something like this:

local d=date(); local s = format("%d.%d", d.time, d.usec);

…d.time is the Unix time (seconds since 1/1/1970) and d.usec is the number of microseconds elapsed in that second. The imp servers are NTP-synchronized, so you should be able to compare these effectively with any other NTP-synchronized box.
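For the latency test, the agent could tag each record with that timestamp, along these lines (a sketch that reuses the kinesis instance and placeholder stream name from the usage example above; the zero-padding of usec is just so the fraction reads as a decimal):

    // Tag the record with an NTP-quality send time; the .NET consumer can
    // subtract this from its own clock when the record is received.
    local d = date();
    local sentAt = format("%d.%06d", d.time, d.usec);

    kinesis.putRecord("my-test-stream", "partition-1", http.jsonencode({ "sentAt": sentAt }), function(response) {
        server.log("Sent at " + sentAt + ", HTTP " + response.statuscode);
    });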

@hugo BTW, date().usec is useful, but undocumented.

EDIT: Whoops, it now appears to be documented. I remember it being absent from here

It was absent, then I was benchmarking some Azure stuff and noticed it was missing, so @smittytone fixed it 🙂