Wednesday, July 8, 2015

Using Akka Http to perform a Rest call and deserialise json

I have been playing with Akka Streams and Akka Http to create a flow to get some data from a public Rest endpoint and deserialize the json using Json4s.
Since there are not that many examples yet, and documentation only has a few examples, I'm sharing my little app here.

Default Akka Http only supports Spray Json, but fortunately Heiko already created a small akka-http-json library for Json4s or Play Json.

Here's is small code sample on how to create a Akka Streams Flow and run it. This was just to test the calling of the Rest endpoint and deserialise the result json into a case class. Next step is then to extend the flow to do something useful with the retrieved data. I'll put putting it into a time series database called Prometheus, and maybe also into Mongo.

package enphase

import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.http.scaladsl.unmarshalling.Unmarshal
import{Sink, Source}
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.{DefaultFormats, Formats, Serialization, jackson}

import scala.concurrent.{Await, Future}

 * Enphase API Client which gets Enphase data and put those into InfluxDB
 * - Start with HTTP GET request to Enphase API.
 * - Transform response into json
 * - Transform json into time series data
 * - Put time series data into InfluxDB using HTTP POST request
object Client extends App with Json4sSupport {

  val systemId = 999999 // replace with your system id
  val apiKey   = "replace-with-your-api-key"
  val userId   = "replace-with-your-user-id"

  val systemSummaryUrl = s"""/api/v2/systems/$systemId/summary?key=$apiKey&user_id=$userId"""
  println(s"Getting from: $systemSummaryUrl")

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val formats: Formats = DefaultFormats
  implicit val jacksonSerialization: Serialization = jackson.Serialization

  val httpClient = Http().outgoingConnectionTls(host = "")

  private val flow: Future[SystemSummary] = Source.single(HttpRequest(uri = Uri(systemSummaryUrl)))
      .mapAsync(1)(response => Unmarshal(response.entity).to[SystemSummary])

  import concurrent.duration._

  val start = System.currentTimeMillis()
  val result = Await.result(flow, 15 seconds)
  val end = System.currentTimeMillis()

  println(s"Result in ${end-start} millis: $result")

 * Entity for system summary json:
 * {
 * "current_power": 3322,
 * "energy_lifetime": 19050353,
 * "energy_today": 25639,
 * "last_report_at": 1380632700,
 * "modules": 31,
 * "operational_at": 1201362300,
 * "size_w": 5250,
 * "source": "microinverters",
 * "status": "normal",
 * "summary_date": "2014-01-06",
 * "system_id": 123
 * }
case class SystemSummary(system_id: Int, summary_date: String, status: String, source: String,
                          size_w: Int, operational_at: Long, modules: Int, last_report_at: Long,
                          energy_today: Int, energy_lifetime: Long, current_power: Int)

At first I could not get Heiko's Unmarchallers working and I wrote my own Unmarshaller which is not that difficult looking at some other implementations. The problem was a very vage error saying something was missing, but not exactly what. Today I figured out, it was just missing one of the required implicit arguments, the Json4s Serializers, and then it all worked nicely.

But here's is how to implement a custom Unmarshaller which unmarshalls a HttpResponse instance:

  implicit def responseUnmarshaller[T : Manifest]: FromResponseUnmarshaller[T] = {
    import concurrent.duration._
    import enphase.json.Json4sProtocol._
    import org.json4s.jackson.Serialization._

    new Unmarshaller[HttpResponse, T] {
      override def apply(resp: HttpResponse)(implicit ec: ExecutionContext): Future[T] = {
            .toStrict(1 second)
            .map(json => { println(s"Deserialized to: $json"); json })
            .map(json => read[T](json))

The only change in the application needed to use this unmarshaller is to replace the 'mapAsync' line with:


The project build.sbt contains these dependencies:

scalaVersion := "2.11.6"

libraryDependencies ++= Seq(
  "com.typesafe.akka" % "akka-http-experimental_2.11" % "1.0-RC4",
  "de.heikoseeberger" %% "akka-http-json4s" % "0.9.1",
  "org.json4s" %% "json4s-jackson" % "3.2.11",
  "org.scalatest" % "scalatest_2.11" % "2.2.4" % "test"

Happy Akka-ing.


No comments: