Streamdata.io is a service that acts as a reverse proxy and turns a REST API into a stream of data. You can call the proxy from different contexts and in different languages. In this article, we walk through a step-by-step tutorial on developing a GitHub Android client app that provides real-time notifications when commits are pushed to repositories, using the Streamdata.io service.
Summary
- Install Android Studio & SDK
- Streamdata.io-Android dependencies
- Subscribe to an API
- Receive messages from Streamdata.io proxy
1. Install Android Studio & SDK
Please follow this guide to install Android Studio.
2. Streamdata.io-Android dependencies
We will first focus on how to use Streamdata.io from an Android application written in Java.
Streamdata.io needs three external Java dependencies. All of them are available on GitHub:
- JSON Parser – Jackson Databind
- JSON Patch
- Event Source Android
3. Subscribe to an API
Allowing network operations
To perform networking operations, your application manifest must include the following permissions:
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
To use EventSource, you should create and connect the event source from a separate thread. If you prefer to handle your event source in the activity’s main thread, you will need to add the following instructions at the beginning of the onCreate() Android callback:
StrictMode.ThreadPolicy policy = new StrictMode.ThreadPolicy.Builder().permitAll().build();
StrictMode.setThreadPolicy(policy);
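Relaxing StrictMode is convenient for a demo. The alternative mentioned above, running the connection from a separate thread, can be sketched in plain Java as follows. BackgroundConnector and connectAsync are hypothetical names introduced for this illustration, and the Runnable stands in for whatever code creates and connects the event source.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical helper: runs a blocking connect step off the calling thread. */
final class BackgroundConnector {
    // A single worker thread keeps connect steps in submission order.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Submits the connect step to the worker thread and returns a handle to it. */
    Future<?> connectAsync(Runnable connectStep) {
        return executor.submit(connectStep);
    }

    /** Stops the worker thread; call it when the connection is no longer needed. */
    void shutdown() {
        executor.shutdown();
    }
}
```

In an Activity, the Runnable would typically wrap the code that builds and connects the event source, and shutdown() would be called when the Activity is destroyed.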
Streamdata.io proxy authentication
Streamdata.io requires an App Token to accept requests from a client. Once registered on our Portal, you can create an application or use the default one to get your own App Token.
Calling an API through Streamdata.io
You can try your token with a curl request in a shell. To do this, pass a URL parameter named X-Sd-Token.
Here is an example of a call you can make from a shell (the URL is quoted so that the shell does not interpret the ? character):
curl "https://streamdata.motwin.net/https://api.bitcoinaverage.com/ticker/EUR/?X-Sd-Token=[YOURTOKEN]"
This request URL is composed of three parts:
- https://streamdata.motwin.net – Streamdata.io proxy prefix (DO NOT change it!).
- https://api.bitcoinaverage.com/ticker/EUR/ – The URL of the JSON API you want to subscribe to.
- X-Sd-Token=[YOURTOKEN] – The Streamdata.io token identifying your application.
Of course, if your API requires more URL parameters, you can add them at the end.
If you did not manage to receive push messages using the provided example, take a look at the Streamdata.io documentation or contact us before going further.
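The three-part URL described above can also be assembled in code. The following is a minimal sketch: ProxyUrlBuilder and build are hypothetical names, not part of any Streamdata.io SDK. It assumes the token is appended as the last query parameter, with ? or & chosen depending on whether the target API URL already carries a query string.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical helper: assembles proxy prefix + API URL + token parameter. */
final class ProxyUrlBuilder {
    private ProxyUrlBuilder() { }

    static String build(String proxyPrefix, String apiUrl, String token) {
        // Make sure exactly one slash separates the proxy prefix from the API URL.
        String prefix = proxyPrefix.endsWith("/") ? proxyPrefix : proxyPrefix + "/";
        // Use '&' when the API URL already has its own query parameters.
        String separator = apiUrl.contains("?") ? "&" : "?";
        try {
            return prefix + apiUrl + separator + "X-Sd-Token=" + URLEncoder.encode(token, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this should never happen.
            throw new IllegalStateException(e);
        }
    }
}
```

For instance, calling build with the proxy prefix, the Bitcoin ticker URL, and your token produces the same URL as the curl example.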
Subscribe to an API in an Android application
First of all, let’s get back to Android basics.
In the activity life cycle, only two states are visible (or partially visible); they are represented by the dashed box. Moreover, the user can interact with the application in the resumed state only.
For better performance, we strongly recommend starting to listen to incoming messages in the onResume() callback and stopping in the onPause() callback.
Next, we declare three attributes in our base Activity class, corresponding to the three URL elements we saw in the previous section.
private String streamdataioProxyPrefix = "https://streamdata.motwin.net/";
private String streamdataioAppToken = "YOUR_TOKEN_HERE";
private String myApiUrl = "YOUR_JSON_API_URL";
Now, let’s create two methods, connect() and disconnect(), to start and stop listening to Streamdata.io proxy messages. Both rely on an eventSource attribute of type EventSource declared in the Activity.
Code: Streamdata.io streaming connection method
/**
 * Create the EventSource object & start listening to incoming SSE messages
 */
private void connect() {
    // Create headers: add the Streamdata.io app token
    Map<String, String> headers = new HashMap<String, String>();
    headers.put("X-Sd-Token", streamdataioAppToken);
    // Create the EventSource with the API URL & Streamdata.io authentication token
    try {
        eventSource = new EventSource(new URI(streamdataioProxyPrefix), new URI(myApiUrl), new SSEHandler(), headers);
        // Start receiving data (only reached if the EventSource was created)
        eventSource.connect();
    } catch (URISyntaxException e) {
        e.printStackTrace();
    }
}
The connect() method has to be called from the onResume() callback:
@Override
protected void onResume() {
    super.onResume();
    // Connection to EventSource
    connect();
}
If you copy-paste these methods, Android Studio will not be able to resolve the SSEHandler class. There is no need to worry about it for now, because we will create this class later.
For further information on the EventSource constructors and parameters, you can take a look at its implementation file EventSource.java.
Code: Streamdata.io streaming disconnection method
/**
 * Closes the event source connection and dereferences the EventSource object
 */
private void disconnect() {
    // Disconnect the eventSource handler
    if (eventSource != null && eventSource.isConnected()) {
        try {
            eventSource.close();
        } catch (Exception e) {
            if (Log.isLoggable(TAG, Log.ERROR)) {
                Log.e(TAG, "Error on closing SSE", e);
            }
        }
    }
    // Dereferencing variable
    eventSource = null;
}
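The disconnect() method has to be called from the onPause() callback:
@Override
protected void onPause() {
    super.onPause();
    // Disconnection from EventSource
    disconnect();
}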
4. Receive messages from Streamdata.io proxy
Your application needs to cache the data received from the Streamdata.io proxy because, once the initial snapshot is received, the server only sends incremental patches that have to be applied, in order, to the initial set of data.
To do so, create a JsonNode attribute to locally cache your data, together with the Jackson ObjectMapper used to parse incoming messages:
private JsonNode data;
private final ObjectMapper mapper = new ObjectMapper();
In order to handle message reception you have to create a private class in your activity that implements EventSourceHandler. This way you will be able to override all the message callbacks and define what to do in each case. I will name this class SSEHandler to coincide with the constructor call we wrote in the connect() method, but you are free to name it as you want.
Example of SSE handler class implementation:
private class SSEHandler implements EventSourceHandler {
    public SSEHandler() { }
    /**
     * SSE handler for connection starting
     */
    @Override
    public void onConnect() {
        if (Log.isLoggable(TAG, Log.DEBUG)) {
            Log.d(TAG, "SSE Connected");
        }
        /* Do whatever you like when the stream opens */
    }
    /* SSE incoming message handler */
    @Override
    public void onMessage(String event, MessageEvent message) throws IOException {
        if ("data".equals(event)) {
            // SSE message is the first snapshot
            data = mapper.readTree(message.data);
        } else if ("patch".equals(event)) {
            // SSE message is a patch
            try {
                JsonNode patchNode = mapper.readTree(message.data);
                JsonPatch patch = JsonPatch.fromJson(patchNode);
                data = patch.apply(data);
            } catch (JsonPatchException e) {
                e.printStackTrace();
            }
        } else {
            throw new RuntimeException("Unexpected SSE message: " + event);
        }
    }
    /* SSE error handler */
    @Override
    public void onError(Throwable t) {
        /* Do whatever you like in case of error */
    }
    /* SSE handler for connection interruption */
    @Override
    public void onClosed(boolean willReconnect) {
        /* Do whatever you like when the stream stops */
    }
}
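The snapshot-then-patch flow handled in onMessage() can be illustrated without the Android or Jackson dependencies. The sketch below is a simplified stand-in, not the JSON Patch library’s API: SnapshotCache is a hypothetical class that caches nested Map and List values and supports only the add, replace, and remove operations, one at a time, on simple paths (no escaped characters, no root path).

```java
import java.util.List;
import java.util.Map;

/** Simplified stand-in for the cached JsonNode; not the json-patch library's API. */
final class SnapshotCache {
    // Cached document made of nested Map / List / scalar values.
    private Object data;

    /** "data" event: the snapshot replaces whatever was cached before. */
    void onSnapshot(Object snapshot) {
        data = snapshot;
    }

    /** "patch" event: applies one add, replace, or remove operation at the given path. */
    @SuppressWarnings("unchecked")
    void onPatch(String op, String path, Object value) {
        if (data == null) {
            throw new IllegalStateException("Patch received before the initial snapshot");
        }
        // Walk down to the parent of the targeted element, e.g. "/0/sha" -> element 0.
        String[] tokens = path.substring(1).split("/");
        Object parent = data;
        for (int i = 0; i < tokens.length - 1; i++) {
            parent = (parent instanceof List)
                    ? ((List<Object>) parent).get(Integer.parseInt(tokens[i]))
                    : ((Map<String, Object>) parent).get(tokens[i]);
        }
        String last = tokens[tokens.length - 1];
        if (parent instanceof List) {
            List<Object> list = (List<Object>) parent;
            // "-" designates the position after the last element.
            int index = "-".equals(last) ? list.size() : Integer.parseInt(last);
            if ("add".equals(op)) {
                list.add(index, value);
            } else if ("replace".equals(op)) {
                list.set(index, value);
            } else if ("remove".equals(op)) {
                list.remove(index);
            } else {
                throw new IllegalArgumentException("Unsupported operation: " + op);
            }
        } else {
            Map<String, Object> map = (Map<String, Object>) parent;
            if ("add".equals(op) || "replace".equals(op)) {
                map.put(last, value);
            } else if ("remove".equals(op)) {
                map.remove(last);
            } else {
                throw new IllegalArgumentException("Unsupported operation: " + op);
            }
        }
    }

    /** Returns the current state of the cached document. */
    Object get() {
        return data;
    }
}
```

The important point is the ordering: a patch only makes sense relative to the state produced by the snapshot and all previous patches, which is why the cache must outlive each individual message.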
Full demo: GitHub Client
Now that we have seen the basics of connecting to Streamdata.io from an Android app, let’s create an Android GitHub client app that uses the GitHub API through the Streamdata.io service to monitor the activity of repositories and receive the latest commits in real time.
Overall presentation
The application supports both browsing public GitHub data and logging in to GitHub to list your private repositories. The app is composed of two activities: the first one (MainActivity) provides a repository search field, where you select the repositories you want to watch in the second one (CommitsActivity). You can watch up to five repositories at the same time.
In addition, the app saves the list of repositories you have selected in the past.
Pushed commits are sorted by date, with the newest added at the top of the list, so that you can see them appear when a new patch is received.
MainActivity does not use the Streamdata.io push system to make its search requests; only CommitsActivity does. As the goal of this post is not to explain the GitHub Java SDK features, I will not go into detail about MainActivity. The only important point is that the list of selected repositories is passed to CommitsActivity as an ArrayList of Strings.
We get the list of repository IDs passed by MainActivity from the Intent that started the activity:
// Get class intent
Intent intent = getIntent();
// Get the array of Repositories-ID as Strings
ArrayList<String> reposIdArray = intent.getStringArrayListExtra("reposId");
For example, in the above MainActivity screenshot, the object passed from MainActivity to CommitsActivity (triggered by a click on the “Show Commits” button) is an array of strings:
{
"streamdataio/streamdataio-js",
"streamdataio/streamdataio-js-sdk"
}
Those strings are used by the GitHub API as repository identifiers.
For each repository, we need to call a distinct URL.
Example GitHub API URLs providing the commits of the two example repositories:
- https://api.github.com/repos/streamdataio/streamdataio-js/commits
- https://api.github.com/repos/streamdataio/streamdataio-js-sdk/commits
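The mapping from repository identifiers to commits URLs shown above can be sketched as follows. CommitUrls is a hypothetical helper introduced for illustration; the check on the "owner/name" shape is an assumption about what a valid identifier looks like, not a rule taken from the GitHub API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: maps "owner/name" repository identifiers to commits URLs. */
final class CommitUrls {
    private static final String GITHUB_REPOS = "https://api.github.com/repos/";

    private CommitUrls() { }

    static String forRepository(String repoId) {
        // A repository identifier is expected to look like "owner/name".
        if (repoId == null || !repoId.matches("[^/\\s]+/[^/\\s]+")) {
            throw new IllegalArgumentException("Invalid repository identifier: " + repoId);
        }
        return GITHUB_REPOS + repoId + "/commits";
    }

    static List<String> forRepositories(List<String> repoIds) {
        // One URL per repository, in the same order as the identifiers.
        List<String> urls = new ArrayList<String>();
        for (String repoId : repoIds) {
            urls.add(forRepository(repoId));
        }
        return urls;
    }
}
```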
Multiple API subscribing management
To handle messages from multiple APIs, we need to create an EventSource object and an SSEHandler object for each URL. Moreover, each SSEHandler has to keep its own data to avoid applying a patch to a snapshot that does not match. Thus, we store the dataset as an attribute of the SSEHandler class.
Additionally, as we want to aggregate the commits of every repository and order them by date, we keep the EventSource objects in a ConcurrentHashMap<String, EventSource>, which is a thread-safe hash table implementation. This is necessary because different EventSource objects can receive data concurrently.
First of all, we defined a Commit class:
/* ********************************** Class Commit ********************************** */
private class Commit {
    public Date date;
    public String user;
    public String comment;
    public String uid;
    public String repositoryID;
    public Commit(Date date, String user, String comment, String uid, String repositoryID) {
        this.date = date;
        this.user = user;
        this.comment = comment;
        this.uid = uid;
        this.repositoryID = repositoryID;
    }
}
We also need the concurrent map that stores the EventSource of each repository.
// Map of EventSource for each repo: (repository ID, EventSource)
private ConcurrentHashMap<String, EventSource> reposEventSources = new ConcurrentHashMap<>();
For simplicity, we decided to create four methods:
- connect(String api)
- connectAll()
- disconnect(String api)
- disconnectAll()
As you can imagine, the connectAll() and disconnectAll() methods merely iterate over the repositories and call connect(String api) and disconnect(String api) for each repository identifier.
connect() and connectAll() methods
/**
 * Create the EventSource object & start listening to incoming SSE messages
 */
private void connect(String api) {
    // Add the GitHub API token with a URL parameter (only way to authenticate)
    String myApi = "https://api.github.com/repos/" + api + "/commits?access_token=" + gitHubApiToken;
    // Add the Streamdata.io authentication token
    Map<String, String> headers = new HashMap<String, String>();
    headers.put("X-Sd-Token", streamdataioAppToken);
    // Create the EventSource with the API URL & Streamdata.io authentication token
    try {
        SSEHandler sseHandler = new SSEHandler();
        // Create the EventSource
        EventSource eventSrc = new EventSource(new URI(streamdataioProxyPrefix), new URI(myApi), sseHandler, headers);
        // Add the EventSource to the reposEventSources map
        reposEventSources.put(api, eventSrc);
        // Start receiving data from the API
        eventSrc.connect();
    } catch (URISyntaxException e) {
        e.printStackTrace();
    }
}
/**
 * Start listening to the commits of every selected repository
 */
private void connectAll() {
    for (String repo : reposIdArray) {
        Log.i("info", "Connecting: " + repo);
        connect(repo);
    }
}
disconnect() and disconnectAll() methods
/**
 * Closes the event source connection and dereferences the EventSource object
 */
private void disconnect(String api) {
    // Remove the EventSource from the map, then close it if it is still connected
    EventSource eventSrc = reposEventSources.remove(api);
    if (eventSrc != null && eventSrc.isConnected()) {
        eventSrc.close();
    }
}
/**
 * Closes every open EventSource
 */
private void disconnectAll() {
    for (Map.Entry<String, EventSource> entry : reposEventSources.entrySet()) {
        Log.i("info", "Disconnecting: " + entry.getKey());
        disconnect(entry.getKey());
    }
}
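Removing entries from the map while iterating over it, as disconnectAll() does through disconnect(), is safe with a ConcurrentHashMap because its iterators are weakly consistent and never throw ConcurrentModificationException. The sketch below illustrates this in plain Java; ConnectionRegistry and FakeSource are hypothetical stand-ins for the Activity and the EventSource class.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry mirroring reposEventSources, with a stand-in connection type. */
final class ConnectionRegistry {
    /** Stand-in for EventSource; it only tracks whether it is open. */
    static final class FakeSource {
        private volatile boolean open = true;

        boolean isConnected() {
            return open;
        }

        void close() {
            open = false;
        }
    }

    private final ConcurrentHashMap<String, FakeSource> sources =
            new ConcurrentHashMap<String, FakeSource>();

    FakeSource connect(String repoId) {
        FakeSource source = new FakeSource();
        // put() returns the previous mapping, which is closed to avoid a leaked stream.
        FakeSource previous = sources.put(repoId, source);
        if (previous != null) {
            previous.close();
        }
        return source;
    }

    void disconnect(String repoId) {
        // remove() is atomic: only one caller gets the source back, so it is closed once.
        FakeSource source = sources.remove(repoId);
        if (source != null && source.isConnected()) {
            source.close();
        }
    }

    void disconnectAll() {
        // Weakly consistent iteration: removing entries during the loop is allowed.
        for (String repoId : sources.keySet()) {
            disconnect(repoId);
        }
    }

    int size() {
        return sources.size();
    }
}
```

With a plain HashMap, the same loop would typically fail with a ConcurrentModificationException as soon as the first entry is removed.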
Handle multiple API messages concurrently
It is time to handle message reception. As a reminder, a message can be of three types:
- initial snapshot (JsonNode)
- JSON patch (JsonNode)
- Error message (Throwable)
When we receive a snapshot or a patch, we have to rebuild the whole list of commits and sort it by date before refreshing the UI. To do this, we wrote a method named updateCommits().
Now, brace yourself and let’s look at the SSEHandler class.
private class SSEHandler implements EventSourceHandler {
    private JsonNode ownData;
    public SSEHandler() { }
    /**
     * SSE handler for connection starting
     */
    @Override
    public void onConnect() {
        System.out.println("SSE Connected");
    }
    /**
     * SSE incoming message handler
     *
     * @param event type of message
     * @param message message JSON content
     * @throws IOException if JSON syntax is not valid
     */
    @Override
    public void onMessage(String event, MessageEvent message) throws IOException {
        if ("data".equals(event)) {
            // SSE message is a snapshot
            ownData = mapper.readTree(message.data);
            // Update commits array
            updateCommits();
        } else if ("patch".equals(event)) {
            // SSE message is a patch
            System.out.println(message.data);
            try {
                JsonNode patchNode = mapper.readTree(message.data);
                JsonPatch patch = JsonPatch.fromJson(patchNode);
                ownData = patch.apply(ownData);
                // Update commits array
                updateCommits();
            } catch (JsonPatchException e) {
                e.printStackTrace();
            }
        } else {
            Log.d("Debug", "Unexpected SSE message: " + message.toString());
            throw new RuntimeException("Wrong SSE message!");
        }
    }
    /**
     * SSE error handler
     */
    @Override
    public void onError(Throwable t) {
        // Network error message...
        if (t.toString().contains("java.nio.channels.UnresolvedAddressException")) {
            runOnUiThread(new Runnable() {
                @Override
                public void run() {
                    Toast.makeText(CommitsActivity.this, "You need to connect to the Internet", Toast.LENGTH_SHORT).show();
                }
            });
            CommitsActivity.this.disconnectAll();
        }
    }
    /**
     * SSE handler for connection interruption
     */
    @Override
    public void onClosed(boolean willReconnect) {
        System.out.println("SSE Closed - reconnect? " + willReconnect);
    }
    /**
     * Return commits of ONE repository, depending on its EventSource, as a JsonNode
     *
     * @return the cached commits of this repository, or null before the first snapshot
     */
    public JsonNode getOwnData() {
        return ownData;
    }
}
As you can see, the main structure of the class is similar to the previous SSEHandler. Notable differences are:
- The ownData attribute.
- The call to updateCommits() on every "data" or "patch" event.
- Catching connectivity errors.
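The connectivity check in onError() matches on the text of the exception. As an alternative, the exception type can be tested directly. The following is a minimal sketch under the assumption that the Throwable passed to onError() is either an UnresolvedAddressException or wraps one as a cause; NetworkErrors and isUnresolvedAddress are hypothetical names.

```java
import java.nio.channels.UnresolvedAddressException;

/** Hypothetical helper: detects an unresolved-address failure by type instead of by text. */
final class NetworkErrors {
    private NetworkErrors() { }

    static boolean isUnresolvedAddress(Throwable t) {
        // Walk the cause chain in case the exception has been wrapped along the way.
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (current instanceof UnresolvedAddressException) {
                return true;
            }
        }
        return false;
    }
}
```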
Last but not least, here is the updateCommits() method, which reads the ownData of every SSEHandler, reconstructs the up-to-date list of commits, sorts it by date, and then refreshes the UI:
// Synchronized because several EventSource threads can call it at the same time
public synchronized void updateCommits() {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
    // 'Z' is a literal in the pattern, so the UTC time zone is set explicitly
    sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
    // Clear commits
    commits.clear();
    // For each EventSource handler ...
    for (Map.Entry<String, EventSource> entry : reposEventSources.entrySet()) {
        SSEHandler sseHandler = (SSEHandler) entry.getValue().getEventSourceHandler();
        JsonNode data = sseHandler.getOwnData();
        if (data == null) {
            // No snapshot received yet for this repository
            continue;
        }
        // Rebuild the commits list from the JSON data
        for (JsonNode commitJson : data) {
            try {
                Commit commit = new Commit(
                    sdf.parse(commitJson.path("commit").path("author").path("date").textValue()),
                    commitJson.path("commit").path("author").path("name").textValue(),
                    commitJson.path("commit").path("message").textValue(),
                    commitJson.path("sha").textValue().substring(0, 9),
                    entry.getKey()
                );
                commits.add(commit);
            } catch (ParseException e) {
                // Skip commits whose date cannot be parsed
                e.printStackTrace();
            }
        }
    }
    Collections.sort(commits, new DateComparator());
    // Refresh UI
    runOnUiThread(new Runnable() {
        @Override
        public void run() {
            adapter.updateData(commits);
            adapter.notifyDataSetChanged();
        }
    });
}
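The DateComparator class is not shown in this article. The sketch below illustrates the two date-related steps of updateCommits() in plain Java: parsing a GitHub timestamp as UTC and sorting newest first. CommitSorting and NewestFirst are hypothetical names, and the newest-first ordering is an assumption based on the description of the commit list; the comparator works on Date values directly, where the app’s comparator would presumably compare the date attribute of two Commit objects in the same way.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;

/** Hypothetical helper illustrating date parsing and newest-first ordering. */
final class CommitSorting {
    private CommitSorting() { }

    /** Parses a timestamp such as "2016-01-01T00:00:00Z" as UTC. */
    static Date parseGitHubDate(String text) throws ParseException {
        // 'Z' is matched as a literal here, so the UTC time zone has to be set explicitly.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.US);
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        return sdf.parse(text);
    }

    /** Assumed behaviour of DateComparator: the most recent date comes first. */
    static final class NewestFirst implements Comparator<Date> {
        @Override
        public int compare(Date a, Date b) {
            return b.compareTo(a);
        }
    }

    /** Returns a sorted copy, leaving the input list untouched. */
    static List<Date> sortNewestFirst(List<Date> dates) {
        List<Date> sorted = new ArrayList<Date>(dates);
        Collections.sort(sorted, new NewestFirst());
        return sorted;
    }
}
```

Without the explicit time zone, SimpleDateFormat interprets the timestamp in the device’s default time zone, which does not change the relative order of commits but shifts the displayed times.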
Hopefully, this dissection of CommitsActivity.java helped you understand how to set up a push handler for multiple APIs.
I intentionally skipped the less relevant parts of this application; otherwise, none of you would have read to the end.
You can explore for yourself the Android UI code and XML layouts (res/ directory), the use of the GitHub Java SDK (in MainActivity.java), and the search history storage (SearchHistoryManager.java).
You can download the full source code of the app from its GitHub repository.