自从2013年接触Haskell以来,我正在编写一个小型的Web.Scotty服务来管理S3 bucket(使用Amazonka-2.0)。
网络。Scotty部分和Amazonka很清楚,但我不知道如何使它一起工作:
main :: IO ()
main = do
env <- Amazonka.newEnv Amazonka.discover
scotty 3000 (app env)
app :: Amazonka.Env -> ScottyM ()
app env = do
get "/stream-file" $ do
runResourceT $ do
resp <- runResourceT $ Amazonka.send env (newGetObject "bucket" "file")
(resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . print))
lift $ stream $ \send flush -> do
(resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . send) >> liftIO flush)
字符串
我试着删除runResourceT
在这里,没有任何变化:
resp <- Amazonka.send env (newGetObject "bucket" "file")
型
这将工作并成功打印到控制台:
(resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . print))
型
这不起作用(如果打印部分被注解掉),并出现错误:
lift $ stream $ \send flush -> do
(resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . send) >> liftIO flush)
型
错误代码:
HttpExceptionRequest Request {
host = "bucket.s3.us-east-1.amazonaws.com"
port = 443
secure = True
requestHeaders = [("X-Amz-Content-SHA256",""),("X-Amz-Date",""),("Host","bucket.s3.us-east-1.amazonaws.com"),("Authorization","<REDACTED>")]
path = "/file"
queryString = ""
method = "GET"
proxy = Nothing
rawBody = False
redirectCount = 0
responseTimeout = ResponseTimeoutMicro 70000000
requestVersion = HTTP/1.1
proxySecureMode = ProxySecureWithConnect
}
ConnectionClosed
型
我错过了什么?
2条答案
按热度按时间holgip5t1#
如果您尝试:
字符串
你会看到:
型
这表明
stream
实际上只“设置”了管道,但它实际上是在处理程序完成之后执行的,即在您的资源被释放之后(在您的情况下,连接到AWS)。hrysbysz2#
看起来Amazonka要求执行
Amazonka.send
操作的ResourceT
保持打开状态,直到身体管道实际被流式传输。这在Amazonka.Response
module中有某种程度的记录。在您的代码中,
stream
调用设置了流操作,但实际上并没有执行sinkBody
,因此外部ResourceT
会打包并允许在Scotty调用流操作(包括执行sinkBody
)之前关闭连接。在Scotty服务器中运行一个
ResourceT
似乎是最安全和最简单的,它在服务器启动时打开,只有在服务器终止时才关闭。(我担心这可能会泄漏连接,但Amazonka似乎参与了足够的连接管理,这不是一个问题。)要做到这一点,而不给Scotty软件包带来重大的大脑手术,您可以定义以下函数,允许您“unlift”
ResourceT
Transformer -基本上,在IO
中使用“escape hatch”对单个共享ResourceT
执行所有操作:字符串
有了这个函数,你可以在一个活动的
ResourceT
上下文中运行你的应用程序,如下所示:型
其中,
app
在基于IO的普通ScottyM
monad中运行,在需要时使用withResourceT
。我在这里避免了sinkBody
,因为它通过runConduitRes
调用自己的freshrunResourceT
。相反,我使用withResourceT
手动运行body管道:型
这是我的完整程序。我测试了它,它似乎可以工作。连接有时会打开一点(比如说,30秒左右),但它们最终会关闭,所以它似乎没有泄漏任何东西。
型