Application Integration

Applying External Webhooks To Existing APIs Using AWS Lambda

We’ve been developing serverless streaming connectors that run in AWS Lambda, allowing anyone to stream data from common APIs like Twitter, Stack Exchange, and Reddit lately. The serverless functions are Node.js scripts evolved from the Streamdata.io JavaScript SDK, to publish results to Amazon S3 buckets, an operate within a serverless environment. As we continue to roll out more streaming connectors for filling up Amazon S3 data lakes, we are working on other variations that help API consumers augment the existing APIs they depend on with event-driven features like streaming, and now webhooks.

The next version of our serverless connector will take each proxied API result, and incremental update and publish to a webhook if specific criteria are met. Augmenting existing APIs with webhooks from the outside-in, even if the API providers aren’t quite ready to be investing in their own webhooks, and event-driven architecture. Resulting in a function that looks like this:


exports.handler = (event, context, callback) => {
    
    // Streamdata Dependencies
    var streamdataio = require('streamdataio-js-sdk/dist/bundles/streamdataio-node');
    var AuthStrategy = require('streamdataio-js-sdk-auth');
    
    // All The Other Dependencies
    var jsonPatch = require('fast-json-patch');
    var print = require('node-print');
    var AWS = require('aws-sdk');

    function server()
    {
      // targetUrl is the JSON API you wish to stream
      var targetUrl = process.env.targetUrl;
      
      // q is the query parameter to search by
      var query = process.env.query;
      
      targetUrl = targetUrl + '?q=' + query;

      // appToken is the Streamdata.io token
      var appToken = process.env.appToken;
      
      // oauthToken is the Twitter Oauth Token
      var oauthToken = process.env.oauthToken;
      
      // userAgent is used to identify your client
      var userAgent = process.env.userAgent;      
      
      var privateKey = '';
      
      var headers = ['Authorization: Bearer ' + oauthToken,'User-Agent: ' + userAgent];
    
      var eventSource = streamdataio.createEventSource(targetUrl, appToken, headers, AuthStrategy.newSignatureStrategy(appToken, privateKey));
      var result = [];
    
      eventSource
      // the standard 'open' callback will be called when connection is established with the server
        .onOpen(function ()
        {
          console.log("connected!");
        })
        // the streamdata.io specific 'data' event will be called when a fresh Json data set
        // is pushed by Streamdata.io coming from the API
        .onData(function (data)
        {
          console.log("data received");
          // memorize the fresh data set
          
          result = data;
          console.log(result);
          
          // if some criteria is met      
          criteria = '';
          
          if(criteria == 'met')
          {
            // An object of options to indicate where to post to
            var post_options = {
                host: 'example.com',
                port: '80',
                path: '/webhook-location/',
                method: 'POST',
                headers: {
                    'Content-Type': 'application/x-www-form-urlencoded',
                    'Content-Length': Buffer.byteLength(result)
                }
            };                

            // Set up the request
            var post_req = http.request(post_options, function(res) {
                res.setEncoding('utf8');
                res.on('data', function (chunk) {
                    console.log('Response: ' + chunk);
                });
            });

            // post the data
            post_req.write(result);
            post_req.end();  
    }
          
        })
        // the streamdata.io specific 'patch' event will be called when a fresh Json patch
        // is pushed by streamdata.io from the API. This patch has to be applied to the
        // latest data set provided.
        .onPatch(function (patch)
        {
          // display the patch
          console.log("patch: ", patch);
          
          // apply the patch to data using json patch API
          jsonPatch.applyPatch(result, patch);
          
          // if some criteria is met      
          criteria = '';
          
          if(criteria == 'met')
          {
            // An object of options to indicate where to post to
            var post_options = {
                host: 'example.com',
                port: '80',
                path: '/webhook-location/',
                method: 'POST',
                headers: {
                    'Content-Type': 'application/x-www-form-urlencoded',
                    'Content-Length': Buffer.byteLength(result)
                }
            };                

            // Set up the request
            var post_req = http.request(post_options, function(res) {
                res.setEncoding('utf8');
                res.on('data', function (chunk) {
                    console.log('Response: ' + chunk);
                });
            });

            // post the data
            post_req.write(result);
            post_req.end();  
    }
            
        })
    
        // the standard 'error' callback will be called when an error occur with the evenSource
        // for example with an invalid token provided
        .onError(function (error)
        {
          console.log('ERROR!', error);
          eventSource.close();
    
        });
    
      eventSource.open();
    
    }
    
    console.log('starting');
    server();

    // TODO implement
    callback(null, 'Hello from Lambda');
};

We are still working on the handler for the webhook handler. Deciding what should be met for each webhook to post. We could just post every result to the webhook location, and let it sort out the details, but we think the logic should exist within the serverless function. Providing modular logic for augmenting existing APIs with webhook functionality. I’m guessing the webhooks criteria will need to be unique for each individual API we are augmenting. In this case, it is Twitter, so we’ll need to build some handler for looking at the resulting Tweets, followers, and other activity for changes and meaningful events (ie, contains URL, has a new follower, etc.).

We will publish a couple of variations of our serverless webhooks to the AWS Serverless Application Repository, and begin gathering feedback from users on what types of features they’d like to see. We don’t want to assume too much when it comes to what users are looking for when it comes to augmenting existing APIs with event-driven functionality. It is not something we think that people are aware of is possible, so we feel like we should put the concept out there, and let people start considering the possibilities. In the meantime we’ll play with some more variations so that we can keep pushing the boundaries when it comes to augmenting existing APIs with external event-driven features.

AI in Finance White paper - API Integrations
**Original source: streamdata.io blog