"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "services/galley/src/Galley/Intra/Push.hs" between
wire-server-2020-06-10.tar.gz and wire-server-2020-06-19.tar.gz

About: Wire (server) offers end-to-end encrypted messaging, file-sharing, video and voice calls, and guest rooms for external communication (back-end server).

Push.hs  (wire-server-2020-06-10):Push.hs  (wire-server-2020-06-19)
skipping to change at line 57 skipping to change at line 57
) )
where where
import Bilge hiding (options) import Bilge hiding (options)
import Bilge.RPC import Bilge.RPC
import Bilge.Retry import Bilge.Retry
import Control.Lens ((&), (.~), (^.), makeLenses, set, view) import Control.Lens ((&), (.~), (^.), makeLenses, set, view)
import Control.Monad.Catch import Control.Monad.Catch
import Control.Retry import Control.Retry
import Data.Aeson (Object) import Data.Aeson (Object)
import Data.Id import Data.Id (ConnId, UserId)
import qualified Data.Id as Id
import Data.IdMapping (IdMapping, MappedOrLocalId (Local, Mapped))
import Data.Json.Util import Data.Json.Util
import Data.List.Extra (chunksOf) import Data.List.Extra (chunksOf)
import Data.List.NonEmpty (nonEmpty)
import Data.List1 import Data.List1
import Data.Misc import Data.Misc
import Data.Range import Data.Range
import qualified Data.Set as Set import qualified Data.Set as Set
import Data.Text.Encoding (encodeUtf8) import Data.Text.Encoding (encodeUtf8)
import qualified Data.Text.Lazy as LT import qualified Data.Text.Lazy as LT
import Galley.App import Galley.App
import Galley.Options import Galley.Options
import Galley.Types import Galley.Types
import qualified Galley.Types.Teams as Teams import qualified Galley.Types.Teams as Teams
skipping to change at line 87 skipping to change at line 90
import Util.Options import Util.Options
data PushEvent data PushEvent
= ConvEvent Event = ConvEvent Event
| TeamEvent Teams.Event | TeamEvent Teams.Event
pushEventJson :: PushEvent -> Object pushEventJson :: PushEvent -> Object
pushEventJson (ConvEvent e) = toJSONObject e pushEventJson (ConvEvent e) = toJSONObject e
pushEventJson (TeamEvent e) = toJSONObject e pushEventJson (TeamEvent e) = toJSONObject e
data Recipient = Recipient type Recipient = RecipientBy (MappedOrLocalId Id.U)
{ _recipientUserId :: UserId,
data RecipientBy user = Recipient
{ _recipientUserId :: user,
_recipientClients :: RecipientClients _recipientClients :: RecipientClients
} }
deriving stock (Functor, Foldable, Traversable)
makeLenses ''Recipient makeLenses ''RecipientBy
recipient :: Member -> Recipient recipient :: Member -> Recipient
recipient m = Recipient (memId m) RecipientClientsAll recipient = userRecipient . memId
userRecipient :: UserId -> Recipient userRecipient :: user -> RecipientBy user
userRecipient u = Recipient u RecipientClientsAll userRecipient u = Recipient u RecipientClientsAll
data Push = Push type Push = PushTo (MappedOrLocalId Id.U)
data PushTo user = Push
{ _pushConn :: Maybe ConnId, { _pushConn :: Maybe ConnId,
_pushTransient :: Bool, _pushTransient :: Bool,
_pushRoute :: Gundeck.Route, _pushRoute :: Gundeck.Route,
_pushNativePriority :: Maybe Gundeck.Priority, _pushNativePriority :: Maybe Gundeck.Priority,
_pushAsync :: Bool, _pushAsync :: Bool,
pushOrigin :: UserId, pushOrigin :: UserId,
pushRecipients :: List1 Recipient, pushRecipients :: List1 (RecipientBy user),
pushJson :: Object, pushJson :: Object,
pushRecipientListType :: Teams.ListType pushRecipientListType :: Teams.ListType
} }
deriving stock (Functor, Foldable, Traversable)
makeLenses ''Push makeLenses ''PushTo
newPush1 :: Teams.ListType -> UserId -> PushEvent -> List1 Recipient -> Push newPush1 :: Teams.ListType -> UserId -> PushEvent -> List1 Recipient -> Push
newPush1 recipientListType from e rr = newPush1 recipientListType from e rr =
Push Push
{ _pushConn = Nothing, { _pushConn = Nothing,
_pushTransient = False, _pushTransient = False,
_pushRoute = Gundeck.RouteAny, _pushRoute = Gundeck.RouteAny,
_pushNativePriority = Nothing, _pushNativePriority = Nothing,
_pushAsync = False, _pushAsync = False,
pushRecipientListType = recipientListType, pushRecipientListType = recipientListType,
skipping to change at line 141 skipping to change at line 150
-- | Asynchronously send a single push, chunking it into multiple -- | Asynchronously send a single push, chunking it into multiple
-- requests if there are more than 128 recipients. -- requests if there are more than 128 recipients.
push1 :: Push -> Galley () push1 :: Push -> Galley ()
push1 p = push (list1 p []) push1 p = push (list1 p [])
pushSome :: [Push] -> Galley () pushSome :: [Push] -> Galley ()
pushSome [] = return () pushSome [] = return ()
pushSome (x : xs) = push (list1 x xs) pushSome (x : xs) = push (list1 x xs)
push :: List1 Push -> Galley ()
push ps = do
let (localPushes, remotePushes) = foldMap (bimap toList toList . splitPush) (t
oList ps)
traverse_ (pushLocal . List1) (nonEmpty localPushes)
traverse_ (pushRemote . List1) (nonEmpty remotePushes)
where
splitPush :: Push -> (Maybe (PushTo UserId), Maybe (PushTo (IdMapping Id.U))
)
splitPush p =
(mkPushTo localRecipients p, mkPushTo remoteRecipients p)
where
(localRecipients, remoteRecipients) =
partitionEithers . fmap localOrRemoteRecipient . toList $ pushRecipien
ts p
--
localOrRemoteRecipient :: RecipientBy (MappedOrLocalId Id.U) -> Either (Reci
pientBy UserId) (RecipientBy (IdMapping Id.U))
localOrRemoteRecipient rcp = case _recipientUserId rcp of
Local localId -> Left $ rcp {_recipientUserId = localId}
Mapped idMapping -> Right $ rcp {_recipientUserId = idMapping}
--
mkPushTo :: [RecipientBy a] -> PushTo b -> Maybe (PushTo a)
mkPushTo recipients p =
nonEmpty recipients <&> \nonEmptyRecipients ->
p {pushRecipients = List1 nonEmptyRecipients}
-- | Asynchronously send multiple pushes, aggregating them into as -- | Asynchronously send multiple pushes, aggregating them into as
-- few requests as possible, such that no single request targets -- few requests as possible, such that no single request targets
-- more than 128 recipients. -- more than 128 recipients.
push :: List1 Push -> Galley () pushLocal :: List1 (PushTo UserId) -> Galley ()
push ps = do pushLocal ps = do
limit <- fanoutLimit limit <- fanoutLimit
-- Do not fan out for very large teams -- Do not fan out for very large teams
let (async, sync) = partition _pushAsync (removeIfLargeFanout limit $ toList p s) let (async, sync) = partition _pushAsync (removeIfLargeFanout limit $ toList p s)
forM_ (pushes async) $ gundeckReq >=> callAsync "gundeck" forM_ (pushes async) $ gundeckReq >=> callAsync "gundeck"
void $ mapConcurrently (gundeckReq >=> call "gundeck") (pushes sync) void $ mapConcurrently (gundeckReq >=> call "gundeck") (pushes sync)
return () return ()
where where
pushes = fst . foldr chunk ([], 0) pushes = fst . foldr chunk ([], 0)
chunk p (pss, !n) = chunk p (pss, !n) =
let r = recipientList p let r = recipientList p
skipping to change at line 184 skipping to change at line 216
Gundeck.recipient (_recipientUserId r) (_pushRoute p) Gundeck.recipient (_recipientUserId r) (_pushRoute p)
& Gundeck.recipientClients .~ _recipientClients r & Gundeck.recipientClients .~ _recipientClients r
-- Ensure that under no circumstances we exceed the threshold -- Ensure that under no circumstances we exceed the threshold
removeIfLargeFanout limit = removeIfLargeFanout limit =
filter filter
( \p -> ( \p ->
(pushRecipientListType p == Teams.ListComplete) (pushRecipientListType p == Teams.ListComplete)
&& (length (pushRecipients p) <= (fromIntegral $ fromRange limit)) && (length (pushRecipients p) <= (fromIntegral $ fromRange limit))
) )
-- instead of IdMapping, we could also just take qualified IDs
pushRemote :: List1 (PushTo (IdMapping Id.U)) -> Galley ()
pushRemote _ps = do
-- FUTUREWORK(federation, #1261): send these to the other backends
pure ()
----------------------------------------------------------------------------- -----------------------------------------------------------------------------
-- Helpers -- Helpers
gundeckReq :: [Gundeck.Push] -> Galley (Request -> Request) gundeckReq :: [Gundeck.Push] -> Galley (Request -> Request)
gundeckReq ps = do gundeckReq ps = do
o <- view options o <- view options
return $ return $
host (encodeUtf8 $ o ^. optGundeck . epHost) host (encodeUtf8 $ o ^. optGundeck . epHost)
. port (portNumber $ fromIntegral (o ^. optGundeck . epPort)) . port (portNumber $ fromIntegral (o ^. optGundeck . epPort))
. method POST . method POST
 End of changes. 14 change blocks. 
11 lines changed or deleted 53 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)