Wednesday, July 8, 2015

Using Akka Http to perform a Rest call and deserialise json

I have been playing with Akka Streams and Akka Http to create a flow to get some data from a public Rest endpoint and deserialize the json using Json4s.
Since there are not that many examples around yet, and the documentation only covers a few cases, I'm sharing my little app here.

By default Akka Http only supports Spray Json, but fortunately Heiko already created a small akka-http-json library which adds support for Json4s and Play Json.

Here is a small code sample that creates an Akka Streams flow and runs it. This was just to test calling the Rest endpoint and deserialising the resulting json into a case class. The next step is to extend the flow to do something useful with the retrieved data: I'll be putting it into a time series database called Prometheus, and maybe also into Mongo.

package enphase

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.{DefaultFormats, Formats, Serialization, jackson}

import scala.concurrent.{Await, Future}

/**
 * Enphase API Client which gets Enphase data and put those into InfluxDB
 *
 * - Start with HTTP GET request to Enphase API.
 * - Transform response into json
 * - Transform json into time series data
 * - Put time series data into InfluxDB using HTTP POST request
 */
object Client extends App with Json4sSupport {

  val systemId = 999999 // replace with your system id
  val apiKey   = "replace-with-your-api-key"
  val userId   = "replace-with-your-user-id"

  val systemSummaryUrl = s"""/api/v2/systems/$systemId/summary?key=$apiKey&user_id=$userId"""
  println(s"Getting from: $systemSummaryUrl")

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val formats: Formats = DefaultFormats
  implicit val jacksonSerialization: Serialization = jackson.Serialization
  import concurrent.ExecutionContext.Implicits.global

  val httpClient = Http().outgoingConnectionTls(host = "api.enphaseenergy.com")

  private val flow: Future[SystemSummary] = Source.single(HttpRequest(uri = Uri(systemSummaryUrl)))
      .via(httpClient)
      .mapAsync(1)(response => Unmarshal(response.entity).to[SystemSummary])
      .runWith(Sink.head)

  import concurrent.duration._

  val start = System.currentTimeMillis()
  val result = Await.result(flow, 15 seconds)
  val end = System.currentTimeMillis()

  println(s"Result in ${end-start} millis: $result")
}

/**
 * Entity for system summary json:
 * {
 * "current_power": 3322,
 * "energy_lifetime": 19050353,
 * "energy_today": 25639,
 * "last_report_at": 1380632700,
 * "modules": 31,
 * "operational_at": 1201362300,
 * "size_w": 5250,
 * "source": "microinverters",
 * "status": "normal",
 * "summary_date": "2014-01-06",
 * "system_id": 123
 * }
 */
case class SystemSummary(system_id: Int, summary_date: String, status: String, source: String,
                          size_w: Int, operational_at: Long, modules: Int, last_report_at: Long,
                          energy_today: Int, energy_lifetime: Long, current_power: Int)


At first I could not get Heiko's Unmarshallers working, so I wrote my own Unmarshaller, which is not that difficult when you look at some other implementations. The problem was a very vague error saying something was missing, but not exactly what. Today I figured out it was just missing one of the required implicit arguments, the Json4s Serialization, and then it all worked nicely.
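For reference, these are the two implicit values Heiko's Json4sSupport needs in scope (the same ones used in the full example above); the Serialization was the one I had forgotten:

import org.json4s.{DefaultFormats, Formats, Serialization, jackson}

// Both implicits must be resolvable for Json4sSupport's unmarshallers to work
implicit val formats: Formats = DefaultFormats
implicit val jacksonSerialization: Serialization = jackson.Serialization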

But here is how to implement a custom Unmarshaller which unmarshals an HttpResponse instance:

  implicit def responseUnmarshaller[T : Manifest]: FromResponseUnmarshaller[T] = {
    import concurrent.duration._
    import enphase.json.Json4sProtocol._
    import org.json4s.jackson.Serialization._

    new Unmarshaller[HttpResponse, T] {
      override def apply(resp: HttpResponse)(implicit ec: ExecutionContext): Future[T] = {
        // Force the content type to application/json, read the full entity within
        // 1 second, decode it using the response's charset and deserialise with Json4s
        resp.entity.withContentType(ContentTypes.`application/json`)
            .toStrict(1 second)
            .map(_.data)
            .map(_.decodeString(resp.entity.contentType.charset.value))
            .map(json => { println(s"Deserialized to: $json"); json })
            .map(json => read[T](json))
      }
    }
  }

The only change in the application needed to use this unmarshaller is to replace the 'mapAsync' line with:

    .mapAsync(1)(Unmarshal(_).to[SystemSummary])

The project build.sbt contains these dependencies:

scalaVersion := "2.11.6"

libraryDependencies ++= Seq(
  "com.typesafe.akka" % "akka-http-experimental_2.11" % "1.0-RC4",
  "de.heikoseeberger" %% "akka-http-json4s" % "0.9.1",
  "org.json4s" %% "json4s-jackson" % "3.2.11",
  "org.scalatest" % "scalatest_2.11" % "2.2.4" % "test"
)

Happy Akka-ing.

Joost



Saturday, July 4, 2015

Selecting a time series database

For a new play project, I want to use a time series database to visualise gathered data over time and to be able to use functions like sum/div/etc. on the time series data. I thought I could just pick a database and start coding on my project, but instead I ended up spending several evenings trying out several databases to find the right one for my needs.

1) InfluxDB

InfluxDB was my first choice. The installation is very easy and it has a simple CLI, a bit like Mongo.
Data can be injected via an HTTP POST request to 'http://<host>:8086/write?db=$database&precision=ms'. If you pass a timestamp, InfluxDB assumes it is in nanoseconds. If you use a different precision, for example the system time which is in milliseconds, you need to tell InfluxDB via the 'precision' parameter. If no timestamp is provided, InfluxDB will use the timestamp of the request.
Although InfluxDB is still a very young db, there are already many client libraries available in many languages among which Java and Scala.
The data format is very easy: <measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>[,<field2-key>=<field2-value>...] [unix-nano-timestamp]
Measurement is the name of the time series. At least one field key-value pair is required; additional fields are optional. Tags are also optional. InfluxDB automatically indexes the tag keys, which makes querying very powerful.
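As a minimal sketch of what such a write looks like from Scala, using only the JDK's HttpURLConnection (the host, database and measurement names are made up for illustration):

import java.net.{HttpURLConnection, URL}

object InfluxWriteSketch extends App {
  // Hypothetical measurement in the line protocol format described above:
  // <measurement>,<tag-key>=<tag-value> <field-key>=<field-value> [timestamp]
  val line = s"power,system=home current_power=3322 ${System.currentTimeMillis()}"

  // precision=ms because we pass a millisecond timestamp instead of nanoseconds
  val url  = new URL("http://localhost:8086/write?db=metrics&precision=ms")
  val conn = url.openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("POST")
  conn.setDoOutput(true)

  val out = conn.getOutputStream
  out.write(line.getBytes("UTF-8"))
  out.close()

  // InfluxDB answers a successful write with 204 No Content
  println(s"InfluxDB responded with HTTP ${conn.getResponseCode}")
}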
Installation is very easy, but even simpler via one of the available Docker containers. I used tutum/influxdb.

Disadvantages:
Version 0.9 is a complete rebuild and incompatible with 0.8, which means there are not that many query functions available yet compared to Graphite, and not all 3rd party tools, like Grafana, fully work with the latest InfluxDB version. There are no functions yet to combine multiple series, so unfortunately I am not able to use InfluxDB for my project yet.
Although there is already some basic support for InfluxDB v0.9 in Grafana, it was still a bit tricky to use. Even though I had configured Grafana correctly, it didn't show any graph at first, and then suddenly it did and I had no idea what I had done differently.

2) Graphite

So, fall back to Graphite then? The reason I did not start out with Graphite is that the installation is just too damn hard to get right. Graphite consists of 3 parts: Carbon (which captures the data), Whisper (a fixed-size database) and Graphite Web (a simple web interface). There are no downloadable installations, so those applications have to be built for your environment.
In the past I have looked multiple times for Vagrant or Docker containers which set up a complete, and up-to-date (!!), Graphite environment for me, but I always gave up after spending another couple of hours trying to get it working without result. Recently I came across the Kamon Graphite-Grafana Docker container, which was the first one that worked right out of the box.

The main disadvantage is the fixed-size database. You have to determine up front how much detail you want to keep for how long: for example, per second for 2 weeks, per hour for a month, per day for a year. For now I'd like to keep all data, so I do not want to have to choose when data gets aggregated. Also, because the whole Graphite setup is so complex, I do not want to have to change the Graphite configuration for my project.
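To give an idea, such a retention policy would look roughly like this in Graphite's storage-schemas.conf (a sketch matching the example above, not a configuration I actually used):

[default]
pattern = .*
retentions = 1s:14d,1h:30d,1d:1y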

The biggest advantage is the extensive support in Grafana, which is logical since Grafana was created for Graphite in the first place. Performing functions over multiple series is child's play with Graphite+Grafana.

Since Graphite is often used with StatsD, I did a small test with StatsD + Graphite, but StatsD is not suited for what I want to do, since it aggregates and summarises metrics before sending them to Graphite. Whereas in a similar test application for InfluxDB I was able to get a graph with multiple sine waves, this was impossible with StatsD.
An alternative would be to store the metrics in Graphite directly, but given Graphite's complex installation and fixed-size database, I would rather use something else.

So ... what else is there? Are there other time series databases? Yes there are! Plenty!! See this list of time series databases.
I looked at some of the databases in the list, and Prometheus looked the most promising. So ...

3) Prometheus

I chose to try out Prometheus since it has its own Grafana-like dashboard and it has alerting, which none of the other databases have. The alerting allows you to receive notifications when a series goes above a threshold, when an event has not occurred for a certain amount of time, or when a value rises or falls too quickly (a trend).
All Prometheus parts are available as Docker containers which makes it very easy to get started.
Prometheus has one characteristic which makes it very different from other (time series) databases: Prometheus uses a pull mechanism instead of a push mechanism like all the other databases, which means Prometheus wants to gather all data itself by calling Rest endpoints of the applications. Fortunately, they do provide a Push Gateway, which makes it possible to push your own data after all; Prometheus then pulls it from the Push Gateway. To use the Docker containers for both Prometheus and the PushGateway, you must link the PushGateway container to the Prometheus container so Prometheus can reach the PushGateway's Rest endpoint.
By default Prometheus runs on port 9090 and the Push Gateway on port 9091.

Prometheus has some UI of its own to display its configuration and to do some queries, visualised either in a table or a graph. The Grafana-like dashboard is called PromDash, which by default runs on port 3000.

There is a Java client available for using the PushGateway, but it works a bit strangely. The construction of a Registry and a Gauge for each metric value seems overkill, but trying to reuse those objects resulted in an error. That is probably because the whole registry gets pushed to the PushGateway, since the Java client is actually meant to be used for batch processing. Maybe it's easier to just push the value via an HTTP PUT yourself, but I haven't tried that out yet.
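To give an idea of the Java client approach described above, here is a rough Scala sketch of pushing a single gauge value (the metric and job names are made up, and the exact client API may differ per version):

import io.prometheus.client.{CollectorRegistry, Gauge}
import io.prometheus.client.exporter.PushGateway

object PushSketch extends App {
  // A fresh registry and gauge per push, as described above
  val registry = new CollectorRegistry()
  val gauge = Gauge.build()
    .name("current_power_watts")      // hypothetical metric name
    .help("Current power production")
    .register(registry)

  gauge.set(3322)

  // Push the whole registry to the PushGateway under a job name;
  // Prometheus will then scrape it from the gateway
  new PushGateway("localhost:9091").push(registry, "enphase_client")
}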

As with InfluxDB and StatsD/Graphite, I created another test application which produces sine values. Running multiple instances created multiple series. Prometheus has functions to sum or divide multiple series: just make sure the time series have the same name and vary only the job or instance name. PromDash displayed a graph containing both sine values and the sum of both values, which is exactly what I want for my project.
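For example, assuming the test application pushed its values as a gauge called 'sinus' and each run used a different instance name, summing the series is a one-liner in Prometheus' query language (the metric name is made up):

sum(sinus)

This sums all series named 'sinus', regardless of their job or instance labels.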

Conclusion

So, after spending 4 evenings playing with these databases, I'm going forward with Prometheus. At least, until InfluxDB also has the functions I need and Grafana fully supports them, because I still do like InfluxDB's data structure and ease of installation and use.
I think I will also use another database besides Prometheus to store each and every metric value, so I can easily replay all data once a more mature InfluxDB is available. Although Mongo is not suitable for write-heavy applications, I'll probably use it anyway since my application does not need to write much data and it is just easy to use.

More on my mysterious project in next posts ....