公海赌船网址文件交换流程包括读取文件里的bytes,系统里面数据互换平常涉及文件或者数据库表类型的多少上传下载

 
 在头里几篇研商里我们都涉嫌过:Akka-http是一项系统融为一体工具库。它是以数据互换的格局开展系统融为一体的。所以,Akka-http的主干职能应该是数据互换的落实了:应该能通过某种公开的数目格式和传导标准相比较便于的兑现包括异类系统里面通过网上展开的数据交流。覆盖包括:数据编码、发送和数目接受、解析全经过。Akka-http提供了成百上千网上传输标准数据的统揽模型以及数据类型转换方法,可以使编程人士很有利的构建网上往来的Request和Response。但是,现实中的数据互换远远不止针对request和response操作能够满意的。系统里面数据互换平日涉及文件或者数据库表类型的数额上传下载。即使在Http标准中讲述了咋样通过MultiPart音信类型举行批量数据的传输,不过这些专业提到的实现细节包括数据内容叙述、数据分段形式、信息数据长度总括等等简直可以立时令人却步。Akka-http是遵照Akka-stream开发的:不但它的办事流程能够用Akka-stream来表述,它还襄助stream化的多少传输。我们通晓:Akka-stream提供了效率强大的FileIO和Data-Streaming,可以用Stream-Source代表文件或数据库数据源。简单来讲:Akka-http的新闻数据内容HttpEntity可以协理理论上极其长度的data-stream。最珍奇的是:这一个Source是个Reactive-Stream-Source,具备了back-pressure机制,可以使得应付数据交流出席两方Reactive端点不同的数码传输速率。

 
 在前方几篇啄磨里我们都涉嫌过:Akka-http是一项系统融为一体工具库。它是以数据交流的花样举行系统融为一体的。所以,Akka-http的着力职能应该是数据沟通的兑现了:应该能经过某种公开的数目格式和传导标准相比较便利的落实包括异类系统里头通过网上展开的数据互换。覆盖包括:数据编码、发送和多少接受、解析全经过。Akka-http提供了广大网上传输标准数量的不外乎模型以及数据类型转换方法,可以使编程人士很有益于的构建网上往来的Request和Response。不过,现实中的数据互换远远不止针对request和response操作可以满意的。系统里面数据互换经常涉及文件或者数据库表类型的数码上传下载。即使在Http标准中描述了什么通过MultiPart音讯类型进行批量多少的传导,不过这么些专业提到的兑现细节包括数据内容叙述、数据分段形式、音信数据长度统计等等简直可以立刻令人却步。Akka-http是基于Akka-stream开发的:不但它的办事流程可以用Akka-stream来抒发,它还扶助stream化的数额传输。大家知晓:Akka-stream提供了功能强大的FileIO和Data-Streaming,可以用Stream-Source代表文件或数据库数据源。简单的讲:Akka-http的信息数据内容HttpEntity可以支撑理论上最为长度的data-stream。最难能可贵的是:这么些Source是个Reactive-Stream-Source,具备了back-pressure机制,能够有效应付数据交流出席两方Reactive端点不同的数码传输速率。

  所谓文件交流指的是Http协议中服务端和客户端之间文件的上传和下载。Akka-http作为一种系统融为一体工具应该拥有高功用的数据交换形式包括文件交流和数据库表行的上传下载。Akka-http的数据交流格局援助流式操作:代表互换数据可以是一种无限长度流的要素。那种格局首先解决了纯Http大数量通过Multipart传输所必须开展的多寡分段操作和复杂的信息属性设定等需要的技巧门槛,再者用户还足以很有益于的选择Akka-stream对数码举办深度处理,免去了数额转换的分神。更关键的是:Akka-http还协理reactive-stream,可以制止由传输速率所发出的各样问题。在本篇大家啄磨利用Akka-http举办文件的双向传送。

 
Akka-http的stream类型数据内容是以Source[T,_]品种表示的。首先,Akka-stream通过FileIO对象提供了足足多的file-io操作函数,其中有个fromPath函数可以用某个文件内容数据构建一个Source类型:

 
Akka-http的stream类型数据内容是以Source[T,_]花色表示的。首先,Akka-stream通过FileIO对象提供了充足多的file-io操作函数,其中有个fromPath函数可以用某个文件内容数据构建一个Source类型:

 任何文件的内容储存格式无论在硬盘、内存如故数据线上都是一堆bytes。文件互换流程包括读取文件里的bytes,传送这多少个bytes,最后把那么些bytes写入文件。我们看到此间每个环节操作目标都是bytes,所以可能在先后里是不需要任何数据转换过程的。Akka提供了一组文件读写函数,如下:

/**
   * Creates a Source from a files contents.
   * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
   * except the final element, which will be up to `chunkSize` in size.
   *
   * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
   * set it for a given Source by using [[akka.stream.ActorAttributes]].
   *
   * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
   * and a possible exception if IO operation was not completed successfully.
   *
   * @param f         the file path to read from
   * @param chunkSize the size of each read operation, defaults to 8192
   */
  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
    fromPath(f, chunkSize, startPosition = 0)
/**
   * Creates a Source from a files contents.
   * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
   * except the final element, which will be up to `chunkSize` in size.
   *
   * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
   * set it for a given Source by using [[akka.stream.ActorAttributes]].
   *
   * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
   * and a possible exception if IO operation was not completed successfully.
   *
   * @param f         the file path to read from
   * @param chunkSize the size of each read operation, defaults to 8192
   */
  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
    fromPath(f, chunkSize, startPosition = 0)
  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
    fromPath(f, chunkSize, startPosition = 0)

  def fromPath(f: Path, chunkSize: Int, startPosition: Long): Source[ByteString, Future[IOResult]] =
    Source.fromGraph(new FileSource(f, chunkSize, startPosition, DefaultAttributes.fileSource, sourceShape("FileSource")))

  def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]] =
    toPath(f, options, startPosition = 0)

  def toPath(f: Path, options: Set[OpenOption], startPosition: Long): Sink[ByteString, Future[IOResult]] =
    Sink.fromGraph(new FileSink(f, startPosition, options, DefaultAttributes.fileSink, sinkShape("FileSink")))

这多少个函数构建了Source[ByteString,Future[IOResult]],我们需要把ByteString转化成MessageEntity。首先需要在implicit-scope内提供马尔斯haller[ByteString,MessageEntity]体系的隐式实例:

以此函数构建了Source[ByteString,Future[IOResult]],我们需要把ByteString转化成MessageEntity。首先需要在implicit-scope内提供马尔斯haller[ByteString,MessageEntity]花色的隐式实例:

咱俩来看:fromPath类型是Source[ByteSgtring,_],toPath类型是Sink[ByteString,_],直接就是流型式,应该可以平昔放入Http音讯的Entity中,如下: 

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object ServerStreaming extends App {
  import JsConverters._
...
trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object ServerStreaming extends App {
  import JsConverters._
...
  def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = {
    def loadFile = {
      //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
      val file = Paths.get(filePath)
      FileIO.fromPath(file, chunkSize)
        .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
    }
    limitableByteSource(loadFile)
  }

咱俩还亟需Json-Streaming补助:

大家还索要Json-Streaming辅助:

fileStream是Source[ByteString,_]可以间接放进Entity:

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
      .withParallelMarshalling(parallelism = 8, unordered = false)
  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
      .withParallelMarshalling(parallelism = 8, unordered = false)
  val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/text")
  val textData = HttpEntity(
    ContentTypes.`application/octet-stream`,
    fileStream("/Users/tiger-macpro/downloads/A4.TIF",256)
  )

FileIO是blocking操作,大家还是可以拔取独立的线程供blocking操作使用:

FileIO是blocking操作,我们还足以拔取独立的线程供blocking操作使用:

大家把fileStream放入了HttpRequest中。对于HttpResponse能够用下边的措施:

   FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
   FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
 val route = pathPrefix("file") {
    (get & path("text" / Remaining)) { fp =>
      withoutSizeLimit {
        complete(
          HttpEntity(
            ContentTypes.`application/octet-stream`,
            fileStream("/users/tiger-macpro/" + fp, 256))
        )
      }

现今我们可以从在server上用一个文书构建Source然后再转成Response:

明天我们得以从在server上用一个文书构建Source然后再转成Response:

注意:complete进行了HttpResponse的构建。因为Entity.dataByes就是Source[ByteString,_],所以我们可以直接把它导入Sink:

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } 
    }
  def loadFile(path: String) = {
 //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val file = Paths.get("/Users/tiger/"+path)
    FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
      .map(_.utf8String)
  }
  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } 
    }
  def loadFile(path: String) = {
 //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val file = Paths.get("/Users/tiger/"+path)
    FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
      .map(_.utf8String)
  }
          entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath)))
            .onComplete { case _ => println(s"Download file saved to: $destPath") }

平等,大家也可以把数据库表内数据转成Akka-Stream-Source,然后再落实到MessageEntity的转换。转换过程包括用Query读取数据库表内数据后转成Reactive-Publisher,然后把publisher转成Akka-Stream-Source,如下:

同一,大家也可以把数据库表内数据转成Akka-Stream-Source,然后再落实到MessageEntity的更换。转换过程包括用Query读取数据库表内数据后转成Reactive-Publisher,然后把publisher转成Akka-Stream-Source,如下:

下面我们提过FileIO.toPath就是一个Sink。由于我们的指标是巨型的文本交换,所以随便上传下载都采纳了withoutSizeLimit:

object SlickDAO {
  import slick.jdbc.H2Profile.api._
  val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
  val db = dbConfig.db

  case class CountyModel(id: Int, name: String)
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(64))
    def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]

  def loadTable(filter: String) = {
    //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val publisher = db.stream(qry.result)
    Source.fromPublisher(publisher = publisher)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
  }
}
object SlickDAO {
  import slick.jdbc.H2Profile.api._
  val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
  val db = dbConfig.db

  case class CountyModel(id: Int, name: String)
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(64))
    def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]

  def loadTable(filter: String) = {
    //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val publisher = db.stream(qry.result)
    Source.fromPublisher(publisher = publisher)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
  }
}
 val route = pathPrefix("file") {
    (get & path("exchange" / Remaining)) { fp =>
      withoutSizeLimit {
        complete(
          HttpEntity(
            ContentTypes.`application/octet-stream`,
            fileStream("/users/tiger-macpro/" + fp, 256))
        )
      }
    } ~
      (post & path("exchange")) {
        withoutSizeLimit {
          extractDataBytes { bytes =>
            val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath)))
            onComplete(fut) { _ =>
              complete(s"Save upload file to: $destPath")
            }
          }
        }

      }

下一场开展到MessageEntity的变换:

然后举行到MessageEntity的转移:

好了下边的以身作则代码里对字符型或二进制文件都开展了置换的言传身教操作:

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } ~
      path("tables"/Segment) { t =>
        complete(SlickDAO.loadTable(t))
      }
    }
  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } ~
      path("tables"/Segment) { t =>
        complete(SlickDAO.loadTable(t))
      }
    }

服务端:

下边是此次示范的完好源代码:

上边是此次示范的全体源代码:

 

import java.nio.file._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.common._
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson


object SlickDAO {
  import slick.jdbc.H2Profile.api._
  val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
  val db = dbConfig.db

  case class CountyModel(id: Int, name: String)
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(64))
    def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]

  def loadTable(filter: String) = {
    //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val publisher = db.stream(qry.result)
    Source.fromPublisher(publisher = publisher)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
  }
}

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object ServerStreaming extends App {
  import JsConverters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
      .withParallelMarshalling(parallelism = 8, unordered = false)



  val (port, host) = (8011,"localhost")

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } ~
      path("tables"/Segment) { t =>
        complete(SlickDAO.loadTable(t))
      }
    }

  def loadFile(path: String) = {
 //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val file = Paths.get("/Users/tiger/"+path)
    FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
      .map(_.utf8String)
  }

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}
import java.nio.file._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.common._
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson


object SlickDAO {
  import slick.jdbc.H2Profile.api._
  val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
  val db = dbConfig.db

  case class CountyModel(id: Int, name: String)
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(64))
    def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]

  def loadTable(filter: String) = {
    //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val publisher = db.stream(qry.result)
    Source.fromPublisher(publisher = publisher)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
  }
}

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object ServerStreaming extends App {
  import JsConverters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
      .withParallelMarshalling(parallelism = 8, unordered = false)



  val (port, host) = (8011,"localhost")

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } ~
      path("tables"/Segment) { t =>
        complete(SlickDAO.loadTable(t))
      }
    }

  def loadFile(path: String) = {
 //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val file = Paths.get("/Users/tiger/"+path)
    FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
      .map(_.utf8String)
  }

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpEntity._
import java.nio.file._

object FileServer extends App {

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  def fileStream(filePath: String, chunkSize: Int) = {
     def loadFile = {
       //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
       val file = Paths.get(filePath)
       FileIO.fromPath(file, chunkSize)
         .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
     }
    limitableByteSource(loadFile)
  }
  val destPath = "/users/tiger-macpro/downloads/A4-1.TIF"
  val route = pathPrefix("file") {
    (get & path("exchange" / Remaining)) { fp =>
      withoutSizeLimit {
        complete(
          HttpEntity(
            ContentTypes.`application/octet-stream`,
            fileStream("/users/tiger-macpro/" + fp, 256))
        )
      }
    } ~
      (post & path("exchange")) {
        withoutSizeLimit {
          extractDataBytes { bytes =>
            val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath)))
            onComplete(fut) { _ =>
              complete(s"Save upload file to: $destPath")
            }
          }
        }

      }
  }

  val (port, host) = (8011,"localhost")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

 

 

 

 

 

客户端:

 

 

 

 

 

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.limitableByteSource
import akka.http.scaladsl.model._
import java.nio.file._
import akka.util.ByteString
import scala.util._

object FileClient extends App {

  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher

  def downloadFileTo(request: HttpRequest, destPath: String) = {
    val futResp = Http(sys).singleRequest(request)
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath)))
            .onComplete { case _ => println(s"Download file saved to: $destPath") }
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Download request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to download file!")
        case Failure(err) => println(s"Download failed: ${err.getMessage}")
      }

  }

  val dlFile = "Downloads/readme.txt"
  val downloadText = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile)

  downloadFileTo(downloadText, "/users/tiger-macpro/downloads/sample.txt")
  scala.io.StdIn.readLine()

  val dlFile2 = "Downloads/image.png"
  val downloadText2 = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile2)
  downloadFileTo(downloadText2, "/users/tiger-macpro/downloads/sample.png")
  scala.io.StdIn.readLine()

  def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {
    val futResp = Http(sys).singleRequest(
        request.copy(entity = dataEntity)
      )
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
        entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")

      }
  }

  def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = {
    def loadFile = {
      //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
      val file = Paths.get(filePath)
      FileIO.fromPath(file, chunkSize)
        .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
    }
    limitableByteSource(loadFile)
  }

  val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/exchange")
  val textData = HttpEntity(
    ContentTypes.`application/octet-stream`,
    fileStream("/Users/tiger-macpro/downloads/readme.txt",256)
  )

  uploadFile(uploadText,textData)

  scala.io.StdIn.readLine()

  sys.terminate()


}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

相关文章