High-level Server-Side API
In addition to the Low-Level Server-Side API, Akka HTTP provides a very flexible "Routing DSL" for elegantly defining RESTful web services. It picks up where the low-level API leaves off and offers much of the higher-level functionality of typical web servers or frameworks, such as deconstruction of URIs, content negotiation, and static content serving.
Note
It is recommended to read the Implications of the streaming nature of Request/Response Entities section, as it explains the underlying full-stack streaming concepts, which may be unexpected when coming from a background with non-"streaming first" HTTP Servers.
Minimal Example
This is a complete, very basic Akka HTTP application relying on the Routing DSL:
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.io.StdIn

object WebServer {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("my-system")
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val route =
      path("hello") {
        get {
          complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}
It starts an HTTP server on localhost and replies to GET requests to /hello with a simple response.
Longer Example
The following is an Akka HTTP route definition that tries to show off a few features. The resulting service does not really do anything useful, but its definition should give you a feel for what an actual API definition with the Routing DSL looks like:
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.coding.Deflate
import akka.http.scaladsl.marshalling.ToResponseMarshaller
import akka.http.scaladsl.model.StatusCodes.MovedPermanently
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout
import scala.concurrent.ExecutionContext
// types used by the API routes
type Money = Double // only for demo purposes, don't try this at home!
type TransactionResult = String
case class User(name: String)
case class Order(email: String, amount: Money)
case class Update(order: Order)
case class OrderItem(i: Int, os: Option[String], s: String)
// marshalling would usually be derived automatically using libraries
implicit val orderUM: FromRequestUnmarshaller[Order] = ???
implicit val orderM: ToResponseMarshaller[Order] = ???
implicit val orderSeqM: ToResponseMarshaller[Seq[Order]] = ???
implicit val timeout: Timeout = ??? // for actor asks
implicit val ec: ExecutionContext = ???
implicit val mat: ActorMaterializer = ???
implicit val sys: ActorSystem = ???
// backend entry points
def myAuthenticator: Authenticator[User] = ???
def retrieveOrdersFromDB: Seq[Order] = ???
def myDbActor: ActorRef = ???
def processOrderRequest(id: Int, complete: Order => Unit): Unit = ???
val route = {
  path("orders") {
    authenticateBasic(realm = "admin area", myAuthenticator) { user =>
      get {
        encodeResponseWith(Deflate) {
          complete {
            // marshal custom object with in-scope marshaller
            retrieveOrdersFromDB
          }
        }
      } ~
      post {
        // decompress gzipped or deflated requests if required
        decodeRequest {
          // unmarshal with in-scope unmarshaller
          entity(as[Order]) { order =>
            complete {
              // ... write order to DB
              "Order received"
            }
          }
        }
      }
    }
  } ~
  // extract URI path element as Int
  pathPrefix("order" / IntNumber) { orderId =>
    pathEnd {
      (put | parameter('method ! "put")) {
        // form extraction from multipart or www-url-encoded forms
        formFields(('email, 'total.as[Money])).as(Order) { order =>
          complete {
            // complete with serialized Future result
            (myDbActor ? Update(order)).mapTo[TransactionResult]
          }
        }
      } ~
      get {
        // debugging helper
        logRequest("GET-ORDER") {
          // use in-scope marshaller to create completer function
          completeWith(instanceOf[Order]) { completer =>
            // custom
            processOrderRequest(orderId, completer)
          }
        }
      }
    } ~
    path("items") {
      get {
        // parameters to case class extraction
        parameters(('size.as[Int], 'color ?, 'dangerous ? "no"))
          .as(OrderItem) { orderItem =>
            // ... route using case class instance created from
            // required and optional query parameters
            complete(s"Requested item: $orderItem") // placeholder response so that the route compiles
          }
      }
    }
  } ~
  pathPrefix("documentation") {
    // optionally compresses the response with Gzip or Deflate
    // if the client accepts compressed responses
    encodeResponse {
      // serve up static content from a JAR resource
      getFromResourceDirectory("docs")
    }
  } ~
  path("oldApi" / Remaining) { pathRest =>
    redirect("http://oldapi.example.com/" + pathRest, MovedPermanently)
  }
}
Handling HTTP Server failures in the High-Level API
There are various situations in which a failure may occur while initialising or running an Akka HTTP server. By default, Akka logs all of these failures. Sometimes, however, you may want to react to a failure in addition to having it logged, for example by shutting down the actor system or by explicitly notifying an external monitoring endpoint.
Bind failures
For example, the server might be unable to bind to the given port, either because the port is already taken by another application or because the port is privileged (i.e. only usable by root).
In this case the "binding future" will fail immediately, and we can react to it by listening on the Future's completion:
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.concurrent.Future

object WebServer {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future failure callback in the end
    implicit val executionContext = system.dispatcher

    val handler = get {
      complete("Hello world!")
    }

    // let's say the OS won't allow us to bind to 80.
    val (host, port) = ("localhost", 80)
    val bindingFuture: Future[ServerBinding] =
      Http().bindAndHandle(handler, host, port)

    // runs only if the binding fails; logs through the actor system's logger
    bindingFuture.failed.foreach { ex =>
      system.log.error(ex, "Failed to bind to {}:{}!", host, port)
    }
  }
}
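The introduction to this section mentions shutting down the actor system as one possible reaction to a failure. A minimal sketch of that reaction for bind failures could look as follows. The object BindFailureSketch and the helper terminateOnBindFailure are hypothetical names chosen for illustration and are not part of Akka HTTP:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http.ServerBinding
import scala.concurrent.{ExecutionContext, Future}

object BindFailureSketch {
  // Hypothetical helper (not part of Akka HTTP): logs the bind failure and
  // terminates the actor system if the given binding future fails.
  def terminateOnBindFailure(binding: Future[ServerBinding])(implicit system: ActorSystem): Unit = {
    // execution context on which the failure callback runs
    implicit val ec: ExecutionContext = system.dispatcher
    binding.failed.foreach { ex =>
      system.log.error(ex, "Failed to bind, shutting down the actor system")
      system.terminate()
    }
  }
}
```

Under these assumptions it would be called with the future returned by the bind call, for example BindFailureSketch.terminateOnBindFailure(Http().bindAndHandle(handler, host, port)). Whether terminating the whole actor system is appropriate depends on what else is running in it.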
Note
For a lower-level overview of the kinds of failures that can happen, and for more fine-grained control over them, refer to the Handling HTTP Server failures in the Low-Level API documentation.
Failures and exceptions inside the Routing DSL
Exception handling within the Routing DSL is done by providing ExceptionHandlers, which are documented in depth in the Exception Handling section of the documentation. You can use them to transform exceptions into HttpResponses with appropriate error codes and human-readable failure descriptions.
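As a rough illustration of the idea, the following sketch maps an ArithmeticException thrown inside a route to a 500 response with a readable message. The object name ExceptionHandlerSketch, the /divide path and the message text are assumptions made for this example; the Exception Handling section remains the authoritative description:

```scala
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.StatusCodes.InternalServerError
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}

object ExceptionHandlerSketch {
  // Turns an ArithmeticException (e.g. division by zero) into a 500 response
  // with a human-readable description.
  val arithmeticHandler: ExceptionHandler = ExceptionHandler {
    case _: ArithmeticException =>
      extractUri { uri =>
        complete(HttpResponse(InternalServerError, entity = s"Bad numbers in request to $uri"))
      }
  }

  // Hypothetical route: GET /divide/<a>/<b> replies with a / b.
  val route: Route =
    handleExceptions(arithmeticHandler) {
      path("divide" / IntNumber / IntNumber) { (a, b) =>
        get {
          // the division is evaluated when the route runs, so a zero divisor
          // throws inside the scope of handleExceptions
          complete((a / b).toString)
        }
      }
    }
}
```

Exceptions that the handler is not defined for are not affected by it and continue to propagate to any enclosing handler.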
File uploads
For high-level directives that handle uploads, see FileUploadDirectives.
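As a minimal sketch of what using such a directive might look like, the following route relies on the fileUpload directive to extract a single uploaded file as a stream and sums the integers it contains. The object name FileUploadSketch, the helper sumCsv, the /sum path and the form field name "csv" are assumptions made for this example:

```scala
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.Materializer
import akka.stream.scaladsl.{Framing, Source}
import akka.util.ByteString
import scala.concurrent.Future

object FileUploadSketch {
  // Hypothetical helper: sums all comma-separated integers in a stream of CSV bytes.
  def sumCsv(bytes: Source[ByteString, Any])(implicit mat: Materializer): Future[Int] =
    bytes
      // split the byte stream into lines of at most 1024 bytes;
      // allowTruncation accepts a last line without a trailing newline
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
      .mapConcat(_.utf8String.split(",").toVector)
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(_.toInt)
      .runFold(0)(_ + _)

  // Hypothetical endpoint: POST /sum with a multipart form field named "csv".
  val route: Route =
    path("sum") {
      post {
        extractMaterializer { implicit mat =>
          fileUpload("csv") { case (fileInfo, byteSource) =>
            // the upload is consumed as a stream and never buffered as a whole
            onSuccess(sumCsv(byteSource)) { sum =>
              complete(s"Sum of ${fileInfo.fileName}: $sum")
            }
          }
        }
      }
    }
}
```

Keeping the stream processing in a separate function makes it possible to exercise it without an HTTP request, for example by passing it Source.single(ByteString("1,2\n3,4\n")).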
A simple file upload, for example from a browser form with a file input, can be handled by accepting a Multipart.FormData entity. Note that the body parts are delivered as a Source rather than being available all at once, and so is the payload of each individual body part, so you need to consume those streams both for the file and for the form fields.
Here is a simple example which just dumps the uploaded file into a temporary file on disk, collects some form fields and saves an entry to a fictitious database:
import java.io.File
import akka.http.scaladsl.model.Multipart
import akka.http.scaladsl.model.Multipart.FormData.BodyPart
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.FileIO
import scala.concurrent.Future
import scala.concurrent.duration._

// assumes an implicit materializer and execution context in scope,
// as well as the fictitious `db` and `Video` used below
val uploadVideo =
  path("video") {
    entity(as[Multipart.FormData]) { formData =>
      // collect all parts of the multipart into a map as they arrive
      val allPartsF: Future[Map[String, Any]] = formData.parts.mapAsync[(String, Any)](1) {
        case b: BodyPart if b.name == "file" =>
          // stream into a file as its chunks arrive and return a future
          // of the file where it got stored
          val file = File.createTempFile("upload", "tmp")
          b.entity.dataBytes.runWith(FileIO.toPath(file.toPath)).map(_ =>
            (b.name -> file))
        case b: BodyPart =>
          // collect form field values
          b.toStrict(2.seconds).map(strict =>
            (b.name -> strict.entity.data.utf8String))
      }.runFold(Map.empty[String, Any])((map, tuple) => map + tuple)
      val done = allPartsF.map { allParts =>
        // You would have some better validation/unmarshalling here
        db.create(Video(
          file = allParts("file").asInstanceOf[File],
          title = allParts("title").asInstanceOf[String],
          author = allParts("author").asInstanceOf[String]))
      }
      // when processing has finished create a response for the user
      onSuccess(allPartsF) { allParts =>
        complete {
          "ok!"
        }
      }
    }
  }
You can transform the uploaded files as they arrive rather than storing them in a temporary file as in the previous example. In this example we accept any number of .csv files, parse those into lines and split each line before we send it to an actor for further processing:
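import akka.Done
import akka.http.scaladsl.model.Multipart
import akka.http.scaladsl.model.Multipart.FormData.BodyPart
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.{Framing, Sink}
import akka.util.ByteString
import scala.concurrent.Future

// assumes an implicit materializer in scope, as well as
// `metadataActor` and `MetadataActor.Entry` used below
val splitLines = Framing.delimiter(ByteString("\n"), 256)
val csvUploads =
  path("metadata" / LongNumber) { id =>
    entity(as[Multipart.FormData]) { formData =>
      val done: Future[Done] = formData.parts.mapAsync(1) {
        case b: BodyPart if b.filename.exists(_.endsWith(".csv")) =>
          b.entity.dataBytes
            .via(splitLines)
            .map(_.utf8String.split(",").toVector)
            .runForeach(csv =>
              metadataActor ! MetadataActor.Entry(id, csv))
        case _ => Future.successful(Done)
      }.runWith(Sink.ignore)
      // when processing has finished create a response for the user
      onSuccess(done) { _ =>
        complete {
          "ok!"
        }
      }
    }
  }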
Configuring Server-side HTTPS
For detailed documentation about configuring and using HTTPS on the server side, refer to Server-Side HTTPS Support.