Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start of node compatible socket interface #1097

Merged
merged 2 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ library
build-depends:
, cardano-api >=1.35
, cardano-ledger-alonzo
, cardano-ledger-babbage
, cardano-ledger-core
, cardano-ledger-shelley
, cardano-ledger-shelley-ma
, cardano-slotting
, iohk-monitoring
, ouroboros-consensus
, ouroboros-consensus-cardano
, ouroboros-consensus-shelley
, ouroboros-network
, ouroboros-network-framework
, plutus-core >=1.0.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@ module Cardano.Node.Socket.Emulator.Chain where

import Cardano.Node.Emulator.Internal.Node (Params)
import Cardano.Node.Emulator.Internal.Node.Chain qualified as EC
import Cardano.Protocol.Socket.Type (BlockId (..), Tip, blockId)
import Control.Concurrent
import Control.Concurrent.STM
import Control.Lens hiding (index)
import Control.Monad.Freer
import Control.Monad.Freer.Extras.Log (LogMsg)
import Control.Monad.Freer.State (State, gets, modify)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Coerce (coerce)
import Data.Foldable (traverse_)
import Data.Functor (void)
import Data.Maybe (listToMaybe)
import GHC.Generics (Generic)
import Ledger (Block, CardanoTx, Slot (..))
import Ledger.Index qualified as Index
import Ouroboros.Consensus.HardFork.Combinator.AcrossEras (OneEraHash (..))
import Ouroboros.Network.Block qualified as O

type TxPool = [CardanoTx]

Expand All @@ -35,7 +38,7 @@ data MockNodeServerChainState = MockNodeServerChainState
, _index :: Index.UtxoIndex
, _currentSlot :: Slot
, _channel :: TChan Block
, _tip :: Maybe Block
, _tip :: Tip
} deriving (Generic)

makeLenses ''MockNodeServerChainState
Expand All @@ -51,7 +54,7 @@ instance Show MockNodeServerChainState where
emptyChainState :: MonadIO m => m MockNodeServerChainState
emptyChainState = do
chan <- liftIO . atomically $ newTChan
pure $ MockNodeServerChainState [] mempty 0 chan Nothing
pure $ MockNodeServerChainState [] mempty 0 chan O.TipGenesis

getChannel :: MonadIO m => MVar MockNodeServerChainState -> m (TChan Block)
getChannel mv = liftIO (readMVar mv) <&> view channel
Expand All @@ -66,17 +69,13 @@ fromEmulatorChainState EC.ChainState {EC._txPool, EC._index, EC._chainCurrentSlo
, _txPool = _txPool
, _index = _index
, _currentSlot = _chainCurrentSlot
, _tip = listToMaybe _chainNewestFirst
, _tip = O.TipGenesis
koslambrou marked this conversation as resolved.
Show resolved Hide resolved
}

-- Get the current tip or wait for one if there are no blocks.
getTip :: forall m. MonadIO m => MVar MockNodeServerChainState -> m Block
-- Get the current tip.
getTip :: forall m. MonadIO m => MVar MockNodeServerChainState -> m Tip
getTip mvChainState = liftIO $ readMVar mvChainState >>= \case
MockNodeServerChainState { _tip = Just tip' } -> pure tip'
MockNodeServerChainState { _channel } -> do
-- Wait for the initial block.
void $ liftIO $ atomically $ peekTChan _channel
getTip mvChainState
MockNodeServerChainState { _tip } -> pure _tip

handleControlChain ::
( Member (State MockNodeServerChainState) effs
Expand All @@ -94,7 +93,7 @@ handleControlChain params = \case
let EC.ValidatedBlock block events idx' = EC.validateBlock params slot idx pool

modify $ txPool .~ []
modify $ tip ?~ block
modify $ tip .~ O.Tip (fromIntegral slot) (coerce $ blockId block) (fromIntegral slot)
modify $ index .~ idx'

traverse_ EC.logEvent events
Expand All @@ -117,12 +116,12 @@ addTxToPool = (:)

-- | Fetch the currently stored chain by iterating over the channel until
-- there is nothing left to be returned.
chainNewestFirst :: forall m. MonadIO m => TChan Block -> m [Block]
chainNewestFirst :: forall m b. MonadIO m => TChan b -> m [b]
chainNewestFirst ch = do
localChannel <- liftIO $ atomically $ cloneTChan ch
go localChannel []
where
go :: TChan Block -> [Block] -> m [Block]
go :: TChan b -> [b] -> m [b]
go local acc =
(liftIO $ atomically $ tryReadTChan local) >>= \case
Nothing -> pure acc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ module Cardano.Node.Socket.Emulator.Server (ServerHandler, runServerNode, proces

import Cardano.BM.Data.Trace (Trace)
import Data.ByteString.Lazy qualified as LBS
import Data.Coerce (coerce)
import Data.List (intersect)
import Data.Maybe (listToMaybe)
import Data.SOP.Strict (NS (S, Z))
import Data.Void (Void)

import Control.Concurrent
Expand All @@ -32,7 +34,14 @@ import Ouroboros.Network.Protocol.LocalTxSubmission.Type qualified as TxSubmissi
import Plutus.Monitoring.Util qualified as LM

import Cardano.Slotting.Slot (SlotNo (..), WithOrigin (..))
import Ouroboros.Consensus.Cardano.Block (CardanoBlock)
import Ouroboros.Consensus.HardFork.Combinator qualified as Consensus
import Ouroboros.Consensus.Ledger.SupportsMempool (ApplyTxErr)
import Ouroboros.Consensus.Shelley.Eras (StandardCrypto)
import Ouroboros.Consensus.Shelley.Ledger qualified as Consensus
import Ouroboros.Consensus.Shelley.Ledger qualified as Shelley
import Ouroboros.Network.Block (Point (..), pointSlot)
import Ouroboros.Network.Block qualified as O
import Ouroboros.Network.IOManager
import Ouroboros.Network.Mux
import Ouroboros.Network.NodeToClient (NodeToClientProtocols (..), nodeToClientCodecCBORTerm,
Expand All @@ -44,13 +53,14 @@ import Ouroboros.Network.Snocket
import Ouroboros.Network.Socket

import Cardano.Api qualified as C
import Cardano.Api.Shelley qualified as C

import Cardano.Protocol.Socket.Type

import Cardano.Node.Emulator.Internal.Node (ChainEvent, Params)
import Cardano.Node.Emulator.Internal.Node.Chain qualified as Chain
import Cardano.Node.Socket.Emulator.Chain (MockNodeServerChainState (..), addTxToPool, chainNewestFirst, channel,
currentSlot, getChannel, getTip, handleControlChain, tip, txPool)
currentSlot, getChannel, getTip, handleControlChain, txPool)
import Ledger (Block, CardanoTx (..), Slot (..))

data CommandChannel = CommandChannel
Expand Down Expand Up @@ -206,9 +216,10 @@ runChainSync = flip runReaderT
intersection. -}
idleState ::
( MonadReader (MVar MockNodeServerChainState) m
, MonadIO m )
, MonadIO m
, block ~ CardanoBlock StandardCrypto)
=> LocalChannel
-> m (ServerStIdle Block (Point Block) Tip m ())
-> m (ServerStIdle block (Point block) Tip m ())
idleState channel' =
pure ServerStIdle {
recvMsgRequestNext = nextState channel',
Expand All @@ -221,32 +232,35 @@ idleState channel' =
next block (Nothing/Right branch) -}
nextState ::
( MonadReader (MVar MockNodeServerChainState) m
, MonadIO m )
, MonadIO m
, block ~ CardanoBlock StandardCrypto)
=> LocalChannel
-> m (Either (ServerStNext Block (Point Block) Tip m ())
(m (ServerStNext Block (Point Block) Tip m ())))
-> m (Either (ServerStNext block (Point block) Tip m ())
(m (ServerStNext block (Point block) Tip m ())))
nextState localChannel@(LocalChannel channel') = do
chainState <- ask
tip' <- getTip chainState
let blockHeader = undefined -- TODO
(liftIO . atomically $ tryReadTChan channel') >>= \case
Nothing -> do
Right . pure <$> do
nextBlock <- liftIO . atomically $ readTChan channel'
liftIO $ modifyMVar_ chainState (pure . (tip ?~ nextBlock))
sendRollForward localChannel tip' nextBlock
-- liftIO $ modifyMVar_ chainState (pure . (tip ?~ nextBlock))
sendRollForward localChannel tip' $ toCardanoBlock blockHeader nextBlock
Just nextBlock -> do
liftIO $ modifyMVar_ chainState (pure . (tip ?~ nextBlock))
Left <$> sendRollForward localChannel tip' nextBlock
-- liftIO $ modifyMVar_ chainState (pure . (tip ?~ nextBlock))
Left <$> sendRollForward localChannel tip' (toCardanoBlock blockHeader nextBlock)

{- This protocol state will search for a block intersection
with some client provided blocks. When an intersection is found
the client state is reset to the new offset (the Just branch)
or to the genesis block if no intersection was found. -}
findIntersect ::
( MonadReader (MVar MockNodeServerChainState) m
, MonadIO m )
=> [Point Block]
-> m (ServerStIntersect Block (Point Block) Tip m ())
, MonadIO m
, block ~ CardanoBlock StandardCrypto)
=> [Point block]
-> m (ServerStIntersect block (Point block) Tip m ())
findIntersect clientPoints = do
mvState <- ask
chainState <- liftIO $ readMVar mvState
Expand All @@ -258,7 +272,7 @@ findIntersect clientPoints = do
pure $ case point of
Nothing ->
SendMsgIntersectNotFound
tip'
O.TipGenesis
-- No intersection found. Resume from origin.
(ChainSyncServer $ cloneChainFrom 0 >>= idleState)
Just point' ->
Expand All @@ -271,11 +285,12 @@ findIntersect clientPoints = do
{- This is a wrapper around the creation of a `ServerStNext` -}
sendRollForward ::
( MonadReader (MVar MockNodeServerChainState) m
, MonadIO m )
, MonadIO m
, block ~ CardanoBlock StandardCrypto)
=> LocalChannel
-> Block -- tip
-> Block -- current
-> m (ServerStNext Block (Point Block) Tip m ())
-> Tip -- tip
-> block -- current
-> m (ServerStNext block (Point block) Tip m ())
sendRollForward channel' tip' current = pure $
SendMsgRollForward
current
Expand All @@ -287,8 +302,9 @@ sendRollForward channel' tip' current = pure $
makes more sense to start in the `StIntersect` state. -}
chainSyncServer ::
( MonadReader (MVar MockNodeServerChainState) m
, MonadIO m )
=> ChainSyncServer Block (Point Block) Tip m ()
, MonadIO m
, block ~ CardanoBlock StandardCrypto)
=> ChainSyncServer block (Point block) Tip m ()
chainSyncServer =
ChainSyncServer (cloneChainFrom 0 >>= idleState)

Expand Down Expand Up @@ -325,8 +341,8 @@ cloneChainFrom offset = LocalChannel <$> go

hoistChainSync ::
MonadReader (MVar MockNodeServerChainState) m
=> ChainSyncServer Block (Point Block) Tip ChainSyncMonad a
-> m (ChainSyncServer Block (Point Block) Tip IO a)
=> ChainSyncServer block (Point block) Tip ChainSyncMonad a
-> m (ChainSyncServer block (Point block) Tip IO a)
hoistChainSync machine = do
internalState <- ask
pure ChainSyncServer {
Expand All @@ -339,8 +355,8 @@ hoistChainSync machine = do

hoistStIdle ::
MonadReader (MVar MockNodeServerChainState) m
=> ServerStIdle Block (Point Block) Tip ChainSyncMonad a
-> m (ServerStIdle Block (Point Block) Tip IO a)
=> ServerStIdle block (Point block) Tip ChainSyncMonad a
-> m (ServerStIdle block (Point block) Tip IO a)
hoistStIdle (ServerStIdle nextState' findIntersect' done) = do
internalState <- ask
pure ServerStIdle {
Expand All @@ -357,17 +373,17 @@ hoistStIdle (ServerStIdle nextState' findIntersect' done) = do

hoistStIntersect ::
MonadReader (MVar MockNodeServerChainState) m
=> ServerStIntersect Block (Point Block) Tip ChainSyncMonad a
-> m (ServerStIntersect Block (Point Block) Tip IO a)
=> ServerStIntersect block (Point block) Tip ChainSyncMonad a
-> m (ServerStIntersect block (Point block) Tip IO a)
hoistStIntersect (SendMsgIntersectFound point tip' nextState') =
SendMsgIntersectFound point tip' <$> hoistChainSync nextState'
hoistStIntersect (SendMsgIntersectNotFound tip' nextState') =
SendMsgIntersectNotFound tip' <$> hoistChainSync nextState'

hoistStNext ::
MonadReader (MVar MockNodeServerChainState) m
=> ServerStNext Block (Point Block) Tip ChainSyncMonad a
-> m (ServerStNext Block (Point Block) Tip IO a)
=> ServerStNext block (Point block) Tip ChainSyncMonad a
-> m (ServerStNext block (Point block) Tip IO a)
hoistStNext (SendMsgRollForward header tip' nextState') =
SendMsgRollForward header tip' <$> hoistChainSync nextState'
hoistStNext (SendMsgRollBackward header tip' nextState') =
Expand Down Expand Up @@ -440,43 +456,51 @@ txSubmission mvChainState =
-- * Computing intersections

-- Given a `Point` find its offset into the chain.
pointOffset :: Point Block
pointOffset :: Point block
-> Integer
pointOffset pt =
case pointSlot pt of
Origin -> 0
At (SlotNo s) -> fromIntegral s

-- Currently selects all points from the blockchain.
getChainPoints :: MonadIO m => TChan Block -> MockNodeServerChainState -> m [Point Block]
getChainPoints
:: forall m block. (MonadIO m, block ~ CardanoBlock StandardCrypto)
=> TChan Block -> MockNodeServerChainState -> m [Point block]
getChainPoints ch st = do
chain <- chainNewestFirst ch
pure $ zipWith mkPoint
[st ^. currentSlot, st ^. currentSlot - 1 .. 0]
chain
where
mkPoint :: Slot -> Block -> Point Block
mkPoint (Slot s) block =
Point (At (OP.Block (SlotNo $ fromIntegral s)
(blockId block)))
mkPoint :: Slot -> Block -> Point block
mkPoint s block =
Point (At (OP.Block (fromIntegral s)
(coerce $ blockId block)))

-- * TxSubmission protocol

{- I did not use the same approach for this protocol as I did
for the `ChainSync`. This protocol has only one state and
it is much simpler. -}

txSubmissionServer ::
MVar MockNodeServerChainState
-> TxSubmission.LocalTxSubmissionServer (C.Tx C.BabbageEra) String IO ()
txSubmissionServer :: forall block. (block ~ CardanoBlock StandardCrypto)
=> MVar MockNodeServerChainState
-> TxSubmission.LocalTxSubmissionServer (Shelley.GenTx block) (ApplyTxErr block) IO ()
txSubmissionServer state = txSubmissionState
where
txSubmissionState :: TxSubmission.LocalTxSubmissionServer (C.Tx C.BabbageEra) String IO ()
txSubmissionState :: TxSubmission.LocalTxSubmissionServer (Shelley.GenTx block) (ApplyTxErr block) IO ()
txSubmissionState =
TxSubmission.LocalTxSubmissionServer {
TxSubmission.recvMsgSubmitTx =
\tx -> do
modifyMVar_ state (pure . over txPool (addTxToPool (CardanoEmulatorEraTx tx)))
case tx of
(Consensus.HardForkGenTx (Consensus.OneEraGenTx (S (S (S (S (S (Z tx')))))))) -> do
let Consensus.ShelleyTx _txid shelleyEraTx = tx'
let ctx = CardanoEmulatorEraTx (C.ShelleyTx C.ShelleyBasedEraBabbage shelleyEraTx)
_ <- modifyMVar_ state (pure . over txPool (addTxToPool ctx))
pure ()
_ -> pure ()
return (TxSubmission.SubmitSuccess, txSubmissionState)
, TxSubmission.recvMsgDone = ()
}
11 changes: 7 additions & 4 deletions plutus-chain-index-core/plutus-chain-index-core.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,22 @@ library
-- Other IOG dependencies
--------------------------
build-depends:
, cardano-api >=1.35
, cardano-api >=1.35
, cardano-ledger-byron
, cardano-ledger-core
, cardano-ledger-shelley
, io-classes
, iohk-monitoring
, ouroboros-consensus
, ouroboros-consensus-byron
, ouroboros-consensus-cardano
, ouroboros-consensus-protocol
, ouroboros-consensus-shelley
, ouroboros-network
, ouroboros-network-framework
, plutus-core >=1.0.0
, plutus-ledger-api >=1.0.0
, plutus-tx >=1.0.0
, plutus-core >=1.0.0
, plutus-ledger-api >=1.0.0
, plutus-tx >=1.0.0
, typed-protocols

------------------------
Expand Down Expand Up @@ -139,6 +141,7 @@ library
, servant-swagger-ui
, sqlite-simple
, stm
, strict-containers
, text
, text-class
, unordered-containers
Expand Down
Loading