diff --git a/config/samples/arcadia_v1alpha1_knowledgebase.yaml b/config/samples/arcadia_v1alpha1_knowledgebase.yaml index aee810206..d73bc5dee 100644 --- a/config/samples/arcadia_v1alpha1_knowledgebase.yaml +++ b/config/samples/arcadia_v1alpha1_knowledgebase.yaml @@ -20,4 +20,4 @@ spec: name: dataset-playground-v1 namespace: arcadia paths: - - dataset/dataset-playground/v1/qa.csv + - qa.csv diff --git a/controllers/knowledgebase_controller.go b/controllers/knowledgebase_controller.go index ee479cb93..75cb02d3d 100644 --- a/controllers/knowledgebase_controller.go +++ b/controllers/knowledgebase_controller.go @@ -244,9 +244,8 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo if err = r.Get(ctx, types.NamespacedName{Name: group.Source.Name, Namespace: ns}, versionedDataset); err != nil { if apierrors.IsNotFound(err) { return errNoSource - } else { - return err } + return err } if !versionedDataset.Status.IsReady() { return errDataSourceNotReady @@ -289,23 +288,29 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo errs := make([]error, 0) for _, path := range group.Paths { - fileDatail, ok := pathMap[path] + fileDetail, ok := pathMap[path] if !ok { - fileDatail = &arcadiav1alpha1.FileDetails{ + fileDetail = &arcadiav1alpha1.FileDetails{ Path: path, Checksum: "", LastUpdateTime: metav1.Now(), Phase: arcadiav1alpha1.FileProcessPhasePending, ErrMessage: "", } - fileGroupDetail.FileDetails = append(fileGroupDetail.FileDetails, *fileDatail) + fileGroupDetail.FileDetails = append(fileGroupDetail.FileDetails, *fileDetail) + } + if versionedDataset.Spec.Dataset == nil { + err = fmt.Errorf("versionedDataset.Spec.Dataset is nil") + errs = append(errs, err) + fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) + continue } - info.Object = path + info.Object = filepath.Join("dataset", versionedDataset.Spec.Dataset.Name, versionedDataset.Spec.Version, path) stat, err := ds.StatFile(ctx, info) log.V(0).Info(fmt.Sprintf("raw StatFile:%#v", stat), "path", path) if err != nil { errs = append(errs, err) - fileDatail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) + fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) continue } @@ -314,38 +319,38 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo if !ok { err = fmt.Errorf("failed to convert stat to minio.ObjectInfo:%s", path) errs = append(errs, err) - fileDatail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) + fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) continue } - if objectStat.ETag == fileDatail.Checksum { - fileDatail.LastUpdateTime = metav1.Now() + if objectStat.ETag == fileDetail.Checksum { + fileDetail.LastUpdateTime = metav1.Now() continue } - fileDatail.Checksum = objectStat.ETag + fileDetail.Checksum = objectStat.ETag tags, err := ds.GetTags(ctx, info) if err != nil { errs = append(errs, err) - fileDatail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) + fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) continue } file, err := ds.ReadFile(ctx, info) if err != nil { errs = append(errs, err) - fileDatail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) + fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) continue } defer file.Close() if err = r.handleFile(ctx, log, file, info.Object, tags, kb, vectorStore, embedder); err != nil { if errors.Is(err, errFileSkipped) { - fileDatail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseSkipped) + fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseSkipped) continue } err = fmt.Errorf("failed to handle file:%s: %w", path, err) errs = append(errs, err) - fileDatail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) + fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) continue } - fileDatail.UpdateErr(nil, arcadiav1alpha1.FileProcessPhaseSucceeded) + fileDetail.UpdateErr(nil, arcadiav1alpha1.FileProcessPhaseSucceeded) } return utilerrors.NewAggregate(errs) }