We’ve been developing serverless streaming connectors that run in AWS Lambda, allowing anyone to stream data from common APIs like Twitter, Stack Exchange, and Reddit lately. The serverless functions are Node.js scripts evolved from the Streamdata.io JavaScript SDK, to publish results to Amazon S3 buckets, an operate within a serverless environment. As we continue to roll out more streaming connectors for filling up Amazon S3 data lakes, we are working on other variations that help API consumers augment the existing APIs they depend on with event-driven features like streaming, and now webhooks.
The next version of our serverless connector will take each proxied API result, and incremental update and publish to a webhook if specific criteria are met. Augmenting existing APIs with webhooks from the outside-in, even if the API providers aren’t quite ready to be investing in their own webhooks, and event-driven architecture. Resulting in a function that looks like this:
exports.handler = (event, context, callback) => {
// Streamdata Dependencies
var streamdataio = require('streamdataio-js-sdk/dist/bundles/streamdataio-node');
var AuthStrategy = require('streamdataio-js-sdk-auth');
// All The Other Dependencies
var jsonPatch = require('fast-json-patch');
var print = require('node-print');
var AWS = require('aws-sdk');
function server()
{
// targetUrl is the JSON API you wish to stream
var targetUrl = process.env.targetUrl;
// q is the query parameter to search by
var query = process.env.query;
targetUrl = targetUrl + '?q=' + query;
// appToken is the Streamdata.io token
var appToken = process.env.appToken;
// oauthToken is the Twitter Oauth Token
var oauthToken = process.env.oauthToken;
// userAgent is used to identify your client
var userAgent = process.env.userAgent;
var privateKey = '';
var headers = ['Authorization: Bearer ' + oauthToken,'User-Agent: ' + userAgent];
var eventSource = streamdataio.createEventSource(targetUrl, appToken, headers, AuthStrategy.newSignatureStrategy(appToken, privateKey));
var result = [];
eventSource
// the standard 'open' callback will be called when connection is established with the server
.onOpen(function ()
{
console.log("connected!");
})
// the streamdata.io specific 'data' event will be called when a fresh Json data set
// is pushed by Streamdata.io coming from the API
.onData(function (data)
{
console.log("data received");
// memorize the fresh data set
result = data;
console.log(result);
// if some criteria is met
criteria = '';
if(criteria == 'met')
{
// An object of options to indicate where to post to
var post_options = {
host: 'example.com',
port: '80',
path: '/webhook-location/',
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'Content-Length': Buffer.byteLength(result)
}
};
// Set up the request
var post_req = http.request(post_options, function(res) {
res.setEncoding('utf8');
res.on('data', function (chunk) {
console.log('Response: ' + chunk);
});
});
// post the data
post_req.write(result);
post_req.end();
}
})
// the streamdata.io specific 'patch' event will be called when a fresh Json patch
// is pushed by streamdata.io from the API. This patch has to be applied to the
// latest data set provided.
.onPatch(function (patch)
{
// display the patch
console.log("patch: ", patch);
// apply the patch to data using json patch API
jsonPatch.applyPatch(result, patch);
// if some criteria is met
criteria = '';
if(criteria == 'met')
{
// An object of options to indicate where to post to
var post_options = {
host: 'example.com',
port: '80',
path: '/webhook-location/',
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'Content-Length': Buffer.byteLength(result)
}
};
// Set up the request
var post_req = http.request(post_options, function(res) {
res.setEncoding('utf8');
res.on('data', function (chunk) {
console.log('Response: ' + chunk);
});
});
// post the data
post_req.write(result);
post_req.end();
}
})
// the standard 'error' callback will be called when an error occur with the evenSource
// for example with an invalid token provided
.onError(function (error)
{
console.log('ERROR!', error);
eventSource.close();
});
eventSource.open();
}
console.log('starting');
server();
// TODO implement
callback(null, 'Hello from Lambda');
};
We are still working on the handler for the webhook handler. Deciding what should be met for each webhook to post. We could just post every result to the webhook location, and let it sort out the details, but we think the logic should exist within the serverless function. Providing modular logic for augmenting existing APIs with webhook functionality. I’m guessing the webhooks criteria will need to be unique for each individual API we are augmenting. In this case, it is Twitter, so we’ll need to build some handler for looking at the resulting Tweets, followers, and other activity for changes and meaningful events (ie, contains URL, has a new follower, etc.).
We will publish a couple of variations of our serverless webhooks to the AWS Serverless Application Repository, and begin gathering feedback from users on what types of features they’d like to see. We don’t want to assume too much when it comes to what users are looking for when it comes to augmenting existing APIs with event-driven functionality. It is not something we think that people are aware of is possible, so we feel like we should put the concept out there, and let people start considering the possibilities. In the meantime we’ll play with some more variations so that we can keep pushing the boundaries when it comes to augmenting existing APIs with external event-driven features.
Follow us on social