Wednesday, July 8, 2015

Using Akka Http to perform a Rest call and deserialise json

I have been playing with Akka Streams and Akka Http to create a flow to get some data from a public Rest endpoint and deserialize the json using Json4s.
Since there are not that many examples yet, and documentation only has a few examples, I'm sharing my little app here.

By default, Akka Http only supports Spray Json, but fortunately Heiko already created a small akka-http-json library with support for Json4s and Play Json.

Here is a small code sample showing how to create an Akka Streams Flow and run it. This was just to test calling the Rest endpoint and deserialising the resulting json into a case class. The next step is to extend the flow to do something useful with the retrieved data. I'll be putting it into a time series database called Prometheus, and maybe also into Mongo.

package enphase

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.{DefaultFormats, Formats, Serialization, jackson}

import scala.concurrent.{Await, Future}

/**
 * Enphase API Client which gets Enphase data and put those into InfluxDB
 *
 * - Start with HTTP GET request to Enphase API.
 * - Transform response into json
 * - Transform json into time series data
 * - Put time series data into InfluxDB using HTTP POST request
 */
object Client extends App with Json4sSupport {

  val systemId = 999999 // replace with your system id
  val apiKey   = "replace-with-your-api-key"
  val userId   = "replace-with-your-user-id"

  val systemSummaryUrl = s"""/api/v2/systems/$systemId/summary?key=$apiKey&user_id=$userId"""
  println(s"Getting from: $systemSummaryUrl")

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val formats: Formats = DefaultFormats
  implicit val jacksonSerialization: Serialization = jackson.Serialization
  import concurrent.ExecutionContext.Implicits.global

  val httpClient = Http().outgoingConnectionTls(host = "api.enphaseenergy.com")

  private val flow: Future[SystemSummary] = Source.single(HttpRequest(uri = Uri(systemSummaryUrl)))
      .via(httpClient)
      .mapAsync(1)(response => Unmarshal(response.entity).to[SystemSummary])
      .runWith(Sink.head)

  import concurrent.duration._

  val start = System.currentTimeMillis()
  val result = Await.result(flow, 15 seconds)
  val end = System.currentTimeMillis()

  println(s"Result in ${end-start} millis: $result")
}

/**
 * Entity for system summary json:
 * {
 * "current_power": 3322,
 * "energy_lifetime": 19050353,
 * "energy_today": 25639,
 * "last_report_at": 1380632700,
 * "modules": 31,
 * "operational_at": 1201362300,
 * "size_w": 5250,
 * "source": "microinverters",
 * "status": "normal",
 * "summary_date": "2014-01-06",
 * "system_id": 123
 * }
 */
case class SystemSummary(system_id: Int, summary_date: String, status: String, source: String,
                          size_w: Int, operational_at: Long, modules: Int, last_report_at: Long,
                          energy_today: Int, energy_lifetime: Long, current_power: Int)


At first I could not get Heiko's Unmarshallers working, so I wrote my own Unmarshaller, which is not that difficult when you look at some other implementations. The problem was a very vague error saying something was missing, but not exactly what. Today I figured out it was just missing one of the required implicit arguments, the Json4s Serialization, and then it all worked nicely.

Still, here is how to implement a custom Unmarshaller which unmarshals an HttpResponse instance:

  implicit def responseUnmarshaller[T : Manifest]: FromResponseUnmarshaller[T] = {
    import concurrent.duration._
    import enphase.json.Json4sProtocol._
    import org.json4s.jackson.Serialization._

    new Unmarshaller[HttpResponse, T] {
      override def apply(resp: HttpResponse)(implicit ec: ExecutionContext): Future[T] = {
        resp.entity.withContentType(ContentTypes.`application/json`)
            .toStrict(1 second)
            .map(_.data)
            .map(_.decodeString(resp.entity.contentType.charset.value))
            .map(json => { println(s"Deserialized to: $json"); json })
            .map(json => read[T](json))
      }
    }
  }

The only change in the application needed to use this unmarshaller is to replace the 'mapAsync' line with:

    .mapAsync(1)(Unmarshal(_).to[SystemSummary])

The project build.sbt contains these dependencies:

scalaVersion := "2.11.6"

libraryDependencies ++= Seq(
  "com.typesafe.akka" % "akka-http-experimental_2.11" % "1.0-RC4",
  "de.heikoseeberger" %% "akka-http-json4s" % "0.9.1",
  "org.json4s" %% "json4s-jackson" % "3.2.11",
  "org.scalatest" % "scalatest_2.11" % "2.2.4" % "test"
)

Happy Akka-ing.

Joost



Saturday, July 4, 2015

Selecting a time series database

For a new play project, I want to use a time series database to visualise gathered data over time and to be able to use functions like sum/div/etc on the time series data. I thought I could just pick a database and start coding on my project, but instead I ended up spending several evenings trying out several databases to find the right one for my needs.

1) InfluxDB

InfluxDB was my first choice. The installation is very easy and it has a simple CLI, a bit like Mongo.
Data can be inserted via an HTTP POST request to a URL like 'http://<host>:8086/write?db=$database&precision=ms'. If you pass a timestamp, InfluxDB assumes it is in nanoseconds. If you use a different unit, for example the system time which is in milliseconds, you need to tell InfluxDB via the 'precision' parameter. If no timestamp is provided, InfluxDB will use the timestamp of the request.
Although InfluxDB is still a very young db, there are already client libraries available in many languages, including Java and Scala.
The data format is very easy: <measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>[,<field2-key>=<field2-value>...] [unix-nano-timestamp]
Measurement is the name of the time series. At least one field key-value is required, others are optional. Tags are also optional. InfluxDB automatically creates indexes for these tag-keys which makes querying very powerful.
Installation is very easy, but even simpler via one of the available Docker containers. I used tutum/influxdb.
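
To make the data format above a bit more concrete, here is a minimal Java sketch that builds one such line plus the write URL with millisecond precision. The class `InfluxLine`, its method names and the sample values are made up for illustration and are not part of any InfluxDB client library. The sketch assumes numeric field values and does no escaping of spaces or commas in names.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper, not part of any InfluxDB client library. */
final class InfluxLine {

    /** Builds '<measurement>[,<tag>=<value>...] <field>=<value>[,...] [timestamp]'. */
    static String format(String measurement,
                         Map<String, String> tags,
                         Map<String, Number> fields,
                         Long timestamp) {
        if (fields.isEmpty()) {
            throw new IllegalArgumentException("at least one field is required");
        }
        StringBuilder line = new StringBuilder(measurement);
        // Tags are optional and are appended directly to the measurement name.
        tags.forEach((key, value) -> line.append(',').append(key).append('=').append(value));

        StringJoiner fieldPart = new StringJoiner(",");
        fields.forEach((key, value) -> fieldPart.add(key + "=" + value));
        line.append(' ').append(fieldPart);

        // The timestamp is optional; without it InfluxDB uses the time of the request.
        if (timestamp != null) {
            line.append(' ').append(timestamp);
        }
        return line.toString();
    }

    /** Write URL; 'precision=ms' declares that timestamps are in milliseconds. */
    static String writeUrl(String host, String database) {
        return "http://" + host + ":8086/write?db=" + database + "&precision=ms";
    }
}
```

With a tag system=123, a field current_power=3322 and the timestamp 1380632700000, `format("enphase", ...)` gives the line `enphase,system=123 current_power=3322 1380632700000`, which would be the request body sent to the write URL.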

Disadvantages:
Version 0.9 is a complete rebuild and incompatible with 0.8, which means there are not that many query functions available compared to Graphite, and not all 3rd party tools, like Grafana, work fully with the latest InfluxDB version. There are no functions yet to combine multiple series, and therefore I am unfortunately not able to use InfluxDB for my project yet.
Although there is some basic support for InfluxDB v0.9 in Grafana already, it was still a bit tricky to use. Even though I had configured Grafana correctly, it didn't show any graph at first, and then suddenly it did and I had no idea what I had done differently.

2) Graphite

So, fall back to Graphite then? The reason I did not start out with Graphite is that the installation is just too damn hard to get right. Graphite consists of 3 parts: Carbon (for capturing data), Whisper (a fixed size db) and Graphite Web (a simple web interface). There are no downloadable installations, so those applications have to be built for your environment.
In the past I have looked multiple times for Vagrant or Docker containers which set up a complete, and up-to-date (!!), Graphite environment for me, but I always gave up after spending another couple of hours trying to get it working without result. Recently I came across the Kamon Graphite-Grafana Docker container, which was the first one which worked right out of the box.

The main disadvantage is the fixed size database. You have to determine up front how much detail you want to keep for how long. For example, per second for 2 weeks, per hour for a month, per day for a year. For now I'd like to keep all data, so I do not want to have to choose when data gets aggregated. Also, because the whole Graphite setup is so complex, I do not want to have to change the Graphite configuration for my project.
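
To get a feeling for what 'determining up front' means, here is a small Java sketch of the arithmetic for the example above: every archive needs a fixed number of slots, namely the retention period divided by the resolution. The class and method names are made up, and a month is assumed to be 30 days. It only illustrates the sizing decision and says nothing about how Whisper actually lays out its files.

```java
import java.time.Duration;

/** Hypothetical sizing helper for a fixed size time series archive. */
final class RetentionSketch {

    /** Number of fixed slots an archive needs: retention period divided by resolution. */
    static long slots(Duration resolution, Duration retention) {
        return retention.getSeconds() / resolution.getSeconds();
    }

    public static void main(String[] args) {
        long perSecond = slots(Duration.ofSeconds(1), Duration.ofDays(14)); // 2 weeks
        long perHour = slots(Duration.ofHours(1), Duration.ofDays(30));     // 1 month (assumed 30 days)
        long perDay = slots(Duration.ofDays(1), Duration.ofDays(365));      // 1 year
        System.out.println(perSecond + " + " + perHour + " + " + perDay + " slots");
    }
}
```

The per-second archive dominates with 1209600 slots, against 720 and 365 for the other two, and all of that has to be decided before the first value is written.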

The biggest advantage is the extended support by Grafana, which is logical since Grafana was created for Graphite in the first place. Performing functions over multiple series is child's play with Graphite+Grafana.

Since Graphite is often used with Statsd, I did a small test with Statsd + Graphite, but Statsd is not suited for what I want to do, since Statsd aggregates and summarises metrics before sending them to Graphite. Whereas in a similar test application for InfluxDB I was able to get a graph with multiple sine waves, this was impossible with Statsd.
An alternative would be to store the metrics in Graphite directly, but given Graphite's complex installation and fixed size database, I would rather use something else.

So ... then what else is there? Are there other time series databases ... ? Yes there are! Plenty !! See this list of time series databases.
I looked at some of the databases in the list and eventually chose to try out Prometheus because it looked the most promising: it has its own Grafana-like dashboard and it has alerting, which none of the other databases has. So ...

3) Prometheus

I chose to try out Prometheus since it has its own Grafana-like dashboard and it has alerting, which none of the other databases has. The alerting allows you to receive notifications when a series goes above a threshold, when an event has not occurred for x-time, or when some value rises, or falls, too quickly (a trend).
All Prometheus parts are available as Docker containers which makes it very easy to get started.
Prometheus has one characteristic which makes it very different from other (time series) databases: Prometheus uses a pull mechanism instead of a push mechanism like all other databases, which means Prometheus wants to gather all data itself by calling Rest endpoints of applications. Fortunately, they do provide a Push Gateway which makes it possible to push your own data again. Prometheus will then pull it from the Push Gateway. To use the Docker containers for both Prometheus and the PushGateway, you must link the PushGateway to the Prometheus container so Prometheus can reach the PushGateway Rest endpoint.
By default Prometheus runs on port 9090 and the Push Gateway on port 9091.

Prometheus has a simple UI to display its configuration and to run some queries, visualised either in a table or a graph. The Grafana-like dashboard is called PromDash, which by default runs on port 3000.

There is a Java client available for using the PushGateway, but it works a bit strangely. Constructing a Registry and Gauge for each metric value seems overkill, but trying to reuse those objects resulted in an error. This is probably because the whole registry gets pushed to the PushGateway, since the Java client is actually meant to be used for batch processing. Maybe it's easier to just push the value via HTTP Put yourself, but I haven't tried it out yet.
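
For what it's worth, pushing a value yourself could look roughly like the Java sketch below. It is untested against a real PushGateway and rests on assumptions: the path layout '/metrics/job/<job>/instance/<instance>' and the plain '<name> <value>' body follow the documentation of current PushGateway releases and may differ for the version used here. The class name, method names and the metric name in the usage note are made up.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of pushing one sample to a PushGateway over plain HTTP. */
final class PushGatewaySketch {

    /** One sample in the text format: '<name> <value>' followed by a newline. */
    static String body(String metricName, double value) {
        return metricName + " " + value + "\n";
    }

    /** Push URL (assumed layout); job and instance become labels of the series. */
    static String url(String host, String job, String instance) {
        return "http://" + host + ":9091/metrics/job/" + job + "/instance/" + instance;
    }

    /** Sends the sample with HTTP PUT and returns the response status code. */
    static int push(String host, String job, String instance,
                    String metricName, double value) throws IOException {
        HttpURLConnection connection =
                (HttpURLConnection) new URL(url(host, job, instance)).openConnection();
        connection.setRequestMethod("PUT");
        connection.setDoOutput(true);
        try (OutputStream out = connection.getOutputStream()) {
            out.write(body(metricName, value).getBytes(StandardCharsets.UTF_8));
        }
        try {
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }
}
```

A call like `push("localhost", "sinus", "app1", "sinus_value", 0.5)` would then send the body "sinus_value 0.5" to the gateway on port 9091, with no Registry or Gauge objects involved.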

As with InfluxDB and Statsd/Graphite, I created another test application which produces sine values. Running multiple instances created multiple series. Prometheus has functions to sum or divide multiple values. Just make sure the time series have the same name and vary the job or instance name instead. PromDash displayed a graph containing both sine values and the sum of both values, which is exactly what I want for my project.

Conclusion

So, after spending 4 evenings playing with these databases, I'm going forward with Prometheus. At least, until InfluxDB also has the functions I need and Grafana supports them too, because I still do like InfluxDB's data structure and ease of installation and use.
So, I think I will also use another database besides Prometheus to store each and every metric value, so I can easily replay all data when a more mature InfluxDB is available. Although Mongo is not suitable for write-heavy applications, I'll probably use it anyway since my application does not need to write much data and it is just easy to use.

More on my mysterious project in next posts ....

Saturday, August 9, 2014

Preventing NPE when creating an Option

Sometimes you don't know whether values are present and you want to use an Option to represent that possibility. While Scala APIs often support returning an Option, this is not so when integrating with Java. You might want to get a value somewhere from an object hierarchy like this:
user.getAddress().getStreet()
Since you might expect a value is not present, you want to wrap this in an Option. However, if either 'user' or 'address' is null, evaluating the expression throws an NPE before the Option is even created.

To guard against this, you want to catch any NPE and return a normal None. However, the Option trait is sealed so it is not possible to extend it. Here is a solution how it can be done by using an object with an 'apply' method. This SafeOption returns a normal Option (Some or None), but catches any NPE and returns a None in that case.
/**
 * A 'safe' type of Option which catches a NullPointerException thrown
 * while evaluating the value when creating the option.
 * So 'SafeOption(user.getAddress().getStreet())' will return None when 'user' or its address is null.
 */
object SafeOption {

  def apply[A](value: => A): Option[A] = try {
    Option(value)
  } catch {
    case _: NullPointerException => None
  }
}
Use the SafeOption like this, not much different from creating a normal Option:
SafeOption(user.getAddress().getStreet())
Note: use this only when you don't care about the nulls and a None is a valid option for you in those cases.

Another note: It would be possible for SafeOption to catch any kind of Exception or Throwable instead of only a NullPointerException, but whether that is a valid solution depends totally on your application. I just chose to only catch NullPointerException so other kinds of exceptions can still be handled differently by the application.
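
For those stuck on the Java side, the same idea can be sketched with Java 8's Optional and a Supplier to delay evaluation of the value. This is only a rough equivalent of the Scala version above. The class name `SafeOptional` is made up, and like the Scala version it only catches NullPointerException.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical Java counterpart of SafeOption. */
final class SafeOptional {

    /**
     * Evaluates the supplier lazily and wraps the result in an Optional.
     * A NullPointerException thrown during evaluation results in an empty Optional.
     */
    static <A> Optional<A> of(Supplier<? extends A> value) {
        try {
            return Optional.ofNullable(value.get());
        } catch (NullPointerException e) {
            return Optional.empty();
        }
    }
}
```

Usage is again close to creating a normal Optional: `SafeOptional.of(() -> user.getAddress().getStreet())`.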

Happy Optioning ;-)

Wednesday, July 30, 2014

Running Jetty as an application for fast starts and easy debugging

Here is a simple utility that will save you a lot of time when restarting or redeploying your webapp in Jetty. It also makes debugging a lot easier. And the integration of tools like JRebel is now a real breeze.
Previously I used the Jetty Maven plugin, but that always recompiled all code, and since I have mixed Scala in with the Java code, compiling has not become any faster.

Here is the code of a simple App which embeds Jetty and allows you to run or debug your webapp from any IDE. The code is in Scala (of course :-)), but is easily translatable to Java.

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.webapp.WebAppContext

/**
 * App to run Jetty for testing.
 */
object JettyApp extends App {

  val server = new Server(8080)

  Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
    override def run(): Unit = server.stop
  }))

  val context = new WebAppContext()
  context.setDescriptor("src/main/webapp/WEB-INF/web.xml")
  context.setResourceBase("src/main/webapp")
  context.setContextPath("/")
  context.setParentLoaderPriority(true)

  server.setHandler(context)

  server.start()
  server.join()
}


Put this in your src/test/[scala|java] folder and the server will use the resources and classes from both the main and test locations.

The only dependency you need to add to your project is org.eclipse.jetty:jetty-webapp:9.2.2.v20140723:test.

The shutdown hook might not be necessary, but I threw it in anyway.

Happy Jetty-ing. ;-)

Thursday, July 10, 2014

Java8 fix for jaxws-maven-plugin

Today I came across an issue while trying to move a project to Java 8. We use the jaxws-maven-plugin to generate code from WSDL files. With Java 8 this plugin caused this issue:
Exception in thread "main" java.lang.ClassCastException: java.lang.AssertionError cannot be cast to java.lang.Exception
  at org.jvnet.jax_ws_commons.jaxws.Invoker.main(Invoker.java:87)

This was a known issue already, but not fixed by anyone yet. So I decided to fix it myself.
The fix was rather easy: no longer cast the exception, and update the maven-plugin-plugin dependency.
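
To illustrate the kind of problem, here is a small Java sketch. It is not the plugin's actual code, just a made-up reconstruction of the pattern: a caught Throwable is blindly cast to Exception, which blows up as soon as the cause is an Error such as an AssertionError.

```java
/** Hypothetical illustration of the cast problem; not the plugin's real code. */
final class InvokerSketch {

    /** Fails with a ClassCastException when the cause is an Error such as AssertionError. */
    static void rethrowWithCast(Throwable cause) throws Exception {
        throw (Exception) cause;
    }

    /** Rethrows the cause with its own type instead of forcing it into an Exception. */
    static void rethrowWithoutCast(Throwable cause) throws Exception {
        if (cause instanceof Exception) {
            throw (Exception) cause;
        }
        if (cause instanceof Error) {
            throw (Error) cause;
        }
        // Any other Throwable subtype is wrapped so nothing gets lost.
        throw new RuntimeException(cause);
    }
}
```

With the second variant the original AssertionError surfaces, instead of being hidden behind a ClassCastException.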

Then in our project, besides using the custom-built jaxws-maven-plugin, I also had to update the jaxws-rt dependency to the latest version (2.2.10-b140319.1121) because an older version caused a NPE in AbstractJaxwsMojo.isArgSupported(..). I tried to remove this dependency completely because the JDK also contains the javax.xml.ws packages, but the AbstractJaxwsMojo does not support that (yet).

Since the jaxws-maven-plugin is still in a Subversion repo, I created a new Git repo with my fix which is available at https://bitbucket.org/diversit/jaxws-maven-plugin.

References:
Jax-ws-commons project home : https://jax-ws-commons.java.net/
Issue JAX_WS_COMMONS-129 : https://java.net/jira/browse/JAX_WS_COMMONS-129
Jax-ws-commons source code : https://java.net/projects/jax-ws-commons/sources/svn/show

Friday, June 6, 2014

Loading dynamic Angular content via JQuery

Sometimes you cannot start a project from scratch. Sometimes you're just stuck with crap created by others. And often you want to improve that.

Say, you're stuck with a web application full of JQuery and you are really trying to make something better of it. You don't understand anything of this JQuery magic, so you gradually replace it with some clear, tested (!) and short Angular code.

You created a nice Angular component, added it to your application, but somehow it works in one page but not on another. After some debugging you notice that when the component doesn't work, it is in some html fragment which is dynamically loaded using 'ajaxSubmit' function from JQuery's Form plugin. This dynamic loading is outside of Angular's scope, so Angular does not know this is happening and therefore cannot do any Angular stuff on the new fragment.

The trick to solve this, is to 'compile' the new html fragment using Angular and then insert it into the DOM. This gives Angular the opportunity to activate any Angular components in the fragment. Now, the big question is, how to do this 'compiling' ? Let's go through it step by step.

Before

This is how it was before:
$(this).ajaxSubmit({target:'#body',url:'/my/mvc/url'});
This replaced the content of the html tag with id 'body' (so not the contents of the '<body>' tag itself!) with the html returned by the url '/my/mvc/url'.

Calling an Angular function from plain JavaScript

Somehow the returned html fragment must be compiled, so we need a function which calls Angular's compiler and passes it the new html. But JQuery cannot access Angular's providers or services, so the function must be in Angular. For JQuery to be able to access the function, you must put it somewhere where JQuery can reach it. At first I put the function on Angular's $rootScope like this:
app.run(function($rootScope) {

    $rootScope.refresh = function() {
        console.log('refreshing body');
    }
});
You can then reach this function by getting the rootScope like this:
var rootScope = angular.element(document).scope();
rootScope.refresh();
You can even get the scope of a specific element by adding a find(..) before the 'scope()':
var elementScope = angular.element(document).find('#elemId').scope();
elementScope.refresh();
This also works because an Angular Scope inherits from its parent Scope. The 'app.run' function is called when the page is loaded, so it creates the function on the object at that time.

I did not like this solution, because you have to get the scope first before you're able to call the wanted function. So, I opted for another solution to put the function on the Window object so the function would be globally available. In Angular you can just add the $window argument to a function and then create a new function on that object which can then be called from anywhere in JavaScript.
app.run(function($window) {

    $window.refresh = function() {
        console.log('refreshing body');
    }
});

Compile the new html fragment

Angular has a $compile provider with which you can compile the html fragment.
Using it is as easy as adding the '$compile' argument to the run function. The html can be passed as an argument and the return value is the compiled html which you can then put into the dom.

Adding it to the dom

Somehow adding the html to the dom correctly wasn't very straightforward. An element supports both a 'replaceWith' and a 'html' method to modify its contents, but using:
elem.replaceWith( $compile( newHtml )(elemScope))
displayed the html wrong.
elem.html( $compile( newHtml )(elemScope))
This did not work at all.
Finally:
$(target)['html']( $compile( newHtml )(targetScope));
this seemed to work fine.
For me, it all looked like the same code, just written differently, but apparently it makes a difference.

Calling compile function from ajaxSubmit

Initially, the 'ajaxSubmit' call would 1) call the url and 2) replace the content of the provided target with the new html. Because we have to compile the html before it can be injected, a success handler is required to capture the retrieved data. Then it can be compiled and added to the dom.
$(this).ajaxSubmit({
  // no target! Angularfy function will put code into dom.
  url:'<@core.basePath/>my/mvc/url',
  success: function(data) {
    // call the compile function and add compiled html to dom.
  }
});

The full solution

Putting everything together, this is the solution I ended up with. The 'angularfy' function compiles the html and adds it to the provided target.
app.run(function($window, $compile) {

    /*
     * Function to 'Angular-fy' dynamically loaded content
     * by JQuery. This compiles the new html code and injects it
     * into the DOM so Angular 'knows' about the new code.
     */
    $window.angularfy = function(target, newHtml) {
        // must use JQuery to query for element by id.
        // must wrap as angular element otherwise 'scope' function is not defined.
        var targetScope = angular.element($(target)).scope();

//        elem.replaceWith( $compile( newHtml )(elemScope)); Displays html wrong.
//        elem.html( $compile( newHtml )(elemScope)); Does not work.
        $(target)['html']( $compile( newHtml )(targetScope)); // Does work!
        targetScope.$apply();
    }
});
In the web page, the ajaxSubmit calls the 'angularfy' function from the success handler.
$(this).ajaxSubmit({
  // no target! Angularfy function will put code into dom.
  url:'<@core.basePath/>my/mvc/url',
  success: function(data) {
    angularfy('#body', data); // to notify Angular of new html code in dom.
  }
});

Happy Angular-JQuery-ing ! ;-)

Btw: I am a hardcore Java/Scala backender and do not pretend to be a frontend/Angular guru. This solution might not be the nicest one, but for me, at this moment, it works. If you know any improvements, let me know.

Wednesday, May 28, 2014

Java 8's Optional frustrations

At my current project we started working with Java 8 this week. Mainly because of the new Streams API, so we could do some functional coding in Java and get rid of the Google Guava dependency (which I kind of introduced some weeks ago, because, well, I just hate doing for-loops and null-checks. Once you’ve done Scala you don’t want to go back ;-) ).
Being used to working with Scala’s Option, I eagerly got started removing all null-checks and working with Java’s Optional. The Optional is a nice addition, but after working with it for a few days I feel it is dead wrong in 2 basic and primary methods:

1. To create an Optional, you use Optional.of(..). I threw in a value which could be null and immediately got a NPE. Apparently there is an Optional.ofNullable() method. WTF!! I cannot think of any reason why the ‘of’ method should not accept nulls. Why oh why would you want to use an Optional if you know the value is never null! Having to use ‘ofNullable’ is just unnecessary typing and is polluting the code. The ‘of’ method is just useless.

2. Working with requests and contexts, I sometimes want to get one attribute, but, if it’s not set, I want to fall back to another attribute which also might not be set. So, I want to create an Optional and use ‘orElse’ to use another optional value. However, in Java the Optional.orElse returns ‘T’ and not an ‘Optional<T>’, in contrast to Scala's Option, whose ‘orElse’ does return ‘Option[T]’. The ‘Optional.orElse’ method is really a ‘getOrElse’, returning the value of the Optional or an alternative. A real ‘orElse’ function is just missing. I don’t want the value T, I want to keep working with the Optional in a functional way. Now, to get around this you have to do something like this:
Optional.ofNullable(Optional.ofNullable(getValueA()).orElse(getValueB())). How is that for readability? 
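
A slightly more readable workaround is a small static helper that behaves like Scala's ‘orElse’. The sketch below is just one way to do it. The class name `Optionals` and the helper itself are made up and not part of the JDK. Later Java versions (9 and up) did add an `Optional.or` method for this purpose.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper which mimics Scala's Option.orElse for Java 8. */
final class Optionals {

    /** Returns 'first' when it holds a value, otherwise the Optional produced by 'fallback'. */
    static <T> Optional<T> orElse(Optional<T> first, Supplier<Optional<T>> fallback) {
        // The fallback is only evaluated when 'first' is empty.
        return first.isPresent() ? first : fallback.get();
    }
}
```

The example above then becomes `Optionals.orElse(Optional.ofNullable(getValueA()), () -> Optional.ofNullable(getValueB()))`, and the result is still an Optional you can keep mapping over.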

Besides these clear mistakes (IMHO) I regret the Optional has not found its way into some other classes, like a Map.getOptional method for example, to be able to get Optional instances right away instead of having to do the wrapping yourself.
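
Until then, the wrapping can at least be hidden in one place. A minimal sketch, where the class `OptionalMaps` and its `getOptional` method are made-up names:

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper which provides the missing 'Map.getOptional'. */
final class OptionalMaps {

    /** Returns the value for 'key' as an Optional; empty when the key is absent or mapped to null. */
    static <K, V> Optional<V> getOptional(Map<K, V> map, K key) {
        return Optional.ofNullable(map.get(key));
    }
}
```

So instead of `Optional.ofNullable(map.get("key"))` everywhere, you write `OptionalMaps.getOptional(map, "key")`.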