DiversIT Europe
Using Akka Http to perform a Rest call and deserialise json
Jul 8, 2015
Tags: akka akka-http akka-streams json

I have been playing with Akka Streams and Akka Http to create a flow to get some data from a public Rest endpoint and deserialize the json using Json4s.
Since there are not many examples around yet, and the documentation has only a few, I’m sharing my little app here.

By default, Akka Http only supports Spray Json, but fortunately Heiko already created a small akka-http-json library that adds support for Json4s and Play Json.

Here’s a small code sample showing how to create an Akka Streams flow and run it. This was just to test calling the Rest endpoint and deserialising the resulting json into a case class. The next step is to extend the flow to do something useful with the retrieved data. I’ll be putting it into a time series database called Prometheus, and maybe also into Mongo.

package enphase  

import akka.actor.ActorSystem  
import akka.http.scaladsl.Http  
import akka.http.scaladsl.model.{HttpRequest, Uri}  
import akka.http.scaladsl.unmarshalling.Unmarshal  
import akka.stream.ActorMaterializer  
import akka.stream.scaladsl.{Sink, Source}  
import de.heikoseeberger.akkahttpjson4s.Json4sSupport  
import org.json4s.{DefaultFormats, Formats, Serialization, jackson}  

import scala.concurrent.{Await, Future}  

/**  
 * Enphase API Client which gets Enphase data and puts it into InfluxDB
 *  
 * - Start with HTTP GET request to Enphase API.  
 * - Transform response into json  
 * - Transform json into time series data  
 * - Put time series data into InfluxDB using HTTP POST request  
 */  
object Client extends App with Json4sSupport {  

  val systemId = 999999 // replace with your system id  
  val apiKey   = "replace-with-your-api-key"  
  val userId   = "replace-with-your-user-id"  

  val systemSummaryUrl = s"""/api/v2/systems/$systemId/summary?key=$apiKey&user_id=$userId"""  
  println(s"Getting from: $systemSummaryUrl")  

  implicit val system = ActorSystem()  
  implicit val materializer = ActorMaterializer()  
  implicit val formats: Formats = DefaultFormats  
  implicit val jacksonSerialization: Serialization = jackson.Serialization  
  import concurrent.ExecutionContext.Implicits.global  

  val httpClient = Http().outgoingConnectionTls(host = "api.enphaseenergy.com")  

  private val flow: Future[SystemSummary] = Source.single(HttpRequest(uri = Uri(systemSummaryUrl)))  
      .via(httpClient)  
      .mapAsync(1)(response => Unmarshal(response.entity).to[SystemSummary])  
      .runWith(Sink.head)  

  import concurrent.duration._  

  val start = System.currentTimeMillis()  
  val result = Await.result(flow, 15 seconds)  
  val end = System.currentTimeMillis()  

  println(s"Result in ${end-start} millis: $result")  
}  

/**  
 * Entity for system summary json:  
 * {  
 * "current_power": 3322,  
 * "energy_lifetime": 19050353,  
 * "energy_today": 25639,  
 * "last_report_at": 1380632700,  
 * "modules": 31,  
 * "operational_at": 1201362300,  
 * "size_w": 5250,  
 * "source": "microinverters",  
 * "status": "normal",  
 * "summary_date": "2014-01-06",  
 * "system_id": 123  
 * }  
 */  
case class SystemSummary(system_id: Int, summary_date: String, status: String, source: String,  
                          size_w: Int, operational_at: Long, modules: Int, last_report_at: Long,  
                          energy_today: Int, energy_lifetime: Long, current_power: Int)  

At first I could not get Heiko’s Unmarshallers working, so I wrote my own Unmarshaller, which is not that difficult when you look at some other implementations. The problem was a very vague error saying something was missing, but not exactly what. Today I figured out that just one of the required implicit arguments was missing, the Json4s Serializers, and then it all worked nicely.
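To see the implicit requirement without any HTTP in the way, here is a minimal sketch that calls json4s directly. It assumes json4s-jackson is on the classpath; `MiniSummary` and `ReadSummaryExample` are hypothetical names used only for this illustration, and the json is a shortened version of the sample response above. Json4sSupport additionally wants an implicit `Serialization`, as in the client code, which plain `read` does not need.

```scala
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.Serialization.read

// Hypothetical, trimmed-down variant of SystemSummary with only three fields.
case class MiniSummary(system_id: Int, status: String, current_power: Int)

object ReadSummaryExample {
  // json4s looks for Formats in implicit scope.
  // Without this line the read[...] call below does not compile.
  implicit val formats: Formats = DefaultFormats

  val json: String =
    """{"current_power": 3322, "status": "normal", "system_id": 123, "modules": 31}"""

  // Keys without a matching field (here "modules") are ignored by default.
  val summary: MiniSummary = read[MiniSummary](json)
}
```

Referencing `ReadSummaryExample.summary` from a test or a small main method is enough to check that the case class field names line up with the json keys.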

But here’s how to implement a custom Unmarshaller which unmarshalls an HttpResponse instance:

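  // Additional imports this unmarshaller needs:
  import akka.http.scaladsl.model.{ContentTypes, HttpResponse}
  import akka.http.scaladsl.unmarshalling.{FromResponseUnmarshaller, Unmarshaller}
  import scala.concurrent.{ExecutionContext, Future}

  implicit def responseUnmarshaller[T : Manifest]: FromResponseUnmarshaller[T] = {
    import concurrent.duration._
    // My own object (not shown here), which has to bring the implicit json4s Formats into scope
    import enphase.json.Json4sProtocol._
    import org.json4s.jackson.Serialization._

    new Unmarshaller[HttpResponse, T] {
      override def apply(resp: HttpResponse)(implicit ec: ExecutionContext): Future[T] = {
        resp.entity.withContentType(ContentTypes.`application/json`)
            // Collect the whole entity in memory; uses the implicit materializer of the enclosing object
            .toStrict(1 second)
            .map(_.data)
            // Decode the bytes using the charset of the response
            .map(_.decodeString(resp.entity.contentType.charset.value))
            // Log the raw json before it is deserialised
            .map(json => { println(s"Deserializing: $json"); json })
            .map(json => read[T](json))
      }
    }
  }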

The only change in the application needed to use this unmarshaller is to replace the ‘mapAsync’ line with:

    .mapAsync(1)(Unmarshal(_).to[SystemSummary])
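Why is that one line enough? `to[T]` takes its unmarshaller as an implicit argument, so whichever matching implicit is in scope gets used. The sketch below models that lookup with plain Scala only. `Decoder`, `Decode` and `ImplicitLookupExample` are hypothetical, simplified stand-ins that I made up for illustration; they are not the Akka Http types.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for an unmarshaller: turns an A into a Future[B].
trait Decoder[A, B] {
  def apply(value: A)(implicit ec: ExecutionContext): Future[B]
}

// Hypothetical stand-in for Unmarshal(value).
final class Decode[A](value: A) {
  // The decoder is found through implicit resolution, similar to Unmarshal(x).to[T].
  def to[B](implicit decoder: Decoder[A, B], ec: ExecutionContext): Future[B] =
    decoder(value)
}

object Decode {
  def apply[A](value: A): Decode[A] = new Decode(value)
}

object ImplicitLookupExample {
  // With this implicit in scope, Decode("42").to[Int] compiles; without it, it does not.
  implicit val stringToInt: Decoder[String, Int] = new Decoder[String, Int] {
    def apply(value: String)(implicit ec: ExecutionContext): Future[Int] =
      Future(value.trim.toInt)(ec)
  }

  def run(): Int = {
    import ExecutionContext.Implicits.global
    Await.result(Decode("42").to[Int], 1.second)
  }
}
```

If no matching implicit is in scope, the call fails at compile time, which is the kind of "something is missing" error I ran into above.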

The project build.sbt contains these dependencies:

scalaVersion := "2.11.6"  

libraryDependencies ++= Seq(  
  "com.typesafe.akka" % "akka-http-experimental_2.11" % "1.0-RC4",  
  "de.heikoseeberger" %% "akka-http-json4s" % "0.9.1",  
  "org.json4s" %% "json4s-jackson" % "3.2.11",  
  "org.scalatest" % "scalatest_2.11" % "2.2.4" % "test"  
)  

Happy Akka-ing!