Skip to content

Commit

Permalink
fix race condition with repo add files
Browse files Browse the repository at this point in the history
Do all relevant database reading/modifying inside `maybeRunTaskInBackground`.

Notably, `LoadComplete` will load the reflist of a repo. if this is done outside of a background operation,
the data might be outdated when the background tasks runs.
  • Loading branch information
neolynx committed Oct 21, 2024
1 parent a771707 commit 4a5703c
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 68 deletions.
46 changes: 29 additions & 17 deletions api/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
}

resources = append(resources, string(snapshot.ResourceKey()))
err = snapshotCollection.LoadComplete(snapshot)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to publish: %s", err))
return
}

sources = append(sources, snapshot)
}
Expand All @@ -259,11 +254,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
}

resources = append(resources, string(localRepo.Key()))
err = localCollection.LoadComplete(localRepo)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to publish: %s", err))
}

sources = append(sources, localRepo)
}
} else {
Expand All @@ -275,14 +265,15 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
if b.MultiDist != nil {
multiDist = *b.MultiDist
}

collection := collectionFactory.PublishedRepoCollection()

published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, multiDist)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to publish: %s", err))
return
}

collection := collectionFactory.PublishedRepoCollection()

resources = append(resources, string(published.Key()))
taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"",
b.SourceKind, published.StoragePrefix(), published.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`))
Expand All @@ -295,6 +286,22 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
PublishDetail: taskDetail,
}

for _, source := range sources {
switch s := source.(type) {
case *deb.Snapshot:
snapshotCollection := collectionFactory.SnapshotCollection()
err = snapshotCollection.LoadComplete(s)
case *deb.LocalRepo:
localCollection := collectionFactory.LocalRepoCollection()
err = localCollection.LoadComplete(s)
default:
err = fmt.Errorf("unexpected type for source: %T", source)
}
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}
}

if b.Origin != "" {
published.Origin = b.Origin
}
Expand Down Expand Up @@ -402,11 +409,6 @@ func apiPublishUpdateSwitch(c *gin.Context) {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
return
}

if published.SourceKind == deb.SourceLocalRepo {
if len(b.Snapshots) > 0 {
Expand Down Expand Up @@ -445,6 +447,10 @@ func apiPublishUpdateSwitch(c *gin.Context) {
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("Unable to update: %s", err)
}
result, err := published.Update(collectionFactory, out)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("Unable to update: %s", err)
Expand Down Expand Up @@ -551,6 +557,7 @@ func apiPublishAddSource(c *gin.Context) {
return
}

// FIXME
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err))
Expand Down Expand Up @@ -663,6 +670,7 @@ func apiPublishSetSources(c *gin.Context) {
return
}

// FIXME
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
Expand Down Expand Up @@ -722,6 +730,7 @@ func apiPublishDropChanges(c *gin.Context) {
return
}

// FIXME
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
Expand Down Expand Up @@ -772,6 +781,7 @@ func apiPublishUpdateSource(c *gin.Context) {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return
}
// FIXME

err = collection.LoadComplete(published, collectionFactory)
if err != nil {
Expand Down Expand Up @@ -842,6 +852,7 @@ func apiPublishRemoveSource(c *gin.Context) {
return
}

// FIXME
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
Expand Down Expand Up @@ -929,6 +940,7 @@ func apiPublishUpdate(c *gin.Context) {
return
}

// FIXME
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
Expand Down
54 changes: 24 additions & 30 deletions api/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,14 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
return
}

err = collection.LoadComplete(repo)
if err != nil {
AbortWithJSONError(c, 500, err)
return
}

resources := []string{string(repo.Key())}

maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}

out.Printf("Loading packages...\n")
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
if err != nil {
Expand Down Expand Up @@ -360,12 +360,6 @@ func apiReposPackageFromDir(c *gin.Context) {
return
}

err = collection.LoadComplete(repo)
if err != nil {
AbortWithJSONError(c, 500, err)
return
}

var taskName string
var sources []string
if fileParam == "" {
Expand All @@ -379,6 +373,11 @@ func apiReposPackageFromDir(c *gin.Context) {
resources := []string{string(repo.Key())}
resources = append(resources, sources...)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}

verifier := context.GetVerifier()

var (
Expand Down Expand Up @@ -480,17 +479,7 @@ func apiReposCopyPackage(c *gin.Context) {
return
}

err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
if err != nil {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("dest repo error: %s", err))
return
}

var (
srcRefList *deb.PackageRefList
srcRepo *deb.LocalRepo
)

var srcRepo *deb.LocalRepo
srcRepo, err = collectionFactory.LocalRepoCollection().ByName(srcRepoName)
if err != nil {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("src repo error: %s", err))
Expand All @@ -502,17 +491,22 @@ func apiReposCopyPackage(c *gin.Context) {
return
}

err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
if err != nil {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("src repo error: %s", err))
return
}

srcRefList = srcRepo.RefList()
taskName := fmt.Sprintf("Copy packages from repo %s to repo %s", srcRepoName, dstRepoName)
resources := []string{string(dstRepo.Key()), string(srcRepo.Key())}

maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
}

err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
}

srcRefList := srcRepo.RefList()

reporter := &aptly.RecordingResultReporter{
Warnings: []string{},
AddedLines: []string{},
Expand Down
45 changes: 24 additions & 21 deletions api/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,17 @@ func apiSnapshotsCreate(c *gin.Context) {
return
}

err = snapshotCollection.LoadComplete(sources[i])
if err != nil {
AbortWithJSONError(c, 500, err)
return
}

resources = append(resources, string(sources[i].ResourceKey()))
}

maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
for i := range sources {
err = snapshotCollection.LoadComplete(sources[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
}

list := deb.NewPackageList()

// verify package refs and build package list
Expand Down Expand Up @@ -468,17 +469,20 @@ func apiSnapshotsMerge(c *gin.Context) {
return
}

err = snapshotCollection.LoadComplete(sources[i])
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, err)
return
}
resources[i] = string(sources[i].ResourceKey())
}

maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = snapshotCollection.LoadComplete(sources[0])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
result := sources[0].RefList()
for i := 1; i < len(sources); i++ {
err = snapshotCollection.LoadComplete(sources[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
result = result.Merge(sources[i].RefList(), overrideMatching, false)
}

Expand Down Expand Up @@ -566,27 +570,26 @@ func apiSnapshotsPull(c *gin.Context) {
AbortWithJSONError(c, http.StatusNotFound, err)
return
}
err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, err)
return
}

// Load <Source> snapshot
sourceSnapshot, err := collectionFactory.SnapshotCollection().ByName(body.Source)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, err)
return
}
err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, err)
return
}

resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())}
taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}

// convert snapshots to package list
toPackageList, err := deb.NewPackageListFromRefList(toSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
if err != nil {
Expand Down

0 comments on commit 4a5703c

Please sign in to comment.