Haskell: understanding Web.Scotty streaming

kcrjzv8t  posted on 2023-11-18 in Other

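I first picked up Haskell back in 2013, and I am now writing a small Web.Scotty service to manage an S3 bucket (using Amazonka-2.0).
The Web.Scotty part and the Amazonka part are each clear to me, but I can't figure out how to make them work together: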

main :: IO ()
main = do
    env <- Amazonka.newEnv Amazonka.discover
    scotty 3000 (app env)

app :: Amazonka.Env -> ScottyM ()
app env = do
    get "/stream-file" $ do
        runResourceT $ do
            resp <- runResourceT $ Amazonka.send env (newGetObject "bucket" "file")

            (resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . print))

            lift $ stream $ \send flush -> do
                (resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . send) >> liftIO flush)

I tried removing the inner runResourceT here, but nothing changed:

resp <- Amazonka.send env (newGetObject "bucket" "file")


This works and prints to the console successfully:

(resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . print))


This does not work (with the printing part commented out) and fails with an error:

lift $ stream $ \send flush -> do
    (resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . send) >> liftIO flush)


The error:

HttpExceptionRequest Request {
  host                 = "bucket.s3.us-east-1.amazonaws.com"
  port                 = 443
  secure               = True
  requestHeaders       = [("X-Amz-Content-SHA256",""),("X-Amz-Date",""),("Host","bucket.s3.us-east-1.amazonaws.com"),("Authorization","<REDACTED>")]
  path                 = "/file"
  queryString          = ""
  method               = "GET"
  proxy                = Nothing
  rawBody              = False
  redirectCount        = 0
  responseTimeout      = ResponseTimeoutMicro 70000000
  requestVersion       = HTTP/1.1
  proxySecureMode      = ProxySecureWithConnect
}
 ConnectionClosed


What am I missing?

holgip5t  #1

If you try:

{-# LANGUAGE OverloadedStrings #-}

module Main where

import Data.Binary.Builder (fromByteString)
import Web.Scotty
import Data.Conduit ((.|), ConduitT, yield, runConduit)
import qualified Data.Conduit.Combinators as CC
import Control.Monad.IO.Class
import Control.Lens
import Control.Monad.Trans.Class (lift)
import Control.Concurrent (threadDelay)
import Data.ByteString (ByteString)

import Data.IORef

slowSource :: MonadIO m => IORef Bool -> ConduitT a ByteString m ()
slowSource state = do
  x <- liftIO $ readIORef state
  yield ("state: " <> (if x then "T" else "F") <> "\n")
  liftIO $ threadDelay 1000000
  slowSource state

main :: IO ()
main = do
    state <- newIORef False
    scotty 3000 (app state)

app :: IORef Bool -> ScottyM ()
app state = do
    get "/stream-file" $ do
      liftIO $ writeIORef state True

      stream $ \send flush -> do
          runConduit $ slowSource state .| CC.map fromByteString .| CC.mapM_ (\chunk -> liftIO (send chunk >> flush))

      liftIO $ writeIORef state False

you will see:

curl http://localhost:3000/stream-file
state: F
state: F
state: F
state: F
state: F
^C


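This shows that stream only "sets up" the pipeline. The pipeline itself runs after the handler has finished, which means after your resources (in your case, the connection to AWS) have already been released. That is why every line reads "state: F": by the time the conduit runs, the handler has already written False back to the IORef.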
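
The same ordering problem can be reproduced without Scotty or Amazonka at all. The following is a minimal sketch using only base; handler and runDemo are hypothetical names, and the toy handler is only a model of the behaviour described above, not Scotty's actual implementation. The handler acquires a resource with bracket and returns the body action instead of running it, so the body runs only after the resource is gone.

```haskell
module Main where

import Control.Exception (bracket)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- | A toy "handler" (hypothetical, not Scotty's API): it holds a resource
-- for its own duration and returns the deferred body action without
-- running it.
handler :: IORef Bool -> IO (IO String)
handler open =
  bracket (writeIORef open True) (\_ -> writeIORef open False) $ \_ ->
    -- Nothing is read here; we only build the action that will read later.
    pure $ do
      isOpen <- readIORef open
      pure (if isOpen then "resource open" else "resource closed")

-- | Plays the role of the server: first run the handler to completion,
-- then run the body it returned.
runDemo :: IO String
runDemo = do
  open <- newIORef False
  body <- handler open -- bracket has already released the resource here
  body                 -- the body observes the released resource

main :: IO ()
main = runDemo >>= putStrLn
```

Running this prints "resource closed", which is the toy equivalent of the ConnectionClosed error in the question.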

hrysbysz  #2

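It looks like Amazonka requires the ResourceT in which the Amazonka.send operation runs to stay open until the body conduit has actually been streamed. This is documented, to some extent, in the Amazonka.Response module.
In your code, the stream call sets up the streaming action but does not actually run sinkBody, so the outer ResourceT wraps up and allows the connection to be closed before Scotty invokes the streaming action (which is when sinkBody finally runs).
The safest and simplest approach seems to be to run the Scotty server inside a single ResourceT that is opened when the server starts and closed only when the server terminates. (I was worried this might leak connections, but Amazonka seems to do enough connection management of its own that this is not a problem.)
To do this without performing major brain surgery on the Scotty package, you can define the following function, which lets you "unlift" the ResourceT transformer. Basically, it gives you an "escape hatch" in IO for running all operations against a single shared ResourceT: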

runWithResourceT :: ((forall m a. (MonadIO m) => ResourceT IO a -> m a) -> IO b) -> IO b
runWithResourceT act = runResourceT $ withRunInIO $ \runInIO -> act (liftIO . runInIO)
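
To see what this buys you, here is a small sketch, assuming the resourcet and unliftio-core packages are available. demoEvents is a hypothetical helper that registers a resource through the escape hatch with allocate and records when it is acquired, used, and released.

```haskell
{-# LANGUAGE RankNTypes #-}

module Main where

import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.IO.Unlift (withRunInIO)
import Control.Monad.Trans.Resource (ResourceT, allocate, runResourceT)
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Same definition as above, repeated so the sketch is self-contained.
runWithResourceT :: ((forall m a. MonadIO m => ResourceT IO a -> m a) -> IO b) -> IO b
runWithResourceT act = runResourceT $ withRunInIO $ \runInIO -> act (liftIO . runInIO)

-- | Hypothetical demo: log the lifetime of a resource that is registered
-- in the shared ResourceT from plain IO.
demoEvents :: IO [String]
demoEvents = do
  events <- newIORef []
  let note s = modifyIORef events (s :)
  runWithResourceT $ \withResourceT -> do
    -- Register a resource in the shared ResourceT; it is not released
    -- when this withResourceT call returns.
    _ <- withResourceT $ allocate (note "acquire") (\_ -> note "release")
    note "use"
  -- The release action runs only when runWithResourceT itself exits.
  reverse <$> readIORef events

main :: IO ()
main = demoEvents >>= mapM_ putStrLn
```

The events come out as acquire, use, release: the resource outlives the individual withResourceT call and is released only when the whole runWithResourceT block ends, which is the property the streaming body needs.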

With this function, you can run your application inside an active ResourceT context like this:

main :: IO ()
main = do
  ...
  runWithResourceT $ \withResourceT -> scotty 3000 (app env withResourceT)


Here, app runs in the plain IO-based ScottyM monad and uses withResourceT where needed. I avoid sinkBody because it calls its own fresh runResourceT via runConduitRes. Instead, I run the body conduit manually with withResourceT:

app :: Amazonka.Env -> (forall m a. (MonadIO m) => ResourceT IO a -> m a) -> ScottyM ()
app env withResourceT = get "/stream-file" $ do
  resp <- withResourceT $ Amazonka.send env (newGetObject "bucket" "file")
  stream $ \send flush -> do
    withResourceT $ runConduit $
      (resp ^. getObjectResponse_body._ResponseBody)
      .| mapC fromByteString
      .| mapM_C (liftIO . send)
    flush


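Here is my complete program. I tested it and it seems to work. Connections sometimes stay open for a little while (say, 30 seconds or so), but they do eventually close, so it does not appear to leak anything.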

{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE OverloadedStrings #-}

module Main where

import Amazonka
import Amazonka.S3
import Amazonka.S3.Lens
import Conduit
import Control.Lens
import Data.Binary.Builder
import System.IO
import Web.Scotty

runWithResourceT :: ((forall m a. (MonadIO m) => ResourceT IO a -> m a) -> IO b) -> IO b
runWithResourceT act = runResourceT $ withRunInIO $ \runInIO -> act (liftIO . runInIO)

main :: IO ()
main = do
  logger <- newLogger Debug stdout
  discover <- newEnv Amazonka.discover
  let env = discover
        { Amazonka.logger = logger
        , Amazonka.region = Amazonka.Ohio
        }
  runWithResourceT $ \withResourceT -> scotty 3000 (app env withResourceT)

app :: Amazonka.Env -> (forall m a. (MonadIO m) => ResourceT IO a -> m a) -> ScottyM ()
app env withResourceT = get "/stream-file" $ do
  resp <- withResourceT $ Amazonka.send env (newGetObject "bucket" "file")
  stream $ \send flush -> do
    withResourceT $ runConduit $
      (resp ^. getObjectResponse_body._ResponseBody)
      .| mapC fromByteString
      .| mapM_C (liftIO . send)
    flush
