feat: peer automatically injects existing task metadata to scheduler upon restart #3385
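
This change implements the scheduler-side AnnouncePeers v2 RPC so that a restarted peer can re-inject the task metadata it already holds: it adds announce_peers_total and announce_peers_failure_total Prometheus counters, wires AnnouncePeers through the gRPC server, and adds a service loop whose handler rebuilds host, task, peer, and piece state from each AnnouncePeersRequest. The pull request was closed without being merged.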

14 changes: 14 additions & 0 deletions scheduler/metrics/metrics.go
@@ -238,6 +238,20 @@ var (
Help: "Counter of the number of failed of the leaving host.",
})

AnnouncePeersCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_peers_total",
Help: "Counter of the number of the announcing peers.",
})

AnnouncePeersFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_peers_failure_total",
Help: "Counter of the number of failed of the announcing peers.",
})

SyncProbesCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
9 changes: 8 additions & 1 deletion scheduler/rpcserver/scheduler_server_v2.go
@@ -164,9 +164,16 @@ func (s *schedulerServerV2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesSe
return nil
}

// TODO Implement the following methods.
// AnnouncePeers announces peers to scheduler.
func (s *schedulerServerV2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) error {
// Collect AnnouncePeersCount metrics.
metrics.AnnouncePeersCount.Inc()
if err := s.service.AnnouncePeers(stream); err != nil {
// Collect AnnouncePeersFailureCount metrics.
metrics.AnnouncePeersFailureCount.Inc()
return err
}

return nil
}

151 changes: 145 additions & 6 deletions scheduler/service/service_v2.go
@@ -93,7 +93,7 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
for {
select {
case <-ctx.Done():
logger.Info("context was done")
logger.Info("announce peer context was done")
return ctx.Err()
default:
}
@@ -141,29 +141,29 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest:
downloadPeerFinishedRequest := announcePeerRequest.DownloadPeerFinishedRequest
log.Infof("receive DownloadPeerFinishedRequest, content length: %d, piece count: %d", downloadPeerFinishedRequest.GetContentLength(), downloadPeerFinishedRequest.GetPieceCount())
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
// Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon.
if err := v.handleDownloadPeerFinishedRequest(context.Background(), req.GetPeerId()); err != nil {
log.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest:
downloadPeerBackToSourceFinishedRequest := announcePeerRequest.DownloadPeerBackToSourceFinishedRequest
log.Infof("receive DownloadPeerBackToSourceFinishedRequest, content length: %d, piece count: %d", downloadPeerBackToSourceFinishedRequest.GetContentLength(), downloadPeerBackToSourceFinishedRequest.GetPieceCount())
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
// Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon.
if err := v.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), req.GetPeerId(), downloadPeerBackToSourceFinishedRequest); err != nil {
log.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFailedRequest:
log.Infof("receive DownloadPeerFailedRequest, description: %s", announcePeerRequest.DownloadPeerFailedRequest.GetDescription())
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
// Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon.
if err := v.handleDownloadPeerFailedRequest(context.Background(), req.GetPeerId()); err != nil {
log.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFailedRequest:
log.Infof("receive DownloadPeerBackToSourceFailedRequest, description: %s", announcePeerRequest.DownloadPeerBackToSourceFailedRequest.GetDescription())
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
// Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon.
if err := v.handleDownloadPeerBackToSourceFailedRequest(context.Background(), req.GetPeerId()); err != nil {
log.Error(err)
return err
@@ -862,6 +862,41 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
}
}

// AnnouncePeers announces peers to scheduler at startup.
func (v *V2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) error {
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()

announcePeersCount := 0

for {
select {
case <-ctx.Done():
logger.Info("announce peers context was done")
return ctx.Err()
default:
}

request, err := stream.Recv()
if err != nil {
if err == io.EOF {
logger.Infof("announce %d peers", announcePeersCount)
return nil
}

logger.Errorf("receive error: %s", err.Error())
return err
}

peers, err := v.handleAnnouncePeersRequest(ctx, request)
if err != nil {
logger.Error(err)
return err
}
announcePeersCount += len(peers)
}
}
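
For orientation, here is a minimal sketch of the dfdaemon side of this loop. It assumes the generated schedulerv2 client exposes the usual streaming AnnouncePeers method, that the entries in AnnouncePeersRequest.Peers are commonv2.Peer messages (consistent with the getters used in handleAnnouncePeersRequest below), and that the d7y.io/api/v2 import paths apply; the helper name, address, and peer list are hypothetical.

```go
package main

import (
	"context"

	commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
	schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// announceLocalPeers is a hypothetical helper: after a restart, a peer
// re-announces the tasks it still holds so the scheduler can rebuild
// its metadata.
func announceLocalPeers(ctx context.Context, addr string, peers []*commonv2.Peer) error {
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return err
	}
	defer conn.Close()

	stream, err := schedulerv2.NewSchedulerClient(conn).AnnouncePeers(ctx)
	if err != nil {
		return err
	}

	// Peers may be sent in one or more batches; each batch is one
	// AnnouncePeersRequest received by the scheduler's Recv loop.
	if err := stream.Send(&schedulerv2.AnnouncePeersRequest{Peers: peers}); err != nil {
		return err
	}

	// Closing the send side surfaces io.EOF on the scheduler, which logs
	// the announced peer count and returns nil.
	return stream.CloseSend()
}
```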

// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
// Handle resource included host, task, and peer.
@@ -1363,7 +1398,10 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An
// Store new peer or load peer.
peer, loaded := v.resource.PeerManager().Load(peerID)
if !loaded {
options := []resource.PeerOption{resource.WithPriority(download.GetPriority()), resource.WithAnnouncePeerStream(stream)}
options := []resource.PeerOption{resource.WithPriority(download.GetPriority())}
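// AnnouncePeers rebuilds peers without an announce stream (handleAnnouncePeersRequest passes a nil stream), so only attach the stream option when one is present.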
if stream != nil {
options = append(options, resource.WithAnnouncePeerStream(stream))
}
if download.GetRange() != nil {
options = append(options, resource.WithRange(http.Range{Start: int64(download.Range.GetStart()), Length: int64(download.Range.GetLength())}))
}
@@ -1449,3 +1487,104 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download

return nil
}

// handleAnnouncePeersRequest handles AnnouncePeersRequest.
func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv2.AnnouncePeersRequest) (peers []*resource.Peer, err error) {
for _, p := range request.Peers {
hostID := p.GetHost().GetId()
peerTask := p.GetTask()
if peerTask == nil {
return nil, status.Error(codes.InvalidArgument, "request is invalid and doesn't contain a task")
}
taskID := peerTask.GetId()
peerID := p.GetId()
download := &commonv2.Download{
PieceLength: &peerTask.PieceLength,
Digest: peerTask.Digest,
Url: peerTask.GetUrl(),
Tag: peerTask.Tag,
Application: peerTask.Application,
Type: peerTask.GetType(),
FilteredQueryParams: peerTask.GetFilteredQueryParams(),
RequestHeader: peerTask.GetRequestHeader(),
Priority: p.GetPriority(),
Range: p.GetRange(),
}

_, task, peer, err := v.handleResource(ctx, nil, hostID, taskID, peerID, download)
if err != nil {
return nil, err
}

// If the task dag size exceeds the limit or the task is leaving,
// then set the peer state to PeerStateLeave.
if task.PeerCount() > resource.PeerCountLimitForTask || task.FSM.Is(resource.TaskStateLeave) {
peer.Log.Info("task dag size exceeds the limit, causing the peer to leave")
if err := peer.FSM.Event(context.Background(), resource.PeerEventLeave); err != nil {
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

v.resource.PeerManager().Delete(peer.ID)
peer.Log.Info("peer has been reclaimed")
continue
}

// If the task state is not TaskStateSucceeded,
// advance the task state to TaskStateSucceeded.
if !task.FSM.Is(resource.TaskStateSucceeded) {
if task.FSM.Can(resource.TaskEventDownload) {
task.FSM.SetState(resource.TaskStateRunning)
}

// Construct piece.
for _, pieceInfo := range p.Pieces {
piece := &resource.Piece{
Number: int32(pieceInfo.GetNumber()),
ParentID: pieceInfo.GetParentId(),
Offset: pieceInfo.GetOffset(),
Length: pieceInfo.GetLength(),
TrafficType: commonv2.TrafficType_LOCAL_PEER,
Cost: 0,
CreatedAt: time.Now(),
}

if len(pieceInfo.GetDigest()) > 0 {
d, err := digest.Parse(pieceInfo.GetDigest())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

piece.Digest = d
}

// Update peer.UpdatedAt to prevent the peer from being GC.
peer.StorePiece(piece)
peer.FinishedPieces.Set(uint(piece.Number))
peer.AppendPieceCost(piece.Cost)
peer.PieceUpdatedAt.Store(time.Now())
peer.UpdatedAt.Store(time.Now())

// Handle the task.
task.StorePiece(piece)
task.UpdatedAt.Store(time.Now())
}

if peer.Range == nil && !peer.Task.FSM.Is(resource.TaskStateSucceeded) {
// Content length is the sum of the piece lengths; total piece count is the number of pieces.
var contentLength int64
for _, pieceInfo := range p.Pieces {
contentLength += int64(pieceInfo.GetLength())
}
peer.Task.ContentLength.Store(contentLength)
peer.Task.TotalPieceCount.Store(int32(len(p.Pieces)))
peer.Task.FSM.SetState(resource.TaskStateSucceeded)
}
}

// If the peer state is not PeerStateSucceeded,
// advance the peer state to PeerStateSucceeded.
if !peer.FSM.Is(resource.PeerStateSucceeded) {
peer.FSM.SetState(resource.PeerStateSucceeded)
}

peers = append(peers, peer)
}

return peers, nil
}
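
To make the expected request shape concrete, the following is a hypothetical single entry of AnnouncePeersRequest.Peers, inferred from the getters this handler reads (Host.Id, the Task metadata, Peer.Id, Priority, Range, and Pieces); the commonv2 field names and all values are illustrative assumptions rather than a confirmed schema.

```go
// Hypothetical example of one announced peer as consumed by
// handleAnnouncePeersRequest. A missing Task is rejected with
// codes.InvalidArgument.
peer := &commonv2.Peer{
	Id:   "peer-1",
	Host: &commonv2.Host{Id: "host-1"},
	Task: &commonv2.Task{
		Id:  "task-1",
		Url: "https://example.com/blob",
	},
	Pieces: []*commonv2.Piece{{
		Number: 0,
		Offset: 0,
		Length: 4194304,
		// Digest is optional; when set it must be parseable by
		// digest.Parse, e.g. "sha256:<hex>".
	}},
}
req := &schedulerv2.AnnouncePeersRequest{Peers: []*commonv2.Peer{peer}}
```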