从akka.stream.scaladsl.Source阅读第一个字节

amrnrhlw  于 2022-11-06  发布在  Scala
关注(0)|答案(3)|浏览(154)

我尝试从akka.stream.scaladsl.Source[ByteString, Any]读取前16个字节,并返回[Array[Byte], Source[ByteString, Any]]
在阅读前16个字节之后,我想像往常一样对剩余的Source进行流处理。

使用案例:

Source[ByteString, Any]是一个加密的流,流中的前16个字节是初始化向量。我需要得到初始化向量才能解密流的其余部分。
这就是我所尝试的:

Source.single(ByteString("This is my test string"))
      .prefixAndTail(16).runWith(Sink.head)

我想这样的东西,但是prefixAndTail需要输入的元素数。元素数不是字节数。
如果你有什么建议,请告诉我。谢谢!

bxpogfeg

bxpogfeg1#

下面的示例对您的用例做了几个假设:

  • Source中的第一个ByteString元素总是包含16字节的初始化向量(我在这里将其称为“密钥”)。第一个元素中的其余字节(即前16个字节以外的字节)可以用密钥解密。(为简单起见,本示例将前三个字节视为密钥。)
  • 解密后的值为String
val b1 = ByteString.fromString("abcdef")
val b2 = ByteString.fromString("ghijkl")
val b3 = ByteString.fromString("mnopqr")
val b4 = ByteString.fromString("stuvwx")

val byteStringSource = Source(Vector(b1, b2, b3, b4))

// The first value in the tuple argument is the ByteString key, the second is
// the encrypted ByteString. Returns the original encrypted ByteString and the
// decrypted String as a Some (or None if the decryption fails).
def decrypt(keyAndEncrypted: (ByteString, ByteString)): (ByteString, Option[String]) = {
  // do fancy decryption stuff with the key
  (keyAndEncrypted._2, Option(keyAndEncrypted._2.utf8String.toUpperCase))
}

val decryptionFlow = Flow.fromFunction(decrypt)

val decryptedSource: Source[(ByteString, Option[String]), NotUsed] =
  byteStringSource
    .prefixAndTail(1)
    .map {
      case (prefix, tail) =>
        val (key, rest) = prefix.head.splitAt(3) // using head instead of headOption for simplicity
        (key, Source(Vector(rest)).concat(tail))
    }
    .collect { case (key, bSource) => bSource.map(b => (key, b)) }
    .flatMapConcat(identity)
    .via(decryptionFlow)

decryptedSource.runForeach {
  case (encrypted, decrypted) =>
    println((encrypted.utf8String, decrypted))
}

运行上面的命令将打印以下内容:

(def,Some(DEF))
(ghijkl,Some(GHIJKL))
(mnopqr,Some(MNOPQR))
(stuvwx,Some(STUVWX))

在本例中,我取Source中第一个ByteString的前三个字节作为键,将ByteString中剩余的三个字节作为Source的其余部分的前缀(尾巴)、然后,变换所得到的x1M7N1x,使得密钥与每个x1M8N1x元素耦合。然后,Source通过Flow扁平化并解密。Flow返回原始加密的ByteString和包含解密值的Option[String]
希望这至少能给您的用例带来一些灵感和想法。

raogr8fs

raogr8fs2#

需要注意的几件事:
1.由于输入源来自网络,因此某些ByteString可能为空
1.我们需要前16个字节来正确解密流的其余部分
我将在代码中留下一些注解作为解释。

source
    .via(Flow[ByteString].map(d => {
      // Converts Source[ByteString] to Source[List[Byte]]
      d.toByteBuffer.array().toList
    }))
    // Source[List[Byte]] to Source[Byte]
    .mapConcat(identity)
    // Get the first 16 bytes from Source[Byte] and a stream of the remaining bytes Source[(Seq[byte], Source[Byte])
    .prefixAndTail(16)
    // Source[(Seq[byte], Source[Byte]) to Source[Source[(Seq[Byte], Array[Byte])]]
    .collect { case (key, source) =>      
      source.map(b => (key, Array(b)))
    }
    // Source[Source[(Seq[Byte], Array[Byte])]] to Source[(Seq[Byte], Array[Byte])]
    .flatMapConcat(identity)
    .runForeach {
      case (key, rest) =>
        println(s"${key.map(_.toChar).mkString} : ${rest.map(_.toChar).mkString}")
    }

包含空ByteString的测试示例:

val source = Source(Iterable[ByteString](
    ByteString(""), // empty ByteString to imitate empty element from database stream
    ByteString("abcdefghijklmnop <- first 16 bytes"))
  )

结果预期abcdefghijklmnop为前16个字节

abcdefghijklmnop :  
abcdefghijklmnop : <
abcdefghijklmnop : -
abcdefghijklmnop :  
abcdefghijklmnop : f
abcdefghijklmnop : i
abcdefghijklmnop : r
abcdefghijklmnop : s
abcdefghijklmnop : t
abcdefghijklmnop :  
abcdefghijklmnop : 1
abcdefghijklmnop : 6
abcdefghijklmnop :  
abcdefghijklmnop : b
abcdefghijklmnop : y
abcdefghijklmnop : t
abcdefghijklmnop : e
abcdefghijklmnop : s
brjng4g3

brjng4g33#

老好人 java 来救场了:

val ivBytesBuffer = new Array[Byte](16)
val is = new FileInputStream(fileName)
is.read(ivBytesBuffer)

val source = StreamConverters.fromInputStream(() => is)
decryptAes(source, keySpec, ivBytesBuffer)

Read First 4 Bytes of File中所述

相关问题