{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.Server.Worker (
worker
) where
import Control.Concurrent.STM
import Control.Exception (SomeException(..), AsyncException(..))
import qualified Control.Exception as E
import Data.IORef
import qualified Network.HTTP.Types as H
import qualified System.TimeManager as T
import Imports hiding (insert)
import Network.HPACK
import Network.HPACK.Token
import Network.HTTP2
import Network.HTTP2.Priority
import Network.HTTP2.Server.API
import Network.HTTP2.Server.Context
import Network.HTTP2.Server.EncodeFrame
import Network.HTTP2.Server.Manager
import Network.HTTP2.Server.Queue
import Network.HTTP2.Server.Stream
import Network.HTTP2.Server.Types
pushStream :: Context
-> Stream
-> ValueTable
-> [PushPromise]
-> IO OutputType
pushStream :: Context -> Stream -> ValueTable -> [PushPromise] -> IO OutputType
pushStream _ _ _ [] = OutputType -> IO OutputType
forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
ORspn
pushStream ctx :: Context
ctx@Context{..} pstrm :: Stream
pstrm reqvt :: ValueTable
reqvt pps0 :: [PushPromise]
pps0
| StreamId
len StreamId -> StreamId -> Bool
forall a. Eq a => a -> a -> Bool
== 0 = OutputType -> IO OutputType
forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
ORspn
| Bool
otherwise = do
Bool
pushable <- Settings -> Bool
enablePush (Settings -> Bool) -> IO Settings -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef Settings -> IO Settings
forall a. IORef a -> IO a
readIORef IORef Settings
http2settings
if Bool
pushable then do
TVar StreamId
tvar <- StreamId -> IO (TVar StreamId)
forall a. a -> IO (TVar a)
newTVarIO 0
StreamId
lim <- TVar StreamId -> [PushPromise] -> StreamId -> IO StreamId
forall a.
Num a =>
TVar a -> [PushPromise] -> StreamId -> IO StreamId
push TVar StreamId
tvar [PushPromise]
pps0 0
if StreamId
lim StreamId -> StreamId -> Bool
forall a. Eq a => a -> a -> Bool
== 0 then
OutputType -> IO OutputType
forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
ORspn
else
OutputType -> IO OutputType
forall (m :: * -> *) a. Monad m => a -> m a
return (OutputType -> IO OutputType) -> OutputType -> IO OutputType
forall a b. (a -> b) -> a -> b
$ IO () -> OutputType
OWait (StreamId -> TVar StreamId -> IO ()
forall a. Ord a => a -> TVar a -> IO ()
waiter StreamId
lim TVar StreamId
tvar)
else
OutputType -> IO OutputType
forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
ORspn
where
!pid :: StreamId
pid = Stream -> StreamId
streamNumber Stream
pstrm
!len :: StreamId
len = [PushPromise] -> StreamId
forall (t :: * -> *) a. Foldable t => t a -> StreamId
length [PushPromise]
pps0
increment :: TVar a -> IO ()
increment tvar :: TVar a
tvar = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar a -> (a -> a) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar a
tvar (a -> a -> a
forall a. Num a => a -> a -> a
+1)
waiter :: a -> TVar a -> IO ()
waiter lim :: a
lim tvar :: TVar a
tvar = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
a
n <- TVar a -> STM a
forall a. TVar a -> STM a
readTVar TVar a
tvar
Bool -> STM ()
check (a
n a -> a -> Bool
forall a. Ord a => a -> a -> Bool
>= a
lim)
push :: TVar a -> [PushPromise] -> StreamId -> IO StreamId
push _ [] !StreamId
n = StreamId -> IO StreamId
forall (m :: * -> *) a. Monad m => a -> m a
return (StreamId
n :: Int)
push tvar :: TVar a
tvar (pp :: PushPromise
pp:pps :: [PushPromise]
pps) !StreamId
n = do
StreamId
ws <- Settings -> StreamId
initialWindowSize (Settings -> StreamId) -> IO Settings -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef Settings -> IO Settings
forall a. IORef a -> IO a
readIORef IORef Settings
http2settings
let !w :: StreamId
w = PushPromise -> StreamId
promiseWeight PushPromise
pp
!pri :: Priority
pri = Priority
defaultPriority { weight :: StreamId
weight = StreamId
w }
!pre :: Precedence
pre = Priority -> Precedence
toPrecedence Priority
pri
Stream
newstrm <- Context -> StreamId -> Precedence -> IO Stream
newPushStream Context
ctx StreamId
ws Precedence
pre
let !sid :: StreamId
sid = Stream -> StreamId
streamNumber Stream
newstrm
StreamTable -> StreamId -> Stream -> IO ()
insert StreamTable
streamTable StreamId
sid Stream
newstrm
let !scheme :: HeaderValue
scheme = Maybe HeaderValue -> HeaderValue
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe HeaderValue -> HeaderValue)
-> Maybe HeaderValue -> HeaderValue
forall a b. (a -> b) -> a -> b
$ Token -> ValueTable -> Maybe HeaderValue
getHeaderValue Token
tokenScheme ValueTable
reqvt
!auth :: HeaderValue
auth = Maybe HeaderValue -> HeaderValue
forall a. HasCallStack => Maybe a -> a
fromJust (Token -> ValueTable -> Maybe HeaderValue
getHeaderValue Token
tokenHost ValueTable
reqvt
Maybe HeaderValue -> Maybe HeaderValue -> Maybe HeaderValue
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Token -> ValueTable -> Maybe HeaderValue
getHeaderValue Token
tokenAuthority ValueTable
reqvt)
!path :: HeaderValue
path = PushPromise -> HeaderValue
promiseRequestPath PushPromise
pp
!promiseRequest :: [(Token, HeaderValue)]
promiseRequest = [(Token
tokenMethod, HeaderValue
H.methodGet)
,(Token
tokenScheme, HeaderValue
scheme)
,(Token
tokenAuthority, HeaderValue
auth)
,(Token
tokenPath, HeaderValue
path)]
!ot :: OutputType
ot = [(Token, HeaderValue)] -> StreamId -> OutputType
OPush [(Token, HeaderValue)]
promiseRequest StreamId
pid
!rsp :: Response
rsp = PushPromise -> Response
promiseResponse PushPromise
pp
!out :: Output
out = Stream
-> Response
-> OutputType
-> Maybe (TBQueue RspStreaming)
-> IO ()
-> Output
Output Stream
newstrm Response
rsp OutputType
ot Maybe (TBQueue RspStreaming)
forall a. Maybe a
Nothing (IO () -> Output) -> IO () -> Output
forall a b. (a -> b) -> a -> b
$ TVar a -> IO ()
forall a. Num a => TVar a -> IO ()
increment TVar a
tvar
PriorityTree Output -> Output -> IO ()
enqueueOutput PriorityTree Output
outputQ Output
out
TVar a -> [PushPromise] -> StreamId -> IO StreamId
push TVar a
tvar [PushPromise]
pps (StreamId
n StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
+ 1)
response :: Context -> Manager -> T.Handle -> ThreadContinue -> Stream -> Request -> Response -> [PushPromise] -> IO ()
response :: Context
-> Manager
-> Handle
-> ThreadContinue
-> Stream
-> Request
-> Response
-> [PushPromise]
-> IO ()
response ctx :: Context
ctx@Context{..} mgr :: Manager
mgr th :: Handle
th tconf :: ThreadContinue
tconf strm :: Stream
strm req :: Request
req rsp :: Response
rsp pps :: [PushPromise]
pps = case Response -> ResponseBody
responseBody Response
rsp of
RspNoBody -> do
ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
True
PriorityTree Output -> Output -> IO ()
enqueueOutput PriorityTree Output
outputQ (Output -> IO ()) -> Output -> IO ()
forall a b. (a -> b) -> a -> b
$ Stream
-> Response
-> OutputType
-> Maybe (TBQueue RspStreaming)
-> IO ()
-> Output
Output Stream
strm Response
rsp OutputType
ORspn Maybe (TBQueue RspStreaming)
forall a. Maybe a
Nothing (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
RspBuilder _ -> do
OutputType
otyp <- Context -> Stream -> ValueTable -> [PushPromise] -> IO OutputType
pushStream Context
ctx Stream
strm ValueTable
reqvt [PushPromise]
pps
ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
True
PriorityTree Output -> Output -> IO ()
enqueueOutput PriorityTree Output
outputQ (Output -> IO ()) -> Output -> IO ()
forall a b. (a -> b) -> a -> b
$ Stream
-> Response
-> OutputType
-> Maybe (TBQueue RspStreaming)
-> IO ()
-> Output
Output Stream
strm Response
rsp OutputType
otyp Maybe (TBQueue RspStreaming)
forall a. Maybe a
Nothing (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
RspFile _ -> do
OutputType
otyp <- Context -> Stream -> ValueTable -> [PushPromise] -> IO OutputType
pushStream Context
ctx Stream
strm ValueTable
reqvt [PushPromise]
pps
ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
True
PriorityTree Output -> Output -> IO ()
enqueueOutput PriorityTree Output
outputQ (Output -> IO ()) -> Output -> IO ()
forall a b. (a -> b) -> a -> b
$ Stream
-> Response
-> OutputType
-> Maybe (TBQueue RspStreaming)
-> IO ()
-> Output
Output Stream
strm Response
rsp OutputType
otyp Maybe (TBQueue RspStreaming)
forall a. Maybe a
Nothing (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
RspStreaming strmbdy :: (Builder -> IO ()) -> IO () -> IO ()
strmbdy -> do
OutputType
otyp <- Context -> Stream -> ValueTable -> [PushPromise] -> IO OutputType
pushStream Context
ctx Stream
strm ValueTable
reqvt [PushPromise]
pps
Manager -> IO ()
spawnAction Manager
mgr
ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
False
TBQueue RspStreaming
tbq <- Natural -> IO (TBQueue RspStreaming)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO 10
PriorityTree Output -> Output -> IO ()
enqueueOutput PriorityTree Output
outputQ (Output -> IO ()) -> Output -> IO ()
forall a b. (a -> b) -> a -> b
$ Stream
-> Response
-> OutputType
-> Maybe (TBQueue RspStreaming)
-> IO ()
-> Output
Output Stream
strm Response
rsp OutputType
otyp (TBQueue RspStreaming -> Maybe (TBQueue RspStreaming)
forall a. a -> Maybe a
Just TBQueue RspStreaming
tbq) (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
let push :: Builder -> IO ()
push b :: Builder
b = do
Handle -> IO ()
T.pause Handle
th
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue RspStreaming -> RspStreaming -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue RspStreaming
tbq (Builder -> RspStreaming
RSBuilder Builder
b)
Handle -> IO ()
T.resume Handle
th
flush :: IO ()
flush = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue RspStreaming -> RspStreaming -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue RspStreaming
tbq RspStreaming
RSFlush
(Builder -> IO ()) -> IO () -> IO ()
strmbdy Builder -> IO ()
push IO ()
flush
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue RspStreaming -> RspStreaming -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue RspStreaming
tbq RspStreaming
RSFinish
Manager -> IO ()
deleteMyId Manager
mgr
where
(_,reqvt :: ValueTable
reqvt) = Request -> ([(Token, HeaderValue)], ValueTable)
requestHeaders Request
req
worker :: Context -> Manager -> Server -> Action
worker :: Context -> Manager -> Server -> IO ()
worker ctx :: Context
ctx@Context{TQueue Input
inputQ :: TQueue Input
inputQ :: Context -> TQueue Input
inputQ,TQueue Control
controlQ :: TQueue Control
controlQ :: Context -> TQueue Control
controlQ} mgr :: Manager
mgr server :: Server
server = do
StreamInfo
sinfo <- IO StreamInfo
newStreamInfo
ThreadContinue
tcont <- IO ThreadContinue
newThreadContinue
Manager -> (Handle -> IO ()) -> IO ()
timeoutKillThread Manager
mgr ((Handle -> IO ()) -> IO ()) -> (Handle -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ StreamInfo -> ThreadContinue -> Handle -> IO ()
go StreamInfo
sinfo ThreadContinue
tcont
where
go :: StreamInfo -> ThreadContinue -> Handle -> IO ()
go sinfo :: StreamInfo
sinfo tcont :: ThreadContinue
tcont th :: Handle
th = do
ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tcont Bool
True
Either SomeException ()
ex <- IO () -> IO (Either SomeException ())
forall e a. Exception e => IO a -> IO (Either e a)
E.try (IO () -> IO (Either SomeException ()))
-> IO () -> IO (Either SomeException ())
forall a b. (a -> b) -> a -> b
$ do
Handle -> IO ()
T.pause Handle
th
Input strm :: Stream
strm req :: Request
req <- STM Input -> IO Input
forall a. STM a -> IO a
atomically (STM Input -> IO Input) -> STM Input -> IO Input
forall a b. (a -> b) -> a -> b
$ TQueue Input -> STM Input
forall a. TQueue a -> STM a
readTQueue TQueue Input
inputQ
let req' :: Request
req' = Request -> Handle -> Request
pauseRequestBody Request
req Handle
th
StreamInfo -> Stream -> IO ()
setStreamInfo StreamInfo
sinfo Stream
strm
Handle -> IO ()
T.resume Handle
th
Handle -> IO ()
T.tickle Handle
th
let aux :: Aux
aux = Handle -> Aux
Aux Handle
th
Server
server Request
req Aux
aux ((Response -> [PushPromise] -> IO ()) -> IO ())
-> (Response -> [PushPromise] -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Context
-> Manager
-> Handle
-> ThreadContinue
-> Stream
-> Request
-> Response
-> [PushPromise]
-> IO ()
response Context
ctx Manager
mgr Handle
th ThreadContinue
tcont Stream
strm Request
req'
Bool
cont1 <- case Either SomeException ()
ex of
Right () -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Left e :: SomeException
e@(SomeException _)
| Just ThreadKilled <- SomeException -> Maybe AsyncException
forall e. Exception e => SomeException -> Maybe e
E.fromException SomeException
e -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
| Just T.TimeoutThread <- SomeException -> Maybe TimeoutThread
forall e. Exception e => SomeException -> Maybe e
E.fromException SomeException
e -> do
StreamInfo -> IO ()
cleanup StreamInfo
sinfo
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
| Bool
otherwise -> do
StreamInfo -> IO ()
cleanup StreamInfo
sinfo
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Bool
cont2 <- ThreadContinue -> IO Bool
getThreadContinue ThreadContinue
tcont
StreamInfo -> IO ()
clearStreamInfo StreamInfo
sinfo
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool
cont1 Bool -> Bool -> Bool
&& Bool
cont2) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ StreamInfo -> ThreadContinue -> Handle -> IO ()
go StreamInfo
sinfo ThreadContinue
tcont Handle
th
pauseRequestBody :: Request -> Handle -> Request
pauseRequestBody req :: Request
req th :: Handle
th = Request
req { requestBody :: IO HeaderValue
requestBody = IO HeaderValue
readBody' }
where
!readBody :: IO HeaderValue
readBody = Request -> IO HeaderValue
requestBody Request
req
!readBody' :: IO HeaderValue
readBody' = do
Handle -> IO ()
T.pause Handle
th
HeaderValue
bs <- IO HeaderValue
readBody
Handle -> IO ()
T.resume Handle
th
HeaderValue -> IO HeaderValue
forall (m :: * -> *) a. Monad m => a -> m a
return HeaderValue
bs
cleanup :: StreamInfo -> IO ()
cleanup sinfo :: StreamInfo
sinfo = do
Maybe Stream
minp <- StreamInfo -> IO (Maybe Stream)
getStreamInfo StreamInfo
sinfo
case Maybe Stream
minp of
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just strm :: Stream
strm -> do
Context -> Stream -> ClosedCode -> IO ()
closed Context
ctx Stream
strm ClosedCode
Killed
let !frame :: HeaderValue
frame = ErrorCodeId -> StreamId -> HeaderValue
resetFrame ErrorCodeId
InternalError (Stream -> StreamId
streamNumber Stream
strm)
TQueue Control -> Control -> IO ()
enqueueControl TQueue Control
controlQ (Control -> IO ()) -> Control -> IO ()
forall a b. (a -> b) -> a -> b
$ HeaderValue -> Control
CFrame HeaderValue
frame
newtype ThreadContinue = ThreadContinue (IORef Bool)
{-# INLINE newThreadContinue #-}
newThreadContinue :: IO ThreadContinue
newThreadContinue :: IO ThreadContinue
newThreadContinue = IORef Bool -> ThreadContinue
ThreadContinue (IORef Bool -> ThreadContinue)
-> IO (IORef Bool) -> IO ThreadContinue
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
True
{-# INLINE setThreadContinue #-}
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue (ThreadContinue ref :: IORef Bool
ref) x :: Bool
x = IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
ref Bool
x
{-# INLINE getThreadContinue #-}
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue (ThreadContinue ref :: IORef Bool
ref) = IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
ref
newtype StreamInfo = StreamInfo (IORef (Maybe Stream))
{-# INLINE newStreamInfo #-}
newStreamInfo :: IO StreamInfo
newStreamInfo :: IO StreamInfo
newStreamInfo = IORef (Maybe Stream) -> StreamInfo
StreamInfo (IORef (Maybe Stream) -> StreamInfo)
-> IO (IORef (Maybe Stream)) -> IO StreamInfo
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe Stream -> IO (IORef (Maybe Stream))
forall a. a -> IO (IORef a)
newIORef Maybe Stream
forall a. Maybe a
Nothing
{-# INLINE clearStreamInfo #-}
clearStreamInfo :: StreamInfo -> IO ()
clearStreamInfo :: StreamInfo -> IO ()
clearStreamInfo (StreamInfo ref :: IORef (Maybe Stream)
ref) = IORef (Maybe Stream) -> Maybe Stream -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe Stream)
ref Maybe Stream
forall a. Maybe a
Nothing
{-# INLINE setStreamInfo #-}
setStreamInfo :: StreamInfo -> Stream -> IO ()
setStreamInfo :: StreamInfo -> Stream -> IO ()
setStreamInfo (StreamInfo ref :: IORef (Maybe Stream)
ref) inp :: Stream
inp = IORef (Maybe Stream) -> Maybe Stream -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe Stream)
ref (Maybe Stream -> IO ()) -> Maybe Stream -> IO ()
forall a b. (a -> b) -> a -> b
$ Stream -> Maybe Stream
forall a. a -> Maybe a
Just Stream
inp
{-# INLINE getStreamInfo #-}
getStreamInfo :: StreamInfo -> IO (Maybe Stream)
getStreamInfo :: StreamInfo -> IO (Maybe Stream)
getStreamInfo (StreamInfo ref :: IORef (Maybe Stream)
ref) = IORef (Maybe Stream) -> IO (Maybe Stream)
forall a. IORef a -> IO a
readIORef IORef (Maybe Stream)
ref