Skip to content

Commit

Permalink
feat: peer injects existing task metadata to scheduler at startup
Browse files Browse the repository at this point in the history
Signed-off-by: BruceAko <chongzhi@hust.edu.cn>
  • Loading branch information
BruceAko committed Jul 23, 2024
1 parent 29ab308 commit 14cedf3
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 1 deletion.
14 changes: 14 additions & 0 deletions scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,20 @@ var (
Help: "Counter of the number of failed of the leaving host.",
})

AnnouncePeersCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_peers_total",
Help: "Counter of the number of the announcing peers.",
})

AnnouncePeersFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_peers_failure_total",
Help: "Counter of the number of failed of the announcing peers.",
})

SyncProbesCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Expand Down
9 changes: 8 additions & 1 deletion scheduler/rpcserver/scheduler_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,16 @@ func (s *schedulerServerV2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesSe
return nil
}

// TODO Implement the following methods.
// AnnouncePeers announces peers to scheduler.
func (s *schedulerServerV2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) error {
// Collect AnnouncePeersCount metrics.
metrics.AnnouncePeersCount.Inc()
if err := s.service.AnnouncePeers(stream); err != nil {
// Collect AnnouncePeersFailureCount metrics.
metrics.AnnouncePeersFailureCount.Inc()
return err
}

return nil
}

Expand Down
140 changes: 140 additions & 0 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,36 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
}
}

// AnnouncePeers announces peers to scheduler at startup.
func (v *V2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) error {
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()

for {
select {
case <-ctx.Done():
logger.Info("context was done")
return ctx.Err()
default:
}

request, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}

logger.Errorf("receive error: %s", err.Error())
return err
}

if err := v.handleAnnouncePeersRequest(ctx, request); err != nil {
logger.Error(err)
return err
}
}
}

// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
// Handle resource included host, task, and peer.
Expand Down Expand Up @@ -1436,3 +1466,113 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download

return nil
}

// handleAnnouncePeersRequest handles AnnouncePeersRequest.
func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv2.AnnouncePeersRequest) error {
for _, p := range request.Peers {
hostID := p.GetHost().GetId()
peerTask := p.GetTask()
taskID := peerTask.GetId()
peerID := p.GetId()
download := &commonv2.Download{
PieceLength: peerTask.GetPieceLength(),
Digest: peerTask.Digest,
Url: peerTask.GetUrl(),
Tag: peerTask.Tag,
Application: peerTask.Application,
Type: peerTask.GetType(),
FilteredQueryParams: peerTask.GetFilteredQueryParams(),
RequestHeader: peerTask.GetRequestHeader(),
Priority: p.GetPriority(),
Range: p.GetRange(),
}

_, task, peer, err := v.handleResource(ctx, nil, hostID, taskID, peerID, download)
if err != nil {
return err
}

// If the task state is not TaskStateSucceeded,
// advance the task state to TaskStateSucceeded.
if !task.FSM.Is(resource.TaskStateSucceeded) {
if task.FSM.Can(resource.TaskEventDownload) {
if err := task.FSM.Event(ctx, resource.TaskEventDownload); err != nil {
msg := fmt.Sprintf("task fsm event failed: %s", err.Error())
peer.Log.Error(msg)
return status.Error(codes.Internal, err.Error())
}
}

// Construct piece.
for _, pieceInfo := range p.Pieces {
piece := &resource.Piece{
Number: int32(pieceInfo.GetNumber()),
ParentID: pieceInfo.GetParentId(),
Offset: pieceInfo.GetOffset(),
Length: pieceInfo.GetLength(),
TrafficType: commonv2.TrafficType_LOCAL_PEER,
Cost: 0,
CreatedAt: time.Now(),
}

if len(pieceInfo.GetDigest()) > 0 {
d, err := digest.Parse(pieceInfo.GetDigest())
if err != nil {
return status.Errorf(codes.InvalidArgument, err.Error())
}

piece.Digest = d
}

// Handle peer with piece finished request. When the piece is downloaded successfully, peer.UpdatedAt needs
// to be updated to prevent the peer from being GC during the download process.
peer.StorePiece(piece)
peer.FinishedPieces.Set(uint(piece.Number))
peer.AppendPieceCost(piece.Cost)
peer.PieceUpdatedAt.Store(time.Now())
peer.UpdatedAt.Store(time.Now())

// Handle task with piece back-to-source finished request.
task.StorePiece(piece)
task.UpdatedAt.Store(time.Now())
}

// Handle task with peer back-to-source finished request, peer can only represent
// a successful task after downloading the complete task.
if peer.Range == nil && !peer.Task.FSM.Is(resource.TaskStateSucceeded) {
peer.Task.ContentLength.Store(int64(len(p.Pieces)))
peer.Task.TotalPieceCount.Store(int32(task.ContentLength.Load()))
if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil {
return status.Error(codes.Internal, err.Error())
}
}
}

// If the peer state is not PeerStateSucceeded,
// advance the peer state to PeerStateSucceeded.
if !peer.FSM.Is(resource.PeerStateSucceeded) {
if peer.FSM.Is(resource.PeerStatePending) {
if err := peer.FSM.Event(ctx, resource.PeerEventRegisterNormal); err != nil {
return status.Error(codes.Internal, err.Error())
}
}

// Handle peer with peer started request.
if !peer.FSM.Is(resource.PeerStateRunning) {
if err := peer.FSM.Event(ctx, resource.PeerEventDownload); err != nil {
// Collect DownloadPeerStartedFailureCount metrics.
metrics.DownloadPeerStartedFailureCount.WithLabelValues(p.GetPriority().String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
return status.Error(codes.Internal, err.Error())
}
}

// Handle peer with peer finished request.
if err := peer.FSM.Event(ctx, resource.PeerEventDownloadSucceeded); err != nil {
return status.Error(codes.Internal, err.Error())
}
}
}

return nil
}

0 comments on commit 14cedf3

Please sign in to comment.